Fork me on GitHub

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc

本文基于rocketmq-all-4.8.0

1. client使用Filter

RocketMQ消费者订阅消息时,可以指定以某种表达式进行过滤,目前支持2种类型--TagSQL92

1.1 Tag

按照Tag进行过滤.
首先需要给消息打上tag,注意:1条消息只能包含最多1个tag
生产者代码示例1

package main

import (
    "context"
    "fmt"
    "os"
    "strconv"

    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/primitive"
    "github.com/apache/rocketmq-client-go/v2/producer"
)

// Package main implements a simple producer to send message.
func main() {
    p, _ := rocketmq.NewProducer(
        producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.12.100:9876"})),
        producer.WithRetry(2),
    )
    err := p.Start()
    if err != nil {
        fmt.Printf("start producer error: %s", err.Error())
        os.Exit(1)
    }
    topic := "test"

    msg := &primitive.Message{
        Topic: topic,
        Body:  []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
    }
    // 打tag
    msg.WithTag("tagA")
    // broker会针对key,建立索引存储在indexFile中
    msg.WithKeys([]string{"key1", "key2"})
    msg.WithProperty("age", "10")
    msg.WithProperty("color", "red")
    res, err := p.SendSync(context.Background(), msg)
    ...
}

Message中一些重要的属性

属性 说明 备注
KEYS 可以用来建立索引,用于后期查询
UNIQ_KEY 在客户端生成的唯一主键
SHARDING_KEY 用于选择MessageQueue
TAGS 用于过滤使用 注意:一个消息只能对应最多一个tag

消费者代码示例1

package main

import (
    "context"
    "fmt"
    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
    "os"
)

func main() {
    sig := make(chan os.Signal)
    c, _ := rocketmq.NewPushConsumer(
        consumer.WithGroupName("testGroup2"),
        consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.1.100:9876"})),
    )
    // 订阅包含tagA或者tagB的消息
    selector := consumer.MessageSelector{Type: consumer.TAG, Expression: "tagA || tagB"}
    // 订阅所有tag
    // selector = consumer.MessageSelector{Type: consumer.TAG, Expression: "*"} 
    err := c.Subscribe("test", selector, func(ctx context.Context,
        msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
        for i := range msgs {
            fmt.Printf("subscribe callback: %v \n", msgs[i])
        }
        // 如果执行出错,想稍后再次执行
        // 可以返回consumer.ConsumeRetryLater
        return consumer.ConsumeSuccess, nil
    })
}

更多使用规则,可以参考ExpressionType.java

1.2 SQL92

生产者代码示例见 生产者代码示例1

消费者代码示例2

package main

import (
    "context"
    "fmt"
    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
    "os"
)

func main() {
    sig := make(chan os.Signal)
    c, _ := rocketmq.NewPushConsumer(
        consumer.WithGroupName("testGroup2"),
        consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.1.100:9876"})),
    )
    selector = consumer.MessageSelector{Type: consumer.SQL92, Expression: "age < 20 AND color = 'red' "}
    err := c.Subscribe("test", selector, func(ctx context.Context,
        msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
        for i := range msgs {
            fmt.Printf("subscribe callback: %v \n", msgs[i])
        }
        return consumer.ConsumeSuccess, nil
    })
}

2. client和broker对消息的过滤

让我们再回顾一下订阅消息和消费的过程

1)client提交SubscriptionDataBroker,告知需要订阅什么样的数据

class SubscriptionData implements Comparable<SubscriptionData> {
    private boolean classFilterMode = false;
    private String topic;
    private String subString;
    private Set<String> tagsSet;
    private Set<Integer> codeSet;
    private long subVersion;
    private String expressionType;
}

2) 根据consumerOffset从ConsumeQueue中提取数据

提取的过程中,会做粗粒度的筛选
ExpressionMessageFilter.isMatchedByConsumeQueue()

3) 根据ConsumeQueue中存储的Offset从CommitLog中获取消息

提取的过程中,会做细粒度的过滤
ExpressionMessageFilter.isMatchedByCommitLog()

4)消息被发给Client,对于tag类型,client 还会再做一次过滤

PullAPIWrapper.processPullResult()

public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
    final SubscriptionData subscriptionData) {
    PullResultExt pullResultExt = (PullResultExt) pullResult;
    this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
    if (PullStatus.FOUND == pullResult.getPullStatus()) {
        ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
        List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
        List<MessageExt> msgListFilterAgain = msgList;
        if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
            msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
            for (MessageExt msg : msgList) {
                if (msg.getTags() != null) {
                    // 消息的tag必须包含在订阅数据的tag集合中
                    // tagsSet的类型是Set<String>
                    if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                        msgListFilterAgain.add(msg);
                    }
                }
            }
        }
    ...
    }
    pullResultExt.setMessageBinary(null);
    return pullResult;
}

显然应该尽量让第2步和第3步过滤的足够干净,这样可以避免无效数据在BrokerClient之间传输

2.1 针对Tag类型的订阅过滤

2.1.1 isMatchedByConsumeQueue

// 判断是否包含tag的hashCode
subscriptionData.getCodeSet().contains(tagsCode.intValue());

codeSet的计算可以参看
FilterAPI.buildSubscriptionData()

    public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
        String subString) throws Exception {
        SubscriptionData subscriptionData = new SubscriptionData();
        subscriptionData.setTopic(topic);
        subscriptionData.setSubString(subString);

        if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
            subscriptionData.setSubString(SubscriptionData.SUB_ALL);
        } else {
            String[] tags = subString.split("\\|\\|");
            if (tags.length > 0) {
                for (String tag : tags) {
                    if (tag.length() > 0) {
                        String trimString = tag.trim();
                        if (trimString.length() > 0) {
                            subscriptionData.getTagsSet().add(trimString);
                            // 注意这里
                            subscriptionData.getCodeSet().add(trimString.hashCode());
                        }
                    }
                }
            } else {
                throw new Exception("subString split error");
            }
        }

        return subscriptionData;
    }

2.1.2 isMatchedByCommitLog

当前版本的处理是直接返回true,萌叔认为这里可以

subscriptionData.getTagsSet().contains(msg.getTags());

再过滤一次,杜绝无效数据在BrokerClient之间传输的可能

2.2 针对SQL92类型的订阅过滤

2.2.1 isMatchedByConsumeQueue

SQL92的粗粒度过滤使用的是BloomFilter,这个BloomFilter存储在ConsumeQueueExt文件中
需要修改Broker配置文件

enableConsumeQueueExt = true
  • Consumequeue文件的存储路径默认为$HOME/store/consumequeue/{topic}/{queueId}/{fileName}

  • ConsumeQueueExt文件的存储路径默认为$HOME/store/consumequeue_ext/{topic}/{queueId}/{fileName}
    其格式为

这里的BitMap就是BloomFilter,BitMap中存储的是

{consumerGroup} + "#" + {topic}
  • 消息满足的每一个subscriptionData对应的consumerGroup信息都会被写入BitMap
  • 如果消息满足SQL92,则BitMap对应的bit会被置为1。

这里相当于做了预处理,满足Consumer订阅要求的消息已经提前被打上了标记(BitMap), 等到Consumer实际取用数据的时候,需要检查的范围就大大缩小了。

CommitLogDispatcherCalcBitMap.dispatch()

public void dispatch(DispatchRequest request) {
    if (!this.brokerConfig.isEnableCalcFilterBitMap()) {
        return;
    }

    try {
        // 获取当前Topic下,所有的订阅请求信息
        Collection<ConsumerFilterData> filterDatas = consumerFilterManager.get(request.getTopic());
        ...
        Iterator<ConsumerFilterData> iterator = filterDatas.iterator();
        BitsArray filterBitMap = BitsArray.create(
            this.consumerFilterManager.getBloomFilter().getM()
        );

        long startTime = System.currentTimeMillis();
        while (iterator.hasNext()) {
            ConsumerFilterData filterData = iterator.next();

            ...

            Object ret = null;
            try {
                MessageEvaluationContext context = new MessageEvaluationContext(request.getPropertiesMap());
                // 执行判断(SQL已经提前编译过了,类似于的AST树)
                ret = filterData.getCompiledExpression().evaluate(context);
            } catch (Throwable e) {
                log.error("Calc filter bit map error!commitLogOffset={}, consumer={}, {}", request.getCommitLogOffset(), filterData, e);
            }

            ...
            // eval true
            // filterBitMap对应的位置为1
            if (ret != null && ret instanceof Boolean && (Boolean) ret) {
                consumerFilterManager.getBloomFilter().hashTo(
                    filterData.getBloomFilterData(), 
                    filterBitMap
                );
            }
        }
        request.setBitMap(filterBitMap.bytes());

    } catch (Throwable e) {

    }
}

这里有一个问题,可能消息产生的时候,consumerGroup还没有发出订阅请求,所以BitMap中不包含任何有效信息,这种情况下只能认为这个消息是有效的。只能完全依赖细粒度阶段的过滤了

2.2.2 isMatchedByCommitLog

使用编译好的表达式,带入消息的属性值,判断是否满足条件。
编译代码都在 org/apache/rocketmq/filter/parser

编译的过程类似于编译原理中构建语法树的过程。

参考资料

1.深入了解 RocketMQ 之过滤器


微信公众号

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注