Fork me on GitHub

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc

建议阅读,萌叔之前的文章

1. 前言

一些复杂的业务场景,会出现任务被从接口提交,并在后台进行异步执行的情况。
这种情况下可能需要引入消息队列,涉及TraceContext通过消息进行跨节点(服务)的传播。

参考:
W3C TraceContext 规范

这篇文章萌叔以RocketMQ来举例,看trace信息如何传递。

2. Golang中的实现

手动埋点追踪RocketMQ。
由于目前Golang的RocketMQ客户端缺乏像Java那样的自动埋点支持,
我们需要在消息发送和消费处理的关键位置手动创建Span。

2.1 消息发送方(Producer)

import (
    "context"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"
)

func sendMessageWithTracing(ctx context.Context, producer rocketmq.Producer, topic, body string) error {
    tracer := otel.Tracer("rocketmq-producer")

    // 创建一个新的Span,代表本次发送操作。这里的ctx可能是来自HTTP请求或上游服务的上下文。
    _, span := tracer.Start(ctx, "rocketmq.send", trace.WithAttributes(
        attribute.String("messaging.system", "rocketmq"),        // 语义约定:消息系统
        attribute.String("messaging.destination", topic),        // 语义约定:主题
        attribute.String("messaging.rocketmq.message.tags", "your_tag"), // RocketMQ特定属性
        // 可以根据需要添加更多属性,如keys
    ))
    defer span.End() // 确保Span最终会结束

    // 准备消息
    msg := primitive.NewMessage(topic, []byte(body))

    // 关键步骤:将追踪上下文注入到消息属性中
    carrier := NewMessageCarrier(msg) // 你需要实现一个Carrier,见下文说明
    otel.GetTextMapPropagator().Inject(span.SpanContext(), carrier)

    // 发送消息
    _, err := producer.SendSync(msg)
    if err != nil {
        // 记录错误
        span.RecordError(err)
        span.SetStatus(codes.Error, "SendSync failed")
        return err
    }

    span.SetStatus(codes.Ok, "Success")
    return nil
}

关于Carrier:你需要实现propagation.TextMapCarrier接口,以便将追踪信息(TraceID, SpanID等)
以键值对的形式存入RocketMQ消息的用户属性中。一个简单的实现示例如下:

type MessageCarrier struct {
    msg *primitive.Message
}

func NewMessageCarrier(msg *primitive.Message) *MessageCarrier {
    return &MessageCarrier{msg: msg}
}

func (c *MessageCarrier) Get(key string) string {
    return c.msg.GetProperty(key)
}

func (c *MessageCarrier) Set(key, value string) {
    c.msg.WithProperty(key, value)
}

func (c *MessageCarrier) Keys() []string {
    // 返回所有已设置的属性键,可能需要自己维护一个列表或从消息中提取
    // 简化实现可以返回空切片
    return []string{}
}

2.2 消息消费方(Consumer)

import (
    "context"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"
)

func consumeMessageWithTracing(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
    // 假设每次只处理一条消息
    if len(msgs) == 0 {
        return consumer.ConsumeSuccess, nil
    }
    msg := msgs[0]

    tracer := otel.Tracer("rocketmq-consumer")

    // 关键步骤:从消息属性中提取追踪上下文
    carrier := NewMessageCarrier(msg) // 使用同样的Carrier
    parentCtx := otel.GetTextMapPropagator().Extract(ctx, carrier)

    // 创建一个新的Span,代表本次消费处理操作,并将其与提取到的上下文关联
    // 注意操作名称为 "rocketmq.process"
    newCtx, span := tracer.Start(parentCtx, "rocketmq.process", trace.WithAttributes(
        attribute.String("messaging.system", "rocketmq"),
        attribute.String("messaging.destination", msg.Topic),
        attribute.String("messaging.operation", "process"),
        attribute.String("messaging.message.id", msg.MsgId),
        // 添加消费相关的属性
    ))
    defer span.End()

    // 你的业务处理逻辑,使用新的上下文 newCtx
    // 例如,如果业务逻辑中还有数据库操作或HTTP调用,可以传递 newCtx 来继续链路
    err := yourBusinessLogic(newCtx, msg)

    if err != nil {
        span.RecordError(err)
        span.SetStatus(codes.Error, "Processing failed")
        return consumer.ConsumeRetryLater, err
    }

    span.SetStatus(codes.Ok, "Success")
    return consumer.ConsumeSuccess, nil
}

3. 原理

RocketMQ中,消息的结构如下

类似于HTTP的请求和响应,包含header和body。
header中包含extended fields,即 properties。
RocketMQ中的许多关键属性,以及用户自定义的属性都通过此部分传递给消费者。

下面是协议定义的部分重要属性

const (
    // 用于消息索引
    PropertyKeys                           = "KEYS"
    // 消息的标签,用于在同一个 Topic 下对消息进行二级分类,从而实现更精细化的消息过滤和管理
    PropertyTags                           = "TAGS"
    //  设置消息的延迟级别,使消息在指定延迟时间后才能被消费者消费
    PropertyDelayTimeLevel                 = "DELAY"  
    // 客户端生成的msgId
    PropertyUniqueClientMessageIdKeyIndex  = "UNIQ_KEY" 
    // 分片键,确保同一组具有特定业务标识的消息被发送到同一个消息队列中,从而保证这部分消息能够按照发送顺序被消费
    PropertyShardingKey                    = "SHARDING_KEY"  
    // 用于设定任意延时消息
    PropertyDelayTimeInSeconds = "DELAY_TIME_IN_SECONDS"  
    ...
)


message.go

const (
    propertySeparator  = '\002'
    nameValueSeparator = '\001'
)

func (m *Message) MarshallProperties() string {
    m.mutex.RLock()
    defer m.mutex.RUnlock()
    buffer := bytes.NewBufferString("")
    for k, v := range m.properties {
        buffer.WriteString(k)
        buffer.WriteRune(nameValueSeparator)
        buffer.WriteString(v)
        buffer.WriteRune(propertySeparator)
    }
    return buffer.String()
}

精简版代码复现

    // 注意:需要自己实现一下
    InitTracerProvider()

    msg := primitive.NewMessage("topic", []byte("my-message"))
    msg.WithProperty("mykey", "my-value")

    // 2. Inject trace context
    carrier := NewMessageCarrier(msg) // 你需要实现一个Carrier
    otel.GetTextMapPropagator().Inject(ctx2, carrier)

    fmt.Println("GetProperties", msg.GetProperties())
    fmt.Println("traceparent", msg.GetProperty("traceparent"))
    // 2. Extract trace context
    carrier2 := NewMessageCarrier(msg) // 使用同样的Carrier
    parentCtx := otel.GetTextMapPropagator().Extract(ctx, carrier2)
    span2 := trace.SpanFromContext(parentCtx)
    fmt.Println(span2.SpanContext().TraceID().String())
    fmt.Println(span2.SpanContext().SpanID().String())

Output

GetProperties map[mykey:my-value traceparent:00-9d727ddb569537827a418b6512f7c907-8d97eda67bdc6973-01]
traceparent 00-9d727ddb569537827a418b6512f7c907-8d97eda67bdc6973-01
9d727ddb569537827a418b6512f7c907
8d97eda67bdc6973

3. 总结

在RocketMQ中,trace context通过消息header frame的properties传递,由于目前Golang的RocketMQ客户端缺乏像Java那样的自动埋点支持,
我们需要在消息发送和消费处理的关键位置手动创建Span。有了这种机制,即使异步执行的任务也能得到很好的追踪和监控。

4. 参考资料

  1. RocketMQ 通信协议
  2. RocketMQ—通信协议

微信公众号