玩转NSQ(3)-漂亮的代码实现
版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc
1. 创建协程
创建协程示例
n.waitGroup.Wrap(func() {
// do something
})
util.WaitGroupWrapper
package util
import (
"sync"
)
type WaitGroupWrapper struct {
sync.WaitGroup
}
func (w *WaitGroupWrapper) Wrap(cb func()) {
w.Add(1)
go func() {
cb()
w.Done()
}()
}
2. 协程池
用于从Channel
扫描数据到Client
func (n *NSQD) queueScanLoop() {
...
for {
select {
case <-workTicker.C:
if len(channels) == 0 {
continue
}
case <-refreshTicker.C:
channels = n.channels()
// 协程池中的协程的数量是动态变化的
// 理想数量是与channel的数量保持一致
n.resizePool(len(channels), workCh, responseCh, closeCh)
continue
case <-n.exitChan:
goto exit
}
...
}
// resizePool adjusts the size of the pool of queueScanWorker goroutines
//
// 1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
//
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
idealPoolSize := int(float64(num) * 0.25)
if idealPoolSize < 1 {
idealPoolSize = 1
} else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
}
for {
if idealPoolSize == n.poolSize {
break
} else if idealPoolSize < n.poolSize {
// contract
closeCh <- 1
n.poolSize--
} else {
// expand
n.waitGroup.Wrap(func() {
n.queueScanWorker(workCh, responseCh, closeCh)
})
n.poolSize++
}
}
}
3. 通讯协议
nsq中数据被分为2类指令型数据,消息型数据, 2种数据类型,格式不相同
3.1 指令型数据
指令型数据包含各种各样的 CMD
SUB
订阅PUB
发布DPUB
延迟发布REQ
重入队列FIN
消息已经被消费
使用这种命令模式,使得生产者、消费者与nsqd的交互逻辑变得更加易于理解
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
if bytes.Equal(params[0], []byte("IDENTIFY")) {
return p.IDENTIFY(client, params)
}
err := enforceTLSPolicy(client, p, params[0])
if err != nil {
return nil, err
}
switch {
case bytes.Equal(params[0], []byte("FIN")):
return p.FIN(client, params)
case bytes.Equal(params[0], []byte("RDY")):
return p.RDY(client, params)
case bytes.Equal(params[0], []byte("REQ")):
return p.REQ(client, params)
case bytes.Equal(params[0], []byte("PUB")):
return p.PUB(client, params)
case bytes.Equal(params[0], []byte("MPUB")):
return p.MPUB(client, params)
case bytes.Equal(params[0], []byte("DPUB")):
return p.DPUB(client, params)
// ...
}
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}
3.2 消息型数据
消息推送时使用的结构
4. 消息的持久化逻辑
func (t *Topic) put(m *Message) error {
select {
case t.memoryMsgChan <- m: // 写入到channel中(定义:chan *Message)中
default: // 如果写满zou
b := bufferPoolGet()
err := writeMessageToBackend(b, m, t.backend)
bufferPoolPut(b)
t.ctx.nsqd.SetHealth(err)
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
return err
}
}
return nil
}
- 这里的backend只是一个接口,因此可以被容易的替换成其它实现,默认是写到磁盘中。
// BackendQueue represents the behavior for the secondary message
// storage system
type BackendQueue interface {
Put([]byte) error
ReadChan() chan []byte // this is expected to be an *unbuffered* channel
Close() error
Delete() error
Depth() int64
Empty() error
}
如果把mem_queue_size设为0,强制所有消息要落地到硬盘,那么nsqd的吞吐能力就会大大下降,笔者认为这不是nsq设计的初衷。对消息丢失敏感的应用,对nsq要慎用。
resizePool 那部分,有个 if idealPoolSize == n.poolSize 就 break,而 idealPoolSize 是 channel 总数*0.25,应该每次变化不大吧,而 else 里面有个 n.poolSize++,只怕很快就会 idealPoolSize == n.poolSize 了,然后就每次 break 不执行了?想不通想不通,为什么这样呢?
1) nsq的中创建Channel是非常轻量,所以Channel的数量可能达到成千上万,因此调整协程池中协程的数量是有意义的。
2) resizePool 是周期性执行的,注意下面的代码,每次调整都试图达到理想状态