OpenTelemetry-通过消息队列传播Trace信息

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 建议阅读,萌叔之前的文章 OpenTelemetry原理及实战 OpenTelemetry个性化采样-根据特定Header决定是否采样 1. 前言 一些复杂的业务场景,会出现任务被从接口提交,并在后台进行异步执行的情况。 这种情况下可能需要引入消息队列,涉及TraceContext通过消息进行跨节点(服务)的传播。 参考: W3C TraceContext 规范 这篇文章萌叔以RocketMQ来举例,看trace信息如何传递。 2. Golang中的实现 手动埋点追踪RocketMQ。 由于目前Golang的RocketMQ客户端缺乏像Java那样的自动埋点支持, 我们需要在消息发送和消费处理的关键位置手动创建Span。 2.1 消息发送方(Producer) import ( "context" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) func sendMessageWithTracing(ctx context.Context, producer rocketmq.Producer, topic, body string) error { tracer := otel.Tracer("rocketmq-producer") // 创建一个新的Span,代表本次发送操作。这里的ctx可能是来自HTTP请求或上游服务的上下文。 _, span := tracer.Start(ctx, "rocketmq.send", trace.WithAttributes( attribute.String("messaging.system", "rocketmq"), // 语义约定:消息系统 attribute.String("messaging.destination", topic), // 语义约定:主题 attribute.String("messaging.rocketmq.message.tags", "your_tag"), // RocketMQ特定属性 // 可以根据需要添加更多属性,如keys )) defer span.End() // 确保Span最终会结束 // 准备消息 msg := primitive.NewMessage(topic, []byte(body)) // 关键步骤:将追踪上下文注入到消息属性中 carrier := NewMessageCarrier(msg) // 你需要实现一个Carrier,见下文说明 otel.GetTextMapPropagator().Inject(span.SpanContext(), carrier) // 发送消息 _, err := producer.SendSync(msg) if err != nil { // 记录错误 span.RecordError(err) span.SetStatus(codes.Error, "SendSync failed") return err } span.SetStatus(codes.Ok, "Success") return nil } 关于Carrier:你需要实现propagation.TextMapCarrier接口,以便将追踪信息(TraceID, SpanID等) 以键值对的形式存入RocketMQ消息的用户属性中。一个简单的实现示例如下: ...

December 25, 2025 · 2 min

RocketMQ架构设计中的”暴力美学”(2)-故障处理

本文基于rocketmq-all-4.8.0 1.引言 人们在潜意识里,总会觉得复杂且精巧的东西是好东西。但是这个复杂这个词在软件架构设计中,却不一定是好事情。 因为过于精巧和复杂的系统往往意味着系统更难以维护,出现问题后,故障更难排查。 萌叔在阅读和分享RocketMQ的过程中, 发现它有很多设计非常的简单粗暴,堪称”暴力美学”的典范, 同时又给人眼前一亮的感觉(还能这么玩)。 2.故障处理 在介绍故障处理机制时,我们假定一个场景 2.1 架构 假定broker有2组 group1 Master1和Slave1 group2 Master2和Slave2 2.2 故障发生 故障发生前,萌叔创建了1个topic,指定分片数为4 mqadmin updateTopic -n <namesrvAddr> -c <clusterName> -t myTopic -q 4 这里需要强调一下,这里指定的4个分片,并不是全局4个分片,而是每个broker有4个分片,情况如下图。 此时 Topic路由信息形如: { "OrderTopicConf": "", "queueDatas": [{ "brokerName": "group1", "readQueueNums": 4, "writeQueueNums": 4, "perm": 6, // 可读可写 "topicSynFlag": 0 }, { "brokerName": "group2", "readQueueNums": 4, "writeQueueNums": 4, "perm": 6, // 可读可写 "topicSynFlag": 0 }], "brokerDatas": [{ "cluster": "Default_Cluster", "brokerName": "group1", "brokerAddrs": { "0": "192.168.12.123:10911", // master "1": "192.168.12.127:10911" // slave } }, { "cluster": "Default_Cluster", "brokerName": "group2", "brokerAddrs": { "0": "192.168.12.220:10911", "1": "192.168.12.12:10911" } }] } 生产者消息路由策略:Round Robin 消费者消息路由策略:AllocateByAveragely ...

August 7, 2024 · 2 min

rocketmq-client-go 消费MessageQueue消息失败

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 引言 前几天突然收到线上rocketmq报警,rocketmq队列中的消息有堆积。 打开rocketmq的管理界面,发现broker-4:MessageQueue[4] 上没有消费者,所以消息一直在堆积。重启client,问题仍然无法解决。 查看client的日志,萌叔发现有如下可疑的日志 time="2023-07-01T23:03:28+08:00" level=error msg="fetch offset of mq from broker error" MessageQueue="MessageQueue [topic=account-to-redis, brokerName=broker-4, queueId=3]" consumerGroup=CG-account underlayError="broker response code: 22, remarks: Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first" “fetch offset of mq from broker error” 这应该就是导致broker-4:MessageQueue[4]一直无法被消费的原因 2. 解决问题 通过网上搜索,萌叔发现了参考资料1.rocketmq-client-go注册消费者组的问题 根据这份资料,此问题是rocketmq-client-go的一个bug。并已经在最新版本中被修复。 offset_store.go func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *primitive.MessageQueue) (int64, error) { ... cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil) res, err := r.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second) if err != nil { return -1, err } // 增加判断逻辑 if res.Code == internal.ResQueryNotFount { return -1, nil // 从最小offset开始消费 } // 原先会从这里返回error if res.Code != internal.ResSuccess { return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark) } off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64) if err != nil { return -1, err } return off, nil } 也就是说当无法从broker 获取offset的时候,直接从最小Offset开始消费。 ...

July 4, 2023 · 3 min

RocketMQ Filter机制探秘

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 本文基于rocketmq-all-4.8.0 1. client使用Filter RocketMQ消费者订阅消息时,可以指定以某种表达式进行过滤,目前支持2种类型–Tag和SQL92。 1.1 Tag 按照Tag进行过滤. 首先需要给消息打上tag,注意:1条消息只能包含最多1个tag 生产者代码示例1 package main import ( "context" "fmt" "os" "strconv" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) // Package main implements a simple producer to send message. func main() { p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.12.100:9876"})), producer.WithRetry(2), ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } topic := "test" msg := &primitive.Message{ Topic: topic, Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)), } // 打tag msg.WithTag("tagA") // broker会针对key,建立索引存储在indexFile中 msg.WithKeys([]string{"key1", "key2"}) msg.WithProperty("age", "10") msg.WithProperty("color", "red") res, err := p.SendSync(context.Background(), msg) ... } Message中一些重要的属性 ...

May 17, 2023 · 3 min

玩转NSQ(3)-漂亮的代码实现

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1. 创建协程 创建协程示例 n.waitGroup.Wrap(func() { // do something }) util.WaitGroupWrapper package util import ( "sync" ) type WaitGroupWrapper struct { sync.WaitGroup } func (w *WaitGroupWrapper) Wrap(cb func()) { w.Add(1) go func() { cb() w.Done() }() } 2. 协程池 用于从Channel扫描数据到Client func (n *NSQD) queueScanLoop() { ... for { select { case <-workTicker.C: if len(channels) == 0 { continue } case <-refreshTicker.C: channels = n.channels() // 协程池中的协程的数量是动态变化的 // 理想数量是与channel的数量保持一致 n.resizePool(len(channels), workCh, responseCh, closeCh) continue case <-n.exitChan: goto exit } ... } // resizePool adjusts the size of the pool of queueScanWorker goroutines // // 1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax) // func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) { idealPoolSize := int(float64(num) * 0.25) if idealPoolSize < 1 { idealPoolSize = 1 } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax { idealPoolSize = n.getOpts().QueueScanWorkerPoolMax } for { if idealPoolSize == n.poolSize { break } else if idealPoolSize < n.poolSize { // contract closeCh <- 1 n.poolSize-- } else { // expand n.waitGroup.Wrap(func() { n.queueScanWorker(workCh, responseCh, closeCh) }) n.poolSize++ } } } 3. 通讯协议 nsq中数据被分为2类指令型数据,消息型数据, 2种数据类型,格式不相同 ...

October 9, 2019 · 2 min

玩转NSQ(2)-消息流转

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1.引言 nsq的架构简单,代码清晰。对于自主造轮子,实现消息队列、消息推送系统、IM都是非常好的参考。 本文将以图表的形式来说明消息在nsdd中的流转。 2.消息流转 2.1 重要的数据结构 Topic/ Channel/ Client都是nsqd中的数据结构。 数据会从Topic复制到Channel1和Channel2 Client是 consumer在nsqd内部的表征 每个Client最多只能订阅一个Channel 它们内部都有一组queue memoryMsgChan chan *Message // 位于内存 backend BackendQueue //当memoryMsgChan写满,则默认写入磁盘 每个producer都有1个读协程,负责把producer发送的消息写入Topic 多个Client可以订阅同一个Channel。即一个Channel,多个消费者,谁抢到算谁的。 每一个Topic都对应1个协程Topic.messagePump负责从Topic复制数据到Channel NSQD.queueScanLoop会控制一组协程NSQD.queueScanWorker(动态大小的协程池) 从Channel(所有topic的Channels)中复制数据到Client中, 之所以是协程池,我觉得可能跟Channel中的消息有延迟发送,且有重入队列的有关操作 每一个Client对应着一个协程protocolV2.messagePump,负责通过TCP连接把数据发送给consumer 3. 其它有趣的小细节 比较有意思的是,nsq官方推荐,nsqd随着发布者一起部署。 发布者不必去发现其他的nsqd节点,他们总是可以向本地实例发布消息。 实际上解放了producer,而甩锅给了consumer,如果某个Topic 假定叫topic1。如果topic1位于多个nsqd,consumer需要通过nsqlookupd获知所有拥topic1的nsqd的地址,然后需要在多个nsqd上订阅topic1 这里的nsqlookupd相当于是注册中心。 如果某个nsqd宕机,由于nsqd没有副本,消息可能会丢失 打赏作者

October 9, 2019 · 1 min

玩转NSQ(1)-鉴权

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 环境 软件版本 NSQ v1.2.0 golang 1.12 go-nsq github.com/nsqio/go-nsq v1.0.7 1.引言 NSQ 是一个基于 Go 语言的分布式实时消息平台,它具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。它的吞吐能力非常强,在单实例4核CPU可以达到19wop/s。足以应付日常使用 (见参考资料1) 。 不过它没有内建的数据副本机制(如果某个nsqd宕机,则该nsqd上的消息会丢失),且默认数据也不持久化(需要将mem_queue_size修改为0),使用NSQ仍然需要谨慎。 当我们考查一个消息队列,除了吞吐能力和可靠性,安全性也是重要的考察点。NSQ默认提供了鉴权机制(见查考资料2) 2. 使用鉴权 2.1 启用鉴权&对鉴权服务的要求 启动鉴权 nsqd --auth-http-address=nsqauth.example.com:4181 --http-address=127.0.0.1:4151 NSQ把权限的管理委托给了外部的鉴权服务,通过auth-http-address指定 官方给的鉴权服务的参考代码是 jehiah/nsqauth-contrib 笔者跟据它的思路重新实现了Golang的版本 vearne/go-nsqauthd NSQ要求鉴权服务能够对如下的HTTP进行响应 /auth?secret=xxxx&remote_ip=39.156.69.79&tls=true nsqd会拿着客户端的信息到鉴权服务进行鉴权 secret是客户端提供的秘钥(相当于redis中的密码) remote_ip是客户端的IP地址 tls表示客户端与nsqd之间的通讯是否有TLS保护 鉴权返回形如如下的信息,表明此用户可以操作的topic { "identity": "xxxx", "ttl": 3600, "Authorizations": [ { "channels": [ ".*" ], "topic": "topic1", "permissions": [ "subscribe", // 有topic1上的订阅权限 "publish" // 有topic1上的发布权限 ] } ] } vearne/go-nsqauthd为了简单期间,将秘钥直接存储在了csv文件中。 login ip tls_required topic channel subscribe publish 4BFE467B-FCBA-4519-BAC8-E9A3C57EDEB6 false test .* subscribe publish test_local 127.0.0.1 true local .* subscribe publish test_publish 127.0.0.1/24 false test_publish .* publish 其中login对应secret是秘钥信息, ip可以是某个子网, 如果不写表示没有ip限制,channel和topic都支持通配符的表达式。 ...

September 29, 2019 · 2 min