Exploring the RocketMQ Filter Mechanism
Copyright notice: original article on this site, published by 萌叔
When reposting, please credit 萌叔 | https://vearne.cc
This article is based on rocketmq-all-4.8.0
1. Using Filters in the client
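When a RocketMQ consumer subscribes to messages, it can specify an expression used to filter them. Two expression types are currently supported: Tag and SQL92.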
1.1 Tag
Filtering by Tag.
First, the message has to be tagged. Note: a message can carry at most one tag.
Producer example 1
package main

import (
	"context"
	"fmt"
	"os"
	"strconv"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

// Package main implements a simple producer to send message.
func main() {
	p, _ := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.12.100:9876"})),
		producer.WithRetry(2),
	)
	err := p.Start()
	if err != nil {
		fmt.Printf("start producer error: %s", err.Error())
		os.Exit(1)
	}
	defer p.Shutdown()

	topic := "test"
	i := 0 // sequence number appended to the message body
	msg := &primitive.Message{
		Topic: topic,
		Body:  []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
	}
	// set the tag
	msg.WithTag("tagA")
	// the broker builds an index on the keys and stores it in the indexFile
	msg.WithKeys([]string{"key1", "key2"})
	// user properties, which SQL92 expressions can filter on
	msg.WithProperty("age", "10")
	msg.WithProperty("color", "red")
	res, err := p.SendSync(context.Background(), msg)
	if err != nil {
		fmt.Printf("send message error: %s\n", err.Error())
		return
	}
	fmt.Printf("send message success: result=%s\n", res.String())
}
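Some important Message properties
| Property | Description | Notes |
|---|---|---|
| KEYS | Used to build an index for later queries | |
| UNIQ_KEY | Unique key generated on the client | |
| SHARDING_KEY | Used to select the MessageQueue | |
| TAGS | Used for filtering | Note: a message can have at most one tag |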
Consumer example 1
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	c, _ := rocketmq.NewPushConsumer(
		consumer.WithGroupName("testGroup2"),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.1.100:9876"})),
	)
	// subscribe to messages tagged tagA or tagB
	selector := consumer.MessageSelector{Type: consumer.TAG, Expression: "tagA || tagB"}
	// subscribe to all tags
	// selector = consumer.MessageSelector{Type: consumer.TAG, Expression: "*"}
	err := c.Subscribe("test", selector, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for i := range msgs {
			fmt.Printf("subscribe callback: %v \n", msgs[i])
		}
		// to have this batch redelivered later after a processing error,
		// return consumer.ConsumeRetryLater instead
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
	if err = c.Start(); err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
	<-sig // block until interrupted
	if err = c.Shutdown(); err != nil {
		fmt.Printf("shutdown consumer error: %s\n", err.Error())
	}
}
For more usage rules, see ExpressionType.java.
1.2 SQL92
For the producer, see producer example 1.
Consumer example 2
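package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	c, _ := rocketmq.NewPushConsumer(
		consumer.WithGroupName("testGroup2"),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.1.100:9876"})),
	)
	// SQL92 filtering must be enabled on the broker (enablePropertyFilter=true)
	selector := consumer.MessageSelector{Type: consumer.SQL92, Expression: "age < 20 AND color = 'red' "}
	err := c.Subscribe("test", selector, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for i := range msgs {
			fmt.Printf("subscribe callback: %v \n", msgs[i])
		}
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
	if err = c.Start(); err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
	<-sig // block until interrupted
	if err = c.Shutdown(); err != nil {
		fmt.Printf("shutdown consumer error: %s\n", err.Error())
	}
}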
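2. Message filtering in the client and the broker
Let's walk through the subscription and consumption process once more.
1) The Client submits SubscriptionData to the Broker, telling it what data it wants to subscribe to: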
class SubscriptionData implements Comparable<SubscriptionData> {
private boolean classFilterMode = false;
private String topic;
private String subString;
private Set<String> tagsSet;
private Set<Integer> codeSet;
private long subVersion;
private String expressionType;
}
2) The Broker reads entries from the ConsumeQueue, starting at the consumerOffset.
A coarse-grained filter is applied during this read:
ExpressionMessageFilter.isMatchedByConsumeQueue()
3) Using the offsets stored in the ConsumeQueue, the Broker fetches the messages from the CommitLog.
A fine-grained filter is applied during this fetch:
ExpressionMessageFilter.isMatchedByCommitLog()
4) The messages are sent to the Client. For tag subscriptions, the Client filters them once more:
PullAPIWrapper.processPullResult()
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt) pullResult;
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus()) {
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
List<MessageExt> msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
                    // the message's tag must be in the subscription's tag set
                    // tagsSet is a Set<String>, so this is an exact string match
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
...
}
pullResultExt.setMessageBinary(null);
return pullResult;
}
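Clearly, steps 2 and 3 should filter as thoroughly as possible, so that no useless data is transferred between the Broker and the Client.
2.1 Filtering for Tag subscriptions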
2.1.1 isMatchedByConsumeQueue
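A ConsumeQueue entry stores only the tag's hash code (tagsCode), not the tag string, so the coarse check compares hash codes:
// check whether codeSet contains the hash code of the message's tag
subscriptionData.getCodeSet().contains(tagsCode.intValue());
For how codeSet is computed, see
FilterAPI.buildSubscriptionData()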
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
String subString) throws Exception {
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);
if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
subscriptionData.setSubString(SubscriptionData.SUB_ALL);
} else {
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {
for (String tag : tags) {
if (tag.length() > 0) {
String trimString = tag.trim();
if (trimString.length() > 0) {
subscriptionData.getTagsSet().add(trimString);
                        // note: the tag's hashCode is what goes into codeSet
subscriptionData.getCodeSet().add(trimString.hashCode());
}
}
}
} else {
throw new Exception("subString split error");
}
}
return subscriptionData;
}
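To make the hash-code bookkeeping concrete, here is a minimal Go sketch of the same steps (Go being the language of the client examples above). It is not RocketMQ code: subscriptionData, javaHashCode and buildSubscriptionData are hypothetical names, the Java version's error branch is omitted, and strings.TrimSpace only approximates Java's trim(). javaHashCode reproduces Java's String.hashCode (h = 31*h + c over UTF-16 code units, wrapping at 32 bits), since that value is what ends up in codeSet.

```go
package main

import (
	"fmt"
	"strings"
	"unicode/utf16"
)

// subscriptionData is a hypothetical, trimmed-down mirror of the Java
// SubscriptionData class, keeping only the fields used by Tag filtering.
type subscriptionData struct {
	subString string
	tagsSet   map[string]struct{}
	codeSet   map[int32]struct{}
}

// subAll plays the role of SubscriptionData.SUB_ALL ("*").
const subAll = "*"

// javaHashCode reproduces Java's String.hashCode: h = 31*h + c over the
// UTF-16 code units of s, with 32-bit wrap-around on overflow.
func javaHashCode(s string) int32 {
	var h int32
	for _, c := range utf16.Encode([]rune(s)) {
		h = 31*h + int32(c)
	}
	return h
}

// buildSubscriptionData follows the shape of FilterAPI.buildSubscriptionData:
// "" or "*" subscribes to everything; otherwise the expression is split on
// "||", each tag is trimmed, and both the tag and its hash code are stored.
func buildSubscriptionData(subString string) subscriptionData {
	sd := subscriptionData{
		subString: subString,
		tagsSet:   map[string]struct{}{},
		codeSet:   map[int32]struct{}{},
	}
	if subString == "" || subString == subAll {
		sd.subString = subAll
		return sd
	}
	for _, tag := range strings.Split(subString, "||") {
		trimmed := strings.TrimSpace(tag)
		if trimmed == "" {
			continue
		}
		sd.tagsSet[trimmed] = struct{}{}
		// this hash code is what the ConsumeQueue check compares against
		sd.codeSet[javaHashCode(trimmed)] = struct{}{}
	}
	return sd
}

func main() {
	sd := buildSubscriptionData("tagA || tagB")
	fmt.Println("tags:", len(sd.tagsSet), "codes:", len(sd.codeSet))
	for code := range sd.codeSet {
		fmt.Println("code:", code)
	}
}
```

With "tagA || tagB" both sets end up with two entries; "tagA" hashes to 3552231, the same value Java's "tagA".hashCode() returns.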
2.1.2 isMatchedByCommitLog
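The current version simply returns true here. 萌叔 believes it could filter once more with
subscriptionData.getTagsSet().contains(msg.getTags());
Because codeSet holds only hash codes, and two different tags can share the same hashCode, a message with a tag that was not subscribed to can slip through isMatchedByConsumeQueue(). Checking the full tag string here would eliminate any chance of useless data being transferred between the Broker and the Client.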
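Why the extra check matters can be shown with the classic Java hash collision: "Aa" and "BB" both have hashCode 2112. The sketch below is a simplified model, not broker code (message, matchedByConsumeQueue and matchedByCommitLog are hypothetical names). It subscribes to "Aa" and runs three messages through a hash-code check followed by the proposed full-tag check.

```go
package main

import (
	"fmt"
	"unicode/utf16"
)

// javaHashCode reproduces Java's String.hashCode (31*h + c over UTF-16 units).
func javaHashCode(s string) int32 {
	var h int32
	for _, c := range utf16.Encode([]rune(s)) {
		h = 31*h + int32(c)
	}
	return h
}

// message is a hypothetical stand-in for a stored message with one tag.
type message struct {
	id  int
	tag string
}

// matchedByConsumeQueue models the coarse check: the ConsumeQueue entry only
// holds tagsCode (the tag's hash code), so only hash codes are compared.
func matchedByConsumeQueue(codeSet map[int32]bool, m message) bool {
	return codeSet[javaHashCode(m.tag)]
}

// matchedByCommitLog models the proposed extra check: compare the full tag
// string, which is available once the message is read from the CommitLog.
func matchedByCommitLog(tagsSet map[string]bool, m message) bool {
	return tagsSet[m.tag]
}

func main() {
	// Subscription expression "Aa".
	tagsSet := map[string]bool{"Aa": true}
	codeSet := map[int32]bool{javaHashCode("Aa"): true}

	msgs := []message{{1, "Aa"}, {2, "BB"}, {3, "Cc"}}
	for _, m := range msgs {
		coarse := matchedByConsumeQueue(codeSet, m)
		fine := coarse && matchedByCommitLog(tagsSet, m)
		fmt.Printf("msg %d tag=%s coarse=%v fine=%v\n", m.id, m.tag, coarse, fine)
	}
}
```

Message 2 (tag "BB") passes the coarse check because its hash code equals that of "Aa", and only the full-string comparison rejects it. Without that comparison on the Broker, such a message is shipped to the Client and dropped only in processPullResult().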
2.2 Filtering for SQL92 subscriptions
2.2.1 isMatchedByConsumeQueue
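SQL92 coarse-grained filtering uses a BloomFilter, which is stored in the ConsumeQueueExt file.
This has to be enabled in the Broker configuration file:
enableConsumeQueueExt = true
In addition, CommitLogDispatcherCalcBitMap.dispatch() (shown below) returns early unless brokerConfig.isEnableCalcFilterBitMap() is true, so the BitMap is only calculated when that switch is on as well.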
By default, ConsumeQueue files are stored under $HOME/store/consumequeue/{topic}/{queueId}/{fileName}
and ConsumeQueueExt files under $HOME/store/consumequeue_ext/{topic}/{queueId}/{fileName}
Its format is as follows:
[Figure: ConsumeQueueExt entry format]
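The BitMap here is the BloomFilter. The key it records for each subscription is
{consumerGroup} + "#" + {topic}
- For every subscriptionData the message satisfies, the corresponding consumerGroup key is written into the BitMap.
- Concretely, if the message satisfies a group's SQL92 expression, the bits that the group's key hashes to are set to 1.
This amounts to preprocessing: messages that satisfy a Consumer's subscription are marked in advance (in the BitMap), so when the Consumer actually reads the data, the range that needs checking shrinks considerably.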
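The mark-then-check idea can be sketched as a small Bloom filter. This is a simplified model under stated assumptions: FNV-1a double hashing, m = 64 bits and k = 3 hash functions are illustrative choices, not RocketMQ's actual BloomFilter hash or parameters. hashTo mirrors the name of the call used in dispatch() below; isHit is the matching check, written here for illustration. A hit can be a false positive but a miss is never wrong, which is why this stage can only discard messages and the fine-grained stage is still needed.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// bloomFilter is a minimal sketch with m bits and k hash functions.
// The hashing scheme is an assumption for illustration only.
type bloomFilter struct {
	m, k int
}

// bitPositions derives k bit positions for a key such as "group#topic"
// using double hashing: pos_i = (h1 + i*h2) mod m.
func (bf bloomFilter) bitPositions(key string) []int {
	h := fnv.New64a()
	h.Write([]byte(key))
	sum := h.Sum64()
	h1, h2 := uint32(sum), uint32(sum>>32)
	pos := make([]int, bf.k)
	for i := 0; i < bf.k; i++ {
		pos[i] = int((h1 + uint32(i)*h2) % uint32(bf.m))
	}
	return pos
}

// hashTo sets the key's bits in bitmap, which is what happens when a
// message satisfies the group's expression.
func (bf bloomFilter) hashTo(key string, bitmap []byte) {
	for _, p := range bf.bitPositions(key) {
		bitmap[p/8] |= 1 << (p % 8)
	}
}

// isHit reports whether all of the key's bits are set. False positives are
// possible; false negatives are not.
func (bf bloomFilter) isHit(key string, bitmap []byte) bool {
	for _, p := range bf.bitPositions(key) {
		if bitmap[p/8]&(1<<(p%8)) == 0 {
			return false
		}
	}
	return true
}

func main() {
	bf := bloomFilter{m: 64, k: 3}
	bitmap := make([]byte, (bf.m+7)/8)
	// Suppose the message matched the expression of group "testGroup2" on topic "test".
	bf.hashTo("testGroup2#test", bitmap)
	fmt.Println(bf.isHit("testGroup2#test", bitmap))
	// Usually false; a true here would be a Bloom filter false positive.
	fmt.Println(bf.isHit("otherGroup#test", bitmap))
}
```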
CommitLogDispatcherCalcBitMap.dispatch()
public void dispatch(DispatchRequest request) {
if (!this.brokerConfig.isEnableCalcFilterBitMap()) {
return;
}
try {
            // fetch the filter data of every subscription registered for this topic
Collection<ConsumerFilterData> filterDatas = consumerFilterManager.get(request.getTopic());
...
Iterator<ConsumerFilterData> iterator = filterDatas.iterator();
BitsArray filterBitMap = BitsArray.create(
this.consumerFilterManager.getBloomFilter().getM()
);
long startTime = System.currentTimeMillis();
while (iterator.hasNext()) {
ConsumerFilterData filterData = iterator.next();
...
Object ret = null;
try {
MessageEvaluationContext context = new MessageEvaluationContext(request.getPropertiesMap());
                    // evaluate the expression (the SQL was compiled in advance, into something like an AST)
ret = filterData.getCompiledExpression().evaluate(context);
} catch (Throwable e) {
log.error("Calc filter bit map error!commitLogOffset={}, consumer={}, {}", request.getCommitLogOffset(), filterData, e);
}
...
                // evaluated to true:
                // set this group's bits in filterBitMap to 1
if (ret != null && ret instanceof Boolean && (Boolean) ret) {
consumerFilterManager.getBloomFilter().hashTo(
filterData.getBloomFilterData(),
filterBitMap
);
}
}
request.setBitMap(filterBitMap.bytes());
} catch (Throwable e) {
}
}
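There is one catch: when a message is produced, the consumerGroup may not have sent its subscription request yet, so the message's BitMap contains no useful information for that group. In that case the message can only be treated as a match, and filtering relies entirely on the fine-grained stage.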
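The resulting coarse check can be sketched as follows. This is a hypothetical model, not RocketMQ's ExpressionMessageFilter: coarseMatch, msgStoreTime and filterBornTime are illustrative names. It assumes the Broker can tell that a message's BitMap predates the group's subscription by comparing timestamps, that a missing BitMap also counts as a match, and that the group's Bloom filter bit positions for {consumerGroup}#{topic} are precomputed.

```go
package main

import "fmt"

// coarseMatch is a hypothetical model of the coarse SQL92 check.
//   msgStoreTime   - when the message was stored (and its BitMap computed)
//   filterBornTime - when the consumer group's filter was registered
//   bitmap         - the BitMap stored with the message (may be empty)
//   bitPositions   - Bloom filter positions precomputed for "{consumerGroup}#{topic}"
func coarseMatch(msgStoreTime, filterBornTime int64, bitmap []byte, bitPositions []int) bool {
	// The BitMap could not contain this group's bits: treat the message as a
	// match and leave the decision to the fine-grained stage.
	if len(bitmap) == 0 || msgStoreTime < filterBornTime {
		return true
	}
	for _, p := range bitPositions {
		// A position outside the BitMap or an unset bit means "definitely no match".
		if p/8 >= len(bitmap) || bitmap[p/8]&(1<<(p%8)) == 0 {
			return false
		}
	}
	return true
}

func main() {
	positions := []int{3, 9}
	marked := []byte{1 << 3, 1 << 1} // bits 3 and 9 set
	fmt.Println(coarseMatch(300, 200, marked, positions))       // group's bits are set
	fmt.Println(coarseMatch(300, 200, []byte{0, 0}, positions)) // group's bits are not set
	fmt.Println(coarseMatch(100, 200, []byte{0, 0}, positions)) // stored before the subscription
}
```

The third call returns true even though no bits are set: the message was stored before the group subscribed, so only the fine-grained stage can decide.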
2.2.2 isMatchedByCommitLog
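The compiled expression is evaluated with the message's property values substituted in, to decide whether the message satisfies the condition.
The compilation code lives in org/apache/rocketmq/filter/parser.
Compilation works much like building a syntax tree in compiler theory.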
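To illustrate what evaluating a compiled expression means, here is a hand-built expression tree for the SQL92 filter from consumer example 2, evaluated against the properties set in producer example 1. It is a toy model: the node types (lessThan, equals, and) are hypothetical, there is no parser, and treating a missing or non-numeric property as false is an assumption. RocketMQ's real expression classes are the ones in the package named above.

```go
package main

import (
	"fmt"
	"strconv"
)

// expr is a node of a (hypothetical) compiled filter expression tree.
type expr interface {
	eval(props map[string]string) bool
}

// lessThan: property < numeric constant. Message properties are strings,
// so the value is parsed as a number; missing or non-numeric -> false.
type lessThan struct {
	prop  string
	value float64
}

func (e lessThan) eval(props map[string]string) bool {
	v, ok := props[e.prop]
	if !ok {
		return false
	}
	n, err := strconv.ParseFloat(v, 64)
	return err == nil && n < e.value
}

// equals: property = string constant.
type equals struct {
	prop, value string
}

func (e equals) eval(props map[string]string) bool {
	v, ok := props[e.prop]
	return ok && v == e.value
}

// and: both children must be true (short-circuits like Go's &&).
type and struct {
	left, right expr
}

func (e and) eval(props map[string]string) bool {
	return e.left.eval(props) && e.right.eval(props)
}

func main() {
	// Tree a parser might build for: age < 20 AND color = 'red'
	filter := and{lessThan{"age", 20}, equals{"color", "red"}}

	// Properties from producer example 1, then a non-matching variant.
	fmt.Println(filter.eval(map[string]string{"age": "10", "color": "red"}))
	fmt.Println(filter.eval(map[string]string{"age": "30", "color": "red"}))
}
```

Compiling once and evaluating many times is the point of the design: dispatch() walks the same tree for every new message, only the property map changes.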