RocketMQ Filter机制探秘
版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 本文基于rocketmq-all-4.8.0 1. client使用Filter RocketMQ消费者订阅消息时,可以指定以某种表达式进行过滤,目前支持2种类型–Tag和SQL92。 1.1 Tag 按照Tag进行过滤. 首先需要给消息打上tag,注意:1条消息只能包含最多1个tag 生产者代码示例1 package main import ( "context" "fmt" "os" "strconv" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) // Package main implements a simple producer to send message. func main() { p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.12.100:9876"})), producer.WithRetry(2), ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } topic := "test" msg := &primitive.Message{ Topic: topic, Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)), } // 打tag msg.WithTag("tagA") // broker会针对key,建立索引存储在indexFile中 msg.WithKeys([]string{"key1", "key2"}) msg.WithProperty("age", "10") msg.WithProperty("color", "red") res, err := p.SendSync(context.Background(), msg) ... } Message中一些重要的属性 ...