版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | http://vearne.cc

1. 创建协程

创建协程示例

n.waitGroup.Wrap(func() {
    // do something
})

util.WaitGroupWrapper

package util

import (
	"sync"
)

type WaitGroupWrapper struct {
	sync.WaitGroup
}

func (w *WaitGroupWrapper) Wrap(cb func()) {
	w.Add(1)
	go func() {
		cb()
		w.Done()
	}()
}

2. 协程池

用于从Channel扫描数据到Client

func (n *NSQD) queueScanLoop() {
...
    	for {
		select {
		case <-workTicker.C:
			if len(channels) == 0 {
				continue
			}
		case <-refreshTicker.C:
			channels = n.channels() 
            // 协程池中的协程的数量是动态变化的
            // 理想数量是与channel的数量保持一致
			n.resizePool(len(channels), workCh, responseCh, closeCh)
			continue
		case <-n.exitChan:
			goto exit
		}
...
}
// resizePool adjusts the size of the pool of queueScanWorker goroutines
//
// 	1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
//
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	idealPoolSize := int(float64(num) * 0.25)
	if idealPoolSize < 1 {
		idealPoolSize = 1
	} else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
		idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
	}
	for {
		if idealPoolSize == n.poolSize {
			break
		} else if idealPoolSize < n.poolSize {
			// contract
			closeCh <- 1
			n.poolSize--
		} else {
			// expand
			n.waitGroup.Wrap(func() {
				n.queueScanWorker(workCh, responseCh, closeCh)
			})
			n.poolSize++
		}
	}
}

3. 通讯协议

nsq中数据被分为2类指令型数据,消息型数据, 2种数据类型,格式不相同

3.1 指令型数据

指令型数据包含各种各样的 CMD

  • SUB 订阅
  • PUB 发布
  • DPUB 延迟发布
  • REQ 重入队列
  • FIN 消息已经被消费 使用这种命令模式,使得生产者、消费者与nsqd的交互逻辑变得更加易于理解
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
	if bytes.Equal(params[0], []byte("IDENTIFY")) {
		return p.IDENTIFY(client, params)
	}
	err := enforceTLSPolicy(client, p, params[0])
	if err != nil {
		return nil, err
	}
	switch {
	case bytes.Equal(params[0], []byte("FIN")):
		return p.FIN(client, params)
	case bytes.Equal(params[0], []byte("RDY")):
		return p.RDY(client, params)
	case bytes.Equal(params[0], []byte("REQ")):
		return p.REQ(client, params)
	case bytes.Equal(params[0], []byte("PUB")):
		return p.PUB(client, params)
	case bytes.Equal(params[0], []byte("MPUB")):
		return p.MPUB(client, params)
	case bytes.Equal(params[0], []byte("DPUB")):
		return p.DPUB(client, params)
    // ... 
	}
	return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}

3.2 消息型数据

消息推送时使用的结构

4. 消息的持久化逻辑

func (t *Topic) put(m *Message) error {
	select {
	case t.memoryMsgChan <- m:  // 写入到channel中(定义:chan *Message)中
	default:  // 如果写满zou
		b := bufferPoolGet()
		err := writeMessageToBackend(b, m, t.backend)
		bufferPoolPut(b)
		t.ctx.nsqd.SetHealth(err)
		if err != nil {
			t.ctx.nsqd.logf(LOG_ERROR,
				"TOPIC(%s) ERROR: failed to write message to backend - %s",
				t.name, err)
			return err
		}
	}
	return nil
}
  • 这里的backend只是一个接口,因此可以被容易的替换成其它实现,默认是写到磁盘中。
// BackendQueue represents the behavior for the secondary message
// storage system
type BackendQueue interface {
	Put([]byte) error
	ReadChan() chan []byte // this is expected to be an *unbuffered* channel
	Close() error
	Delete() error
	Depth() int64
	Empty() error
}

如果把mem_queue_size设为0,强制所有消息要落地到硬盘,那么nsqd的吞吐能力就会大大下降,笔者认为这不是nsq设计的初衷。对消息丢失敏感的应用,对nsq要慎用。


请我喝瓶饮料

微信支付码