玩转NSQ(3)-漂亮的代码实现

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1. 创建协程 创建协程示例 n.waitGroup.Wrap(func() { // do something }) util.WaitGroupWrapper package util import ( "sync" ) type WaitGroupWrapper struct { sync.WaitGroup } func (w *WaitGroupWrapper) Wrap(cb func()) { w.Add(1) go func() { cb() w.Done() }() } 2. 协程池 用于从Channel扫描数据到Client func (n *NSQD) queueScanLoop() { ... for { select { case <-workTicker.C: if len(channels) == 0 { continue } case <-refreshTicker.C: channels = n.channels() // 协程池中的协程的数量是动态变化的 // 理想数量是与channel的数量保持一致 n.resizePool(len(channels), workCh, responseCh, closeCh) continue case <-n.exitChan: goto exit } ... } // resizePool adjusts the size of the pool of queueScanWorker goroutines // // 1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax) // func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) { idealPoolSize := int(float64(num) * 0.25) if idealPoolSize < 1 { idealPoolSize = 1 } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax { idealPoolSize = n.getOpts().QueueScanWorkerPoolMax } for { if idealPoolSize == n.poolSize { break } else if idealPoolSize < n.poolSize { // contract closeCh <- 1 n.poolSize-- } else { // expand n.waitGroup.Wrap(func() { n.queueScanWorker(workCh, responseCh, closeCh) }) n.poolSize++ } } } 3. 通讯协议 nsq中数据被分为2类指令型数据,消息型数据, 2种数据类型,格式不相同 ...

October 9, 2019 · 2 min

玩转NSQ(2)-消息流转

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1.引言 nsq的架构简单,代码清晰。对于自主造轮子,实现消息队列、消息推送系统、IM都是非常好的参考。 本文将以图表的形式来说明消息在nsdd中的流转。 2.消息流转 2.1 重要的数据结构 Topic/ Channel/ Client都是nsqd中的数据结构。 数据会从Topic复制到Channel1和Channel2 Client是 consumer在nsqd内部的表征 每个Client最多只能订阅一个Channel 它们内部都有一组queue memoryMsgChan chan *Message // 位于内存 backend BackendQueue //当memoryMsgChan写满,则默认写入磁盘 每个producer都有1个读协程,负责把producer发送的消息写入Topic 多个Client可以订阅同一个Channel。即一个Channel,多个消费者,谁抢到算谁的。 每一个Topic都对应1个协程Topic.messagePump负责从Topic复制数据到Channel NSQD.queueScanLoop会控制一组协程NSQD.queueScanWorker(动态大小的协程池) 从Channel(所有topic的Channels)中复制数据到Client中, 之所以是协程池,我觉得可能跟Channel中的消息有延迟发送,且有重入队列的有关操作 每一个Client对应着一个协程protocolV2.messagePump,负责通过TCP连接把数据发送给consumer 3. 其它有趣的小细节 比较有意思的是,nsq官方推荐,nsqd随着发布者一起部署。 发布者不必去发现其他的nsqd节点,他们总是可以向本地实例发布消息。 实际上解放了producer,而甩锅给了consumer,如果某个Topic 假定叫topic1。如果topic1位于多个nsqd,consumer需要通过nsqlookupd获知所有拥topic1的nsqd的地址,然后需要在多个nsqd上订阅topic1 这里的nsqlookupd相当于是注册中心。 如果某个nsqd宕机,由于nsqd没有副本,消息可能会丢失 打赏作者

October 9, 2019 · 1 min