玩转NSQ(3)-漂亮的代码实现

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1. 创建协程 创建协程示例 n.waitGroup.Wrap(func() { // do something }) util.WaitGroupWrapper package util import ( "sync" ) type WaitGroupWrapper struct { sync.WaitGroup } func (w *WaitGroupWrapper) Wrap(cb func()) { w.Add(1) go func() { cb() w.Done() }() } 2. 协程池 用于从Channel扫描数据到Client func (n *NSQD) queueScanLoop() { ... for { select { case <-workTicker.C: if len(channels) == 0 { continue } case <-refreshTicker.C: channels = n.channels() // 协程池中的协程的数量是动态变化的 // 理想数量是与channel的数量保持一致 n.resizePool(len(channels), workCh, responseCh, closeCh) continue case <-n.exitChan: goto exit } ... } // resizePool adjusts the size of the pool of queueScanWorker goroutines // // 1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax) // func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) { idealPoolSize := int(float64(num) * 0.25) if idealPoolSize < 1 { idealPoolSize = 1 } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax { idealPoolSize = n.getOpts().QueueScanWorkerPoolMax } for { if idealPoolSize == n.poolSize { break } else if idealPoolSize < n.poolSize { // contract closeCh <- 1 n.poolSize-- } else { // expand n.waitGroup.Wrap(func() { n.queueScanWorker(workCh, responseCh, closeCh) }) n.poolSize++ } } } 3. 通讯协议 nsq中数据被分为2类指令型数据,消息型数据, 2种数据类型,格式不相同 ...

October 9, 2019 · 2 min