Fork me on GitHub

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc

1. 创建协程

创建协程示例

n.waitGroup.Wrap(func() {
    // do something
})

util.WaitGroupWrapper

package util

import (
    "sync"
)

type WaitGroupWrapper struct {
    sync.WaitGroup
}

func (w *WaitGroupWrapper) Wrap(cb func()) {
    w.Add(1)
    go func() {
        cb()
        w.Done()
    }()
}

2. 协程池

用于从Channel扫描数据到Client

func (n *NSQD) queueScanLoop() {
...
        for {
        select {
        case <-workTicker.C:
            if len(channels) == 0 {
                continue
            }
        case <-refreshTicker.C:
            channels = n.channels() 
            // 协程池中的协程的数量是动态变化的
            // 理想数量是与channel的数量保持一致
            n.resizePool(len(channels), workCh, responseCh, closeCh)
            continue
        case <-n.exitChan:
            goto exit
        }
...
}
// resizePool adjusts the size of the pool of queueScanWorker goroutines
//
//  1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
//
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    idealPoolSize := int(float64(num) * 0.25)
    if idealPoolSize < 1 {
        idealPoolSize = 1
    } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
        idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
    }
    for {
        if idealPoolSize == n.poolSize {
            break
        } else if idealPoolSize < n.poolSize {
            // contract
            closeCh <- 1
            n.poolSize--
        } else {
            // expand
            n.waitGroup.Wrap(func() {
                n.queueScanWorker(workCh, responseCh, closeCh)
            })
            n.poolSize++
        }
    }
}

3. 通讯协议

nsq中数据被分为2类指令型数据,消息型数据, 2种数据类型,格式不相同

3.1 指令型数据

指令型数据包含各种各样的 CMD

  • SUB 订阅
  • PUB 发布
  • DPUB 延迟发布
  • REQ 重入队列
  • FIN 消息已经被消费
    使用这种命令模式,使得生产者、消费者与nsqd的交互逻辑变得更加易于理解
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
    if bytes.Equal(params[0], []byte("IDENTIFY")) {
        return p.IDENTIFY(client, params)
    }
    err := enforceTLSPolicy(client, p, params[0])
    if err != nil {
        return nil, err
    }
    switch {
    case bytes.Equal(params[0], []byte("FIN")):
        return p.FIN(client, params)
    case bytes.Equal(params[0], []byte("RDY")):
        return p.RDY(client, params)
    case bytes.Equal(params[0], []byte("REQ")):
        return p.REQ(client, params)
    case bytes.Equal(params[0], []byte("PUB")):
        return p.PUB(client, params)
    case bytes.Equal(params[0], []byte("MPUB")):
        return p.MPUB(client, params)
    case bytes.Equal(params[0], []byte("DPUB")):
        return p.DPUB(client, params)
    // ... 
    }
    return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}

3.2 消息型数据

消息推送时使用的结构

4. 消息的持久化逻辑

func (t *Topic) put(m *Message) error {
    select {
    case t.memoryMsgChan <- m:  // 写入到channel中(定义:chan *Message)中
    default:  // 如果写满zou
        b := bufferPoolGet()
        err := writeMessageToBackend(b, m, t.backend)
        bufferPoolPut(b)
        t.ctx.nsqd.SetHealth(err)
        if err != nil {
            t.ctx.nsqd.logf(LOG_ERROR,
                "TOPIC(%s) ERROR: failed to write message to backend - %s",
                t.name, err)
            return err
        }
    }
    return nil
}
  • 这里的backend只是一个接口,因此可以被容易的替换成其它实现,默认是写到磁盘中。
// BackendQueue represents the behavior for the secondary message
// storage system
type BackendQueue interface {
    Put([]byte) error
    ReadChan() chan []byte // this is expected to be an *unbuffered* channel
    Close() error
    Delete() error
    Depth() int64
    Empty() error
}

如果把mem_queue_size设为0,强制所有消息要落地到硬盘,那么nsqd的吞吐能力就会大大下降,笔者认为这不是nsq设计的初衷。对消息丢失敏感的应用,对nsq要慎用。


请我喝瓶饮料

微信支付码

2 对 “玩转NSQ(3)-漂亮的代码实现”的想法;

  1. resizePool 那部分,有个 if idealPoolSize == n.poolSize 就 break,而 idealPoolSize 是 channel 总数*0.25,应该每次变化不大吧,而 else 里面有个 n.poolSize++,只怕很快就会 idealPoolSize == n.poolSize 了,然后就每次 break 不执行了?想不通想不通,为什么这样呢?

    1. 1) nsq的中创建Channel是非常轻量,所以Channel的数量可能达到成千上万,因此调整协程池中协程的数量是有意义的。
      2) resizePool 是周期性执行的,注意下面的代码,每次调整都试图达到理想状态

      func (n *NSQD) queueScanLoop() {
      ...
              for {
              select {
              case <-workTicker.C:
                  if len(channels) == 0 {
                      continue
                  }
              case <-refreshTicker.C:  // 注意看这里
                  channels = n.channels() 
                  // 协程池中的协程的数量是动态变化的
                  // 理想数量是与channel的数量保持一致
                  n.resizePool(len(channels), workCh, responseCh, closeCh)
                  continue
              case <-n.exitChan:
                  goto exit
              }
      ...
      }
      

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据