redigo提示connection pool exhausted

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1. 引言 线上的某个服务(Golang开发)使用Redis作为消息队列,使用的redis库是garyburd/redigo, 这两天出现如下错误 connection pool exhausted 2. 产生原因 阅读源码pool.go 阅读get()即可 type Pool struct { // Dial()方法返回一个连接,从在需要创建连接到的时候调用 Dial func() (Conn, error) // TestOnBorrow()方法是一个可选项,该方法用来诊断一个连接的健康状态 TestOnBorrow func(c Conn, t time.Time) error // 最大空闲连接数 MaxIdle int // 一个pool所能分配的最大的连接数目 // 当设置成0的时候,该pool连接数没有限制 MaxActive int // 空闲连接超时时间,超过超时时间的空闲连接会被关闭。 // 如果设置成0,空闲连接将不会被关闭 // 应该设置一个比redis服务端超时时间更短的时间 IdleTimeout time.Duration // 如果Wait被设置成true,则Get()方法将会阻塞 Wait bool ... ... } 以上异常的原因是这样的,在garyburd/redigo中,得到连接的步骤如下 尝试从空闲列表中,获得一个可用连接;如果成功直接返回,失败则尝试步骤2 如果当前的Active 连接数 < MaxActive,则尝试创建一个新连接;如果成功直接返回,失败则尝试步骤3 判断Wait为true则等待,否则报出异常 // ErrPoolExhausted is returned from a pool connection method (Do, Send, // Receive, Flush, Err) when the maximum number of database connections in the // pool has been reached. var ErrPoolExhausted = errors.New(&quot;redigo: connection pool exhausted&quot;) 3. 解决方法 设置MaxActive,设MaxActive=0(表示无限大)或者足够大。 设置Wait=true,当程序执行get(),无法获得可用连接时,将会暂时阻塞。 4. 完整的初始化连接池的代码 func NewRedisPool(redisConf context.RedisConf) *redis.Pool { address := fmt.Sprintf(&quot;%v:%v&quot;, redisConf.Host, redisConf.Port) dbOption := redis.DialDatabase(redisConf.Db) pwOption := redis.DialPassword(redisConf.Password) // **重要** 设置读写超时 readTimeout := redis.DialReadTimeout(time.Second * time.Duration(redisConf.ConTimeout)) writeTimeout := redis.DialWriteTimeout(time.Second * time.Duration(redisConf.ConTimeout)) conTimeout := redis.DialConnectTimeout(time.Second * time.Duration(redisConf.ConTimeout)) redisPool := &amp;redis.Pool{ // 从配置文件获取maxidle以及maxactive,取不到则用后面的默认值 MaxIdle: redisConf.MaxIdleConn, MaxActive: redisConf.MaxActiveConn, // **重要** 如果空闲列表中没有可用的连接 // 且当前Active连接数 &lt; MaxActive // 则等待 Wait: true, IdleTimeout: time.Duration(redisConf.IdleTimeout) * time.Second, Dial: func() (redis.Conn, error) { c, err := redis.Dial(&quot;tcp&quot;, address, dbOption, pwOption, readTimeout, writeTimeout, conTimeout) if err != nil { return nil, err } return c, nil }, } return redisPool }

June 4, 2018 · 1 min

推荐点golang入门资料

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 一个大学同学在微信上发消息问我,有没有Golang入门的资料 1. 书籍 Golang入门书的话,对于有一定开发经验的我觉得 beego的作者的谢孟军 电子书地址 书名:《Go Web 编程》 推荐理由: golang入门书籍。作者是beego的作者的谢孟军,他的golang入门介绍很实用,并且谈了不少web开发中的技术要点 2. 开源项目 使用Golang很大程度就是为了高性能,所以必须要对性能这块格外关注 这里推荐几个开源项目 2.1 fasthttp valyala/fasthttp 高性能web框架,了解web框架的不二出路,号称相比官方库有10倍性能提升 关键词: 协程池 2.2 nsq nsqio/nsq 分布式的消息分发平台 多种worker内存中围绕着channel来工作 注意:这基本是大量Golang服务的常态,所以这个项目要特别关注 关键词: channel 2.3 open-falcon open-falcon 小米开源的监控系统 open-falcon里面保罗万象, 像一个小型的生态系统 open-falcon中多个系统的交互,大量使用RPC,另外open-falcon中各个组件的启停管理方式值得借鉴。 关键词: rpc heartbeat 3. 视频资源 Gopher大会视频 请我喝瓶饮料

May 4, 2018 · 1 min

Golang中赋值会导致结构体复制

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 Golang很多语法特征都和C++非常相似, stuct对象的赋值操作,会导致struct被copy(浅拷贝) test_copy.go package main import &quot;fmt&quot; type Car struct{ Name string Age int XList []int } func main(){ a := Car{Name:&quot;buick&quot;, Age: 10, XList:make([]int, 10)} b := a b.Age = 9 fmt.Printf(&quot;address a:%p\n&quot;, &amp;a) fmt.Printf(&quot;address b:%p\n&quot;, &amp;b) fmt.Println(&quot;----------------------&quot;) fmt.Printf(&quot;Car a:%v\n&quot;, a) fmt.Printf(&quot;Car a:%v\n&quot;, len(a.XList)) fmt.Printf(&quot;Car b:%v\n&quot;, b) fmt.Printf(&quot;Car b:%v\n&quot;, len(b.XList)) } 输出 address a:0xc420068180 address b:0xc4200681b0 ---------------------- Car a:{buick 10 [0 0 0 0 0 0 0 0 0 0]} Car a:10 Car b:{buick 9 [0 0 0 0 0 0 0 0 0 0]} Car b:10 可以看出a和b已经是完全不同的对象,对b的修改不会影响a ...

April 28, 2018 · 1 min

imroc/req 连接池使用须知

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1. 前言 imroc/req 作为目前最好用的request库深受Gopher的喜爱,但是它的连接池,在使用时仍然有些要注意的事项。 2. 简述 imroc/req内部仍然使用的是标准库net/http来管理连接 所以我们首先要理解net/http 是如何管理连接池的 transport.go type Transport struct { idleMu sync.Mutex wantIdle bool // user has requested to close all idle conns idleConn map[connectMethodKey][]*persistConn // most recently used at end idleConnCh map[connectMethodKey]chan *persistConn // MaxIdleConns controls the maximum number of idle (keep-alive) // connections across all hosts. Zero means no limit. MaxIdleConns int // MaxIdleConnsPerHost, if non-zero, controls the maximum idle // (keep-alive) connections to keep per-host. If zero, // DefaultMaxIdleConnsPerHost is used. MaxIdleConnsPerHost int 这里的 connectMethodKey可以用来标识一个目标 显然它是proxy,scheme, addr 构成的三元组 MaxIdleConnsPerHost限制的是相同connectMethodKey的空闲连接数量 DefaultMaxIdleConnsPerHost的默认值是2,这对一个大并发的场景是完全不够用的。 ...

April 20, 2018 · 3 min

CHANNEL在GOLANG中的有趣用法(2)-对象池

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 Golang常常被用在对性能有较高需求的场景。为了避免高频的创建对象和GC我们,需要使用对象池,可以使用它默认提供的sync.pool。 但是每次gc时,sync.pool中的cache的对象都会被释放,如果我们对性能的要求更高,且需求更加明确,我们可以使用自定义的对象池 实现 下面对象池的简易实现,大家可以参考 package main import ( &quot;fmt&quot; ) type Car struct { name string age int } type CarPool struct { Cached chan *Car Size int } func NewCarPool(size int) *CarPool { x := CarPool{} x.Cached = make(chan *Car, size) x.Size = size return &amp;x } func (c *CarPool) Get() *Car { var res *Car select { case res = &lt;-c.Cached: fmt.Println(&quot;---get--&quot;) default: fmt.Println(&quot;---create one--&quot;) res = &amp;Car{} } return res } func (p *CarPool) Put(c *Car) { select { case p.Cached &lt;- c: fmt.Println(&quot;---put--&quot;) default: c = nil fmt.Println(&quot;---destroy--&quot;) } } func main() { carPool := NewCarPool(3) for i := 0; i &lt; 5; i++ { x := carPool.Get() carPool.Put(x) } } 参考资料: 1.广发证券Go在证券行情系统中的应用 ...

April 3, 2018 · 1 min

Channel在Golang中的有趣用法(1)-channel实现非阻塞队列

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 Channel是Golang中非常重要的数据结构, 默认它是阻塞的。那么如何实现一个非阻塞队列呢,你可以参考我的实现。 实现1 package main import ( &quot;errors&quot; &quot;fmt&quot; ) func push(q chan int, item int) error { select { case q &lt;- item: return nil default: return errors.New(&quot;queue full&quot;) } } func get(q chan int) (int, error) { var item int select { case item = &lt;-q: return item, nil default: return 0, errors.New(&quot;queue empty&quot;) } } func main() { q := make(chan int, 5) x := []int{1, 2, 3, 4, 5, 6} for _, value := range x { err := push(q, value) fmt.Printf(&quot;error:%v\n&quot;, err) } for _, value := range x { fmt.Println(value) v, err := get(q) fmt.Printf(&quot;v:%v, error:%v\n&quot;, v, err) } } 实现2 我们还可以把channel变成一个带超时的阻塞队列 ...

March 26, 2018 · 2 min

golang基于观察者模式管理多种worker的启停

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 我们的工作中有一个服务,服务中有多中worker,它们的角色各不相同 workerA 从从上游系统接收任务(HTTP 请求),并将任务写入数据库 workerB 把没有处理的任务扫描出来放入消息队列 workerC 从消息队列中读取任务进行处理,处理完成把处理结果回写到数据库 在服务的入口,每种worker有不同的数量,且都需要优雅的启动停止。有没有好的编程方式?经过反复思考,我基于观察者模式给出了范例供大家探讨 要点 1. worker接口 每个worker都需要启动和停止,因此worker可以抽象的理解为必须实现Worker接口 type Worker interface{ Start() Stop() } 2. worker的优雅启动和退出 每1个worker都是1个协程,在所有worker退出之前,服务不能退出,要做到这一点只能使用"sync.WaitGroup" 由于worker(协程)较多,我的想法是把所有的worker统一存在一起,由一个类统一管理 type WorkerManager struct { sync.WaitGroup // 保存所有worker WorkerSlice []Worker } WorkerManager作为观察目标,Worker作为观察者,当观察目标状态发生变化,所有的观察者都会得到通知。 - 当WorkerManager状态变化-start时,调用Worker的start方法,启动Worker - 当WorkerManager状态变化-stop时,调用Worker的stop方法, 停止Worker func NewWorkerManager() *WorkerManager { workerManager := WorkerManager{} workerManager.WorkerSlice = make([]Worker, 0, 10) return &amp;workerManager } func (wm *WorkerManager) AddWorker(w Worker) { wm.WorkerSlice = append(wm.WorkerSlice, w) } func (wm *WorkerManager) Start() { wm.Add(len(wm.WorkerSlice)) for _, worker := range wm.WorkerSlice { go func(w Worker) { defer func() { err := recover() // 注意需要recover if err != nil { fmt.Printf(&quot;WorkerManager error, error:%v, stack:%v\n&quot;, err, string(Stack())) } }() w.Start() }(worker) } } func (wm *WorkerManager) Stop() { for _, worker := range wm.WorkerSlice { go func(w Worker) { defer func() { err := recover() if err != nil { fmt.Printf(&quot;WorkerManager error, error:%v, stack:%v\n&quot;, err, string(Stack())) } }() w.Stop() wm.Done() }(worker) } } 3. 完整代码示例 github 地址: vearne/worker_manager ...

March 7, 2018 · 2 min

用golang实现的定时器

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因 我们有一个业务场景(高并发),收到的每个任务都必须在很短的时间内完成,然后回报给调用方,因此每个任务都需要设定定时器。显然golang 默认的timer是不能支持这种场景的,因此我在delayqueue的基础上自己实现了一个库gtimer GitHub地址 timer 用golang实现的定时器,基于delayqueue 实现 实现受到了Java DelayQueue.java的启发 源码地址 DelayQueue.java 依赖的几个结构依次为为 timer -> delayqueue -> priorityqueue -> heap 由于golang的Condition不支持wait一段时间,所以使用golang原生的Timer来替代了Condition在delayqueue中的作用 Installation Install: go get -u github.com/vearne/gtimer Import: import &quot;github.com/vearne/gtimer&quot; Quick Start package main import ( &quot;fmt&quot; log &quot;github.com/sirupsen/logrus&quot; &quot;github.com/vearne/gtimer&quot; &quot;math/rand&quot; &quot;strconv&quot; &quot;sync/atomic&quot; &quot;time&quot; ) const ( PRODUCER_COUNT = 10 CONSUMER_COUNT = 10 TARGET_COUNT = 1000000 ) var ops int64 = 0 func main() { st := gtimer.NewSuperTimer(CONSUMER_COUNT) t1 := time.Now() for i := 0; i &lt; PRODUCER_COUNT; i++ { go push(st, &quot;worker&quot;+strconv.Itoa(i)) } time.Sleep(100 * time.Millisecond) for { v := atomic.LoadInt64(&amp;ops) if v &gt;= TARGET_COUNT { st.Stop() break } else { time.Sleep(100 * time.Millisecond) } } t2 := time.Now() log.Infof(&quot;cost:%v\n&quot;, t2.Sub(t1)) } func DefaultAction(t time.Time, value string) { // fmt.Printf(&quot;trigger_time:%v, value:%v\n&quot;, t, value) atomic.AddInt64(&amp;ops, 1) } func push(timer *gtimer.SuperTimer, name string) { r := rand.New(rand.NewSource(time.Now().UnixNano())) for i := 0; i &lt; 1000000; i++ { now := time.Now() t := now.Add(time.Millisecond * time.Duration(r.Int63n(300))) value := fmt.Sprintf(&quot;%v:value:%v&quot;, name, strconv.Itoa(i)) // create a delayed task item := gtimer.NewDelayedItemFunc(t, value, DefaultAction) timer.Add(item) } } use NewDelayedItemFunc, we can create a task ...

February 13, 2018 · 2 min

golang fasthttp优雅退出

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 引言 使用Golang进行开发服务,我们会特别关心性能,fasthttp是标准库net/http性能的10倍。所以我们的web服务大量使用它。不过这个库并没有提供优雅退出方案, 查看了很多资料,我找到一个解决方案。 1.原理 对于web服务而言,优雅退出其实分为2步 1)关闭监听的端口,不再接受新的请求 2)对于正在处理的请求等待它们处理完成,如果全部处理完成,或者默认的超时时间已达,则退出 2. 实现 要达到这个目标需要实现 type Listener interface 下面展示一部分核心代码 grace_listener.go type GracefulListener struct { // inner listener ln net.Listener // maximum wait time for graceful shutdown maxWaitTime time.Duration // this channel is closed during graceful shutdown on zero open connections. done chan struct{} // the number of open connections connsCount uint64 // becomes non-zero when graceful shutdown starts shutdown uint64 } // Close closes the inner listener and waits until all the pending open connections // are closed before returning. func (ln *GracefulListener) Close() error { // 不再接受新的请求 err := ln.ln.Close() if err != nil { return nil } // 等待已经接到的请求处理完成 return ln.waitForZeroConns() } func (ln *GracefulListener) waitForZeroConns() error { atomic.AddUint64(&amp;ln.shutdown, 1) fmt.Println(&quot;waitForZeroConns&quot;, atomic.LoadUint64(&amp;ln.connsCount)) if atomic.LoadUint64(&amp;ln.connsCount) == 0 { close(ln.done) return nil } select { case &lt;-ln.done: return nil case &lt;-time.After(ln.maxWaitTime): return fmt.Errorf(&quot;cannot complete graceful shutdown in %s&quot;, ln.maxWaitTime) } return nil } server.go 初始化&启动web服务 ...

January 18, 2018 · 1 min

golang批量Ping的库

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因:公司有个项目需要批量检查多个IP的网络质量,其中一个指标是丢包率,当然用ping是最合适的方法,但是使用ping命令性能太差,Golang又没有批量ping的包 开发: 目前github只有sparrc写的ping库,不过他的库不支持批量探测多个IP https://github.com/sparrc/go-ping 并且sparrc/go-ping在使用时,如果对多个IP同时探测会出现串包的问题,请谨慎使用。 再他的启发下,我做了扩展,支持批量请求 安装 go get github.com/vearne/go-ping 使用 ipSlice := []string{} ipSlice = append(ipSlice, &quot;123.126.157.222&quot;) ipSlice = append(ipSlice, &quot;wwww.baidu.com&quot;) ipSlice = append(ipSlice, &quot;github.com&quot;) // 对每个地址,共发出4个探测包,每隔1秒发一个, 总超时6秒 bp, err := ping.NewBatchPinger(ipSlice, 4, time.Second*1, time.Second*6) if err != nil { fmt.Println(err) } // 收到ICMP answer时的回调函数 bp.OnRecv = func(pkt *icmp.Echo) { // fmt.Printf(&quot;recv icmp_id=%d, icmp_seq=%d\n&quot;, pkt.ID, pkt.Seq) } // 所有Ping结束时的回调函数 bp.OnFinish = func(stSlice []*ping.Statistics) { for _, st := range stSlice{ fmt.Printf(&quot;\n--- %s ping statistics ---\n&quot;, st.Addr) fmt.Printf(&quot;%d packets transmitted, %d packets received, %v%% packet loss\n&quot;, st.PacketsSent, st.PacketsRecv, st.PacketLoss) fmt.Printf(&quot;round-trip min/avg/max/stddev = %v/%v/%v/%v\n&quot;, st.MinRtt, st.AvgRtt, st.MaxRtt, st.StdDevRtt) } } bp.Run() 完整的例子 https://github.com/vearne/go-ping/blob/master/cmd/ping/ping2.go ...

January 1, 2018 · 1 min