channel的有趣用法

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 引子 萌叔在阅读tailsamplingprocessor源码时,发现channel的一种有趣的玩法,这里记录一下 FIFO 队列 tailsamplingprocessor中通过NumTraces 设置内存中最多保存的trace的数量, 超过这个阈值,就从按照先进先出的原则,删除最先进入队列的trace // NumTraces is the number of traces kept on memory. Typically most of the data // of a trace is released after a sampling decision is taken. NumTraces uint64 `mapstructure:"num_traces"` processor.go postDeletion := false currTime := time.Now() for !postDeletion { select { case tsp.deleteChan <- id: postDeletion = true default: traceKeyToDrop := <-tsp.deleteChan tsp.dropTrace(traceKeyToDrop, currTime) } } channel刚好满足这个特性, 如果tsp.deleteChan 没有满, 则往tsp.deleteChan写入一个traceID; 如果tsp.deleteChan已经装满,则从队列(tsp.deleteChan)中取出头部元素, 其实就是最先进入队列的traceID,将其对应的trace信息从内存中删除(tsp.dropTrace())。 ...

April 17, 2024 · 1 min

玩转NSQ(2)-消息流转

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1.引言 nsq的架构简单,代码清晰。对于自主造轮子,实现消息队列、消息推送系统、IM都是非常好的参考。 本文将以图表的形式来说明消息在nsdd中的流转。 2.消息流转 2.1 重要的数据结构 Topic/ Channel/ Client都是nsqd中的数据结构。 数据会从Topic复制到Channel1和Channel2 Client是 consumer在nsqd内部的表征 每个Client最多只能订阅一个Channel 它们内部都有一组queue memoryMsgChan chan *Message // 位于内存 backend BackendQueue //当memoryMsgChan写满,则默认写入磁盘 每个producer都有1个读协程,负责把producer发送的消息写入Topic 多个Client可以订阅同一个Channel。即一个Channel,多个消费者,谁抢到算谁的。 每一个Topic都对应1个协程Topic.messagePump负责从Topic复制数据到Channel NSQD.queueScanLoop会控制一组协程NSQD.queueScanWorker(动态大小的协程池) 从Channel(所有topic的Channels)中复制数据到Client中, 之所以是协程池,我觉得可能跟Channel中的消息有延迟发送,且有重入队列的有关操作 每一个Client对应着一个协程protocolV2.messagePump,负责通过TCP连接把数据发送给consumer 3. 其它有趣的小细节 比较有意思的是,nsq官方推荐,nsqd随着发布者一起部署。 发布者不必去发现其他的nsqd节点,他们总是可以向本地实例发布消息。 实际上解放了producer,而甩锅给了consumer,如果某个Topic 假定叫topic1。如果topic1位于多个nsqd,consumer需要通过nsqlookupd获知所有拥topic1的nsqd的地址,然后需要在多个nsqd上订阅topic1 这里的nsqlookupd相当于是注册中心。 如果某个nsqd宕机,由于nsqd没有副本,消息可能会丢失 打赏作者

October 9, 2019 · 1 min

CHANNEL在GOLANG中的有趣用法(2)-对象池

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 Golang常常被用在对性能有较高需求的场景。为了避免高频的创建对象和GC我们,需要使用对象池,可以使用它默认提供的sync.pool。 但是每次gc时,sync.pool中的cache的对象都会被释放,如果我们对性能的要求更高,且需求更加明确,我们可以使用自定义的对象池 实现 下面对象池的简易实现,大家可以参考 package main import ( "fmt" ) type Car struct { name string age int } type CarPool struct { Cached chan *Car Size int } func NewCarPool(size int) *CarPool { x := CarPool{} x.Cached = make(chan *Car, size) x.Size = size return &x } func (c *CarPool) Get() *Car { var res *Car select { case res = <-c.Cached: fmt.Println("---get--") default: fmt.Println("---create one--") res = &Car{} } return res } func (p *CarPool) Put(c *Car) { select { case p.Cached <- c: fmt.Println("---put--") default: c = nil fmt.Println("---destroy--") } } func main() { carPool := NewCarPool(3) for i := 0; i < 5; i++ { x := carPool.Get() carPool.Put(x) } } 参考资料: 1.广发证券Go在证券行情系统中的应用 ...

April 3, 2018 · 1 min

Channel在Golang中的有趣用法(1)-channel实现非阻塞队列

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 Channel是Golang中非常重要的数据结构, 默认它是阻塞的。那么如何实现一个非阻塞队列呢,你可以参考我的实现。 实现1 package main import ( "errors" "fmt" ) func push(q chan int, item int) error { select { case q <- item: return nil default: return errors.New("queue full") } } func get(q chan int) (int, error) { var item int select { case item = <-q: return item, nil default: return 0, errors.New("queue empty") } } func main() { q := make(chan int, 5) x := []int{1, 2, 3, 4, 5, 6} for _, value := range x { err := push(q, value) fmt.Printf("error:%v\n", err) } for _, value := range x { fmt.Println(value) v, err := get(q) fmt.Printf("v:%v, error:%v\n", v, err) } } 实现2 我们还可以把channel变成一个带超时的阻塞队列 ...

March 26, 2018 · 2 min