聊聊go-metrics中Meter的设计实现

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 引言 我们的打算对某个golang的服务,统计一下它每秒的QPS, go-metrics成为我们不二的选择。 在我的文章METRICS的简易实现 我简单的给出了对这个问题的一种简易实现。 1. 简易实现的优缺点 优点 实现方式相对直观 缺点 为了一个指标,所带来的开销很大 每个指标需要1个专门的协程,每秒钟做一次"快照" 1分的的QPS需要60个点, 如果还需要5分钟, 15分钟的QPS 那么共需记录 60 * (1 + 5 + 15) = 1260 个点 2. go-metrics的实现 传送门 meter.go 2.1 理论 go-metrics中对于Meter的实现基于EWMA(Exponentially Weighted Moving-Average) 中文译为指数加权移动平均法 它是一种特殊的加权移动平均法。其特点是: 第一,指数平滑法进一步加强了观察期近期观察值对预测值的作用,对不同时间的观察值所赋予的权数不等,从而加大了近期观察值的权数,使预测值能够迅速反映市场实际的变化。权数之间按等比级数减少,此级数之首项为平滑常数a,公比为(1- a)。第二,指数平滑法对于观察值所赋予的权数有伸缩性,可以取不同的a 值以改变权数的变化速率。如a取小值,则权数变化较迅速,观察值的新近变化趋势较能迅速反映于指数移动平均值中。因此,运用指数平滑法,可以选择不同的a 值来调节时间序列观察值的均匀程度(即趋势变化的平稳程度)。 EWMA 在实际应用中,主要是用于预测股价变化等等 注意 下面公式中的λ和上面文献中的a 是同一个参数,特此说明 预测的方法是,每隔一段时间进行一次采样,每次采样完成之后,就对预测值进行一次修正,这种方法的特点是近期的采样值对预测值的影响大,远期的影响较小 这种理论是有合理性的,尤其是对于了连续变化的曲线 2.2 实现 meter.go 中,重要的结构有2个 type StandardMeter struct { lock sync.RWMutex snapshot *MeterSnapshot a1, a5, a15 EWMA startTime time.Time stopped bool } // 定时调用StandardMeter的tick方法 ...

March 13, 2018 · 1 min

Metrics的简易实现

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 引言 我们打算统计服务的QPS,原型已经选了 go-metrics PS: 1800多个star看来不会错 但是我还是打算自己实现一版,看看我自己的实现和它的实现有什么不同 实现 package main import ( "fmt" "time" ) type MeterRate1 struct { count int64 // start和end 是一段移动的时间区间 // start和end 都是自新纪元起经历的秒数 // end - start = 60 seconds start int64 end int64 // 存储某一时刻,对应的count值 timestampCountMap map[int64]int64 } func NewMeterRate1() *MeterRate1 { m := MeterRate1{} m.count = 0 t := time.Now().Unix() m.start = t - 60 m.end = t m.timestampCountMap = make(map[int64]int64, 60) // 启动一个协程用于每秒"快照" go MeterTick(&m) return &m } func (m *MeterRate1) Mark(n int64) { m.count += n } func (m *MeterRate1) Rate() float64 { return float64(m.timestampCountMap[m.end]-m.timestampCountMap[m.start]) / 60.0 } func MeterTick(m *MeterRate1) { // 每秒触发一次, 快照这一时刻的count值, 存入timestampCountMap c := time.Tick(time.Second * 1) for x := range c { fmt.Println("tick", x) t := time.Now().Unix() m.timestampCountMap[t] = m.count m.start = t - 60 m.end = t delete(m.timestampCountMap, m.start-1) fmt.Println("---------------") for i := m.start; i < m.end+1; i++ { fmt.Printf("time:%v, count:%v\n", i, m.timestampCountMap[i]) } } } func main() { m := NewMeterRate1() go MyPrint2(m) var j int64 = 1 for true { time.Sleep(time.Second * 1) j++ m.Mark(j) } } func MyPrint2(m *MeterRate1) { for true { time.Sleep(time.Second) fmt.Println("rate1", m.Rate()) } } 在下一篇文章中,我会来介绍一下go-metrics中关于Meter的实现 ...

March 13, 2018 · 1 min

golang基于观察者模式管理多种worker的启停

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 我们的工作中有一个服务,服务中有多中worker,它们的角色各不相同 workerA 从从上游系统接收任务(HTTP 请求),并将任务写入数据库 workerB 把没有处理的任务扫描出来放入消息队列 workerC 从消息队列中读取任务进行处理,处理完成把处理结果回写到数据库 在服务的入口,每种worker有不同的数量,且都需要优雅的启动停止。有没有好的编程方式?经过反复思考,我基于观察者模式给出了范例供大家探讨 要点 1. worker接口 每个worker都需要启动和停止,因此worker可以抽象的理解为必须实现Worker接口 type Worker interface{ Start() Stop() } 2. worker的优雅启动和退出 每1个worker都是1个协程,在所有worker退出之前,服务不能退出,要做到这一点只能使用"sync.WaitGroup" 由于worker(协程)较多,我的想法是把所有的worker统一存在一起,由一个类统一管理 type WorkerManager struct { sync.WaitGroup // 保存所有worker WorkerSlice []Worker } WorkerManager作为观察目标,Worker作为观察者,当观察目标状态发生变化,所有的观察者都会得到通知。 - 当WorkerManager状态变化-start时,调用Worker的start方法,启动Worker - 当WorkerManager状态变化-stop时,调用Worker的stop方法, 停止Worker func NewWorkerManager() *WorkerManager { workerManager := WorkerManager{} workerManager.WorkerSlice = make([]Worker, 0, 10) return &workerManager } func (wm *WorkerManager) AddWorker(w Worker) { wm.WorkerSlice = append(wm.WorkerSlice, w) } func (wm *WorkerManager) Start() { wm.Add(len(wm.WorkerSlice)) for _, worker := range wm.WorkerSlice { go func(w Worker) { defer func() { err := recover() // 注意需要recover if err != nil { fmt.Printf("WorkerManager error, error:%v, stack:%v\n", err, string(Stack())) } }() w.Start() }(worker) } } func (wm *WorkerManager) Stop() { for _, worker := range wm.WorkerSlice { go func(w Worker) { defer func() { err := recover() if err != nil { fmt.Printf("WorkerManager error, error:%v, stack:%v\n", err, string(Stack())) } }() w.Stop() wm.Done() }(worker) } } 3. 完整代码示例 github 地址: vearne/worker_manager ...

March 7, 2018 · 2 min

聊聊HttpCode 301和302 重定向

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 HTTP状态码 除了最常见的200,201,其实302应该是我们用的最多的,尤其是在Oauth实现中 简单说明 301 Moved Permanently 描述 状态码301表示永久重定向 场景 网址永久变更 302 Moved Temporarily 描述 状态码302表示临时重定向 场景 网址(资源)临时变更,特别是在Oauth、单点登录等过程中,被大量使用 过程说明 下面是我用python实现的http server的简单例子 redirect.py import time import tornado.ioloop import tornado.web class PermanentlyHandler(tornado.web.RequestHandler): def get(self): print '--301 Permanently--' self.redirect('http://vearne.cc/archives/365', permanent=True) class TemporarilyHandler(tornado.web.RequestHandler): def get(self): print '--302 Temporarily--' self.redirect('http://vearne.cc/archives/365', permanent=False) def make_app(): return tornado.web.Application([ (r"/abc", PermanentlyHandler), (r"/def", TemporarilyHandler), ]) if __name__ == "__main__": app = make_app() app.listen(8888) tornado.ioloop.IOLoop.current().start() 可以直接使用浏览器模拟请求 让我们来观察一下301和302请求的实际返回 ...

March 2, 2018 · 2 min

聊聊resolv.conf

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 在记一次使用阿里云REDIS出现的故障一文,中我记录了由于DNS解析, 导致无法访问阿里云redis的服务地址, 里面涉及了/etc/resolv.conf, 通过修改resolv.conf可以修改系统默认的DNS服务器 说明 关于resolv.conf其他参数,请阅读参考资料2 常见配置1 通常我们会这么配置 nameserver 119.29.29.29 nameserver 223.5.5.5 nameserver 114.114.114.114 nameserver DNS服务器的地址 在常见配置1中,系统顺序尝试每个dns服务器的地址。如果响应超时(默认为5秒),则尝试下一个地址 常见配置2 另外云主机通常是这样配置的 options timeout:1 attempts:1 rotate nameserver 10.143.22.116 nameserver 10.143.22.118 timeout 请求dns服务器超时设置,单位:秒,默认为5秒,具体值需要查 resolv.h attempts 每个dns服务器的重试次数 rotate 使用轮训的方式选择DNS服务器,而不是顺序尝试 参考资料 DNS域名解析过程 resolv.conf 请我喝瓶饮料

February 27, 2018 · 1 min

记一次使用阿里云Redis出现的故障

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 我们公司的很多服务都是完全部署在阿里云上,甚至包括很多数据库。比如Redis和MySQL 事件的经过 阿里云的Redis为了方便内网访问(其实也只能在内网访问)使用的域名形如 xxxx.redis.rds.aliyuncs.com 时间 在20xx-xx-xx 23:00 ~ 24:00 症状 发现某个服务无法访问 继而排查日志发现,我们的服务会访问Redis,但是Redis的这个域名无法解析了;在没有做任何操作的情况下,域名在24点后可以正常解析了,服务也就自然的恢复了。 排查 对比这台机器和其它机器,发现他们的DNS的配置文件风格不大像 故障机器 [root@hostA ~]$ cat /etc/resolv.conf nameserver 119.29.29.29 nameserver 223.5.5.5 nameserver 114.114.114.114 其它阿里云机器 [root@hostB ~]#cat /etc/resolv.conf options timeout:1 attempts:1 rotate nameserver 10.143.22.116 nameserver 10.143.22.118 hostA的DNS文件显然是有运维兄弟修改过了,在正常情况下,hostA这样配置不会有什么问题 但是在某些特定情况下,可能会引发灾难 极端情况 参考资料1,描述了DNS解析的整个过程 比如我们使用119.29.29.29(DNSPOD,其实DNSPOD早就被腾讯收购了)作为Local DNS,图中的Name Server其实是域名的权威服务器,aliyuncs.com的权威服务假定是ns4.aliyun.com,明显这是阿里云的机器 极端情况1 我是假定步骤8的网络出现了异常(DNSPOD -> aliyun) 显然在这种情况下,在hostA已经不可能成功解析域名 获得域名权威服务器信息可用以下方法 AUTHORITY SECTION 即为域名的权威服务器信息 ╰─$ dig aliyuncs.com ; <<>> DiG 9.9.3 <<>> aliyuncs.com ;; global options: +cmd ;; Got answer: ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 64838 ;; flags: qr rd ra; QUERY: 1, ANSWER: 1, AUTHORITY: 3, ADDITIONAL: 7 ;; OPT PSEUDOSECTION: ; EDNS: version: 0, flags:; udp: 1280 ;; QUESTION SECTION: ;aliyuncs.com. IN A ;; ANSWER SECTION: aliyuncs.com. 300 IN A 140.205.32.8 ;; AUTHORITY SECTION: aliyuncs.com. 99637 IN NS ns4.aliyun.com. aliyuncs.com. 99637 IN NS ns3.aliyun.com. aliyuncs.com. 99637 IN NS ns5.aliyun.com. 极端情况2 如果阿里云主机到DNSPod(119.29.29.29) 出现网络故障,显然域名也就无法解析了 ...

February 25, 2018 · 1 min

用golang实现的定时器

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因 我们有一个业务场景(高并发),收到的每个任务都必须在很短的时间内完成,然后回报给调用方,因此每个任务都需要设定定时器。显然golang 默认的timer是不能支持这种场景的,因此我在delayqueue的基础上自己实现了一个库gtimer GitHub地址 timer 用golang实现的定时器,基于delayqueue 实现 实现受到了Java DelayQueue.java的启发 源码地址 DelayQueue.java 依赖的几个结构依次为为 timer -> delayqueue -> priorityqueue -> heap 由于golang的Condition不支持wait一段时间,所以使用golang原生的Timer来替代了Condition在delayqueue中的作用 Installation Install: go get -u github.com/vearne/gtimer Import: import &quot;github.com/vearne/gtimer&quot; Quick Start package main import ( &quot;fmt&quot; log &quot;github.com/sirupsen/logrus&quot; &quot;github.com/vearne/gtimer&quot; &quot;math/rand&quot; &quot;strconv&quot; &quot;sync/atomic&quot; &quot;time&quot; ) const ( PRODUCER_COUNT = 10 CONSUMER_COUNT = 10 TARGET_COUNT = 1000000 ) var ops int64 = 0 func main() { st := gtimer.NewSuperTimer(CONSUMER_COUNT) t1 := time.Now() for i := 0; i &lt; PRODUCER_COUNT; i++ { go push(st, &quot;worker&quot;+strconv.Itoa(i)) } time.Sleep(100 * time.Millisecond) for { v := atomic.LoadInt64(&amp;ops) if v &gt;= TARGET_COUNT { st.Stop() break } else { time.Sleep(100 * time.Millisecond) } } t2 := time.Now() log.Infof(&quot;cost:%v\n&quot;, t2.Sub(t1)) } func DefaultAction(t time.Time, value string) { // fmt.Printf(&quot;trigger_time:%v, value:%v\n&quot;, t, value) atomic.AddInt64(&amp;ops, 1) } func push(timer *gtimer.SuperTimer, name string) { r := rand.New(rand.NewSource(time.Now().UnixNano())) for i := 0; i &lt; 1000000; i++ { now := time.Now() t := now.Add(time.Millisecond * time.Duration(r.Int63n(300))) value := fmt.Sprintf(&quot;%v:value:%v&quot;, name, strconv.Itoa(i)) // create a delayed task item := gtimer.NewDelayedItemFunc(t, value, DefaultAction) timer.Add(item) } } use NewDelayedItemFunc, we can create a task ...

February 13, 2018 · 2 min

查找二叉树的最近公共祖先

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 题目 查找二叉树的最近公共祖先 示例如下: 节点6和节点8的最近公共祖先是节点4 节点2和节点5的最近公共祖先也是节点0 节点4和节点7的最近公共祖先也是节点4 思路 1)采用中序遍历(先左子树-> 根 -> 右子树) 可以得到每个节点在树中的层级 2)扫描2个目标节点之间所有节点,层级最小的节点,即为最近公共祖先 ancestor.cpp #include &lt;iostream&gt; #include &lt;stack&gt; #include &lt;vector&gt; using namespace std; #define MAX_VALUE 1000000 struct node{ int data; struct node* lchild; struct node* rchild; }; typedef struct node Node; int levels[100]; int lca(Node* root,int m,int n){ Node* p = root; vector&lt;int&gt; v; stack&lt;Node*&gt; s; int level = 0; cout&lt;&lt;&quot;中序遍历:&quot;&lt;&lt;endl; while(p||!s.empty()){ if(p){ s.push(p); levels[p-&gt;data]=level; p = p-&gt;lchild; level++; }else{ p = s.top(); level = levels[p-&gt;data]; s.pop(); cout&lt;&lt;p-&gt;data&lt;&lt;&quot; &quot;; v.push_back(p-&gt;data); p = p-&gt;rchild; level++; } } cout&lt;&lt;endl; cout&lt;&lt;&quot;节点对应层级:&quot;&lt;&lt;endl; for(int i=0;i&lt;v.size();i++){ cout&lt;&lt;levels[v[i]]&lt;&lt;&quot; &quot;; } cout&lt;&lt;endl; int minLevel = MAX_VALUE; int res = -1; bool flag = false; for(int i=0;i&lt;v.size();i++){ if(v[i]==m||v[i]==n){ if(levels[v[i]]&lt;minLevel){ res = v[i]; minLevel = levels[v[i]]; } flag = !flag; }else if(flag){ if(levels[v[i]]&lt;minLevel){ res = v[i]; minLevel = levels[v[i]]; } } } return res; } int main(){ Node node0={0,NULL,NULL}; Node node1={1,NULL,NULL}; Node node4={4,NULL,NULL}; Node node2={2,NULL,NULL}; Node node3={3,NULL,NULL}; Node node5={5,NULL,NULL}; Node node6={6,NULL,NULL}; Node node7={7,NULL,NULL}; Node node8={8,NULL,NULL}; node0.lchild = &amp;node1; node0.rchild = &amp;node4; node1.lchild = &amp;node2; node1.rchild = &amp;node3; node4.lchild = &amp;node5; node4.rchild = &amp;node8; node5.lchild = &amp;node6; node5.rchild = &amp;node7; int n = 0; n = lca(&amp;node0,6,8); cout&lt;&lt;&quot;节点6和节点8的最近公共祖先是&quot;&lt;&lt;n&lt;&lt;endl; n = lca(&amp;node0,2,5); cout&lt;&lt;&quot;节点2和节点5的最近公共祖先是&quot;&lt;&lt;n&lt;&lt;endl; n = lca(&amp;node0,4,7); cout&lt;&lt;&quot;节点4和节点7的最近公共祖先是&quot;&lt;&lt;n&lt;&lt;endl; } 输出 中序遍历: 2 1 3 0 6 5 7 4 8 节点对应层级: 2 1 2 0 3 2 3 1 2 节点6和节点8的最近公共祖先是4 中序遍历: 2 1 3 0 6 5 7 4 8 节点对应层级: 2 1 2 0 3 2 3 1 2 节点2和节点5的最近公共祖先是0 中序遍历: 2 1 3 0 6 5 7 4 8 节点对应层级: 2 1 2 0 3 2 3 1 2 节点4和节点7的最近公共祖先是4 PS 2012年写的练习小程序 ...

February 11, 2018 · 2 min

在文件内建立索引(分析IPIP的*.dat文件)

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 引子 一直比较好奇如何在文件中建立索引 一个机缘巧合我们公司在使用IPIP的IP数据库, 这个公司对外提供的离线文件 为*.dat, IP的查询实际就是对这个离线文件的检索过程,我觉得这个文件的结构,很能够说明我的主题,如何在文件中建立索引 文件结构 首先IP库存在目的,是为了能够查询某个IP对应的以下信息 { &quot;country&quot;: &quot;中国&quot;, &quot;region&quot;: &quot;北京&quot;, &quot;city&quot;: &quot;北京&quot;, &quot;isp&quot;: &quot;阿里云/电信/联通/移动/铁通/教育网&quot; } IP是以段来管理的,而不是单个IP 字段 类型 说明 备注 segment_start int IP段起始 由于ip是递增的,因此这个字段实际并不存在 segment_end int IP段结束 对于IPv4是4个字节 country string 国家 region string 省份 city string 城市 isp string 运营商 IP段的数量有限, 不超过2w 为了方便大家更好的理解文件结构,我画了一张图 说明 第1级索引直接使用IP的第1字节,因此最多256个,每个占4个字节 第2级索引,每个8个字节,其中前4个字节为segment_end,后4个字节中,前3个字节是是记录在文件中的偏移,最后一个字节,为记录的长度 检索 检索的过程,程序将整个文件加载并驻留到内存中,然后在内存中进行相应的操作 检索时间分析 由于IP段的数量有限(不超过2w), 在第1级索引的查找次数是1 在第2级索引的是顺序遍历的 平均需要遍历的索引条目条数为 20000/256 ≈ 80 索引条目是固定长度(8bytes),且文件已经提前加载到了内存中,因此速度还是很快的。 ...

February 8, 2018 · 1 min

基于version的MySQL并发无锁策略

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 引子: 有这么一种场景,对于外部系统提交的任务,我们要把任务扫出来,推送到 消息队列中,然后消费者监听在消息队列上, 取到任务进行消费。要防止任务被重复消费,扫出的任务要修改对应数据库状态值。 问题 假定数据库表结构为 task 字段 类型 说明 备注 id int 主键 task_id int 任务ID status int 状态 0:等待中, 1:运行中, 2:成功, 3:失败 body string 任务body体 version string 为了区分写入成功的对象 我们知道把任务扫出来,至少需要执行3步操作 扫描出等待中的任务 select * from task where status = 0 limit 10; 2)将扫出的任务推送到消息队列中 3) 修改任务状态 假定扫描出的任务task_id 分别为为1、2、3 update task set status = 1 where task_id in (1,2,3) and status = 0; 显然这个过程不是原子的,如果同时有多个scanner进行操作,显然会任务可能被重复推入消息队列中 ...

January 29, 2018 · 2 min