RocketMQ Filter机制探秘

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 本文基于rocketmq-all-4.8.0 1. client使用Filter RocketMQ消费者订阅消息时,可以指定以某种表达式进行过滤,目前支持2种类型–Tag和SQL92。 1.1 Tag 按照Tag进行过滤. 首先需要给消息打上tag,注意:1条消息只能包含最多1个tag 生产者代码示例1 package main import ( "context" "fmt" "os" "strconv" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) // Package main implements a simple producer to send message. func main() { p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.12.100:9876"})), producer.WithRetry(2), ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } topic := "test" msg := &primitive.Message{ Topic: topic, Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)), } // 打tag msg.WithTag("tagA") // broker会针对key,建立索引存储在indexFile中 msg.WithKeys([]string{"key1", "key2"}) msg.WithProperty("age", "10") msg.WithProperty("color", "red") res, err := p.SendSync(context.Background(), msg) ... } Message中一些重要的属性 ...

May 17, 2023 · 3 min

RocketMQ架构设计中的"暴力美学"(1)-NameServer高可用

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 1.引言 人们在潜意识里,总会觉得复杂且精巧的东西是好东西。但是这个复杂这个词在软件架构设计中,却不一定是好事情。因为过于精巧和复杂的系统往往意味着系统更难以维护,出现问题后,故障更难排查。萌叔在阅读和分享RocketMQ的过程中,发现它有很多设计非常的简单粗暴,堪称"暴力美学"的典范,同时又给人眼前一亮的感觉(还能这么玩)。 2. NameServer高可用 在RocketMQ的架构体系中,NameServer的作用类似于注册中心,Broker会周期性的向NameServer发送心跳,注册Topic信息。Producer和Consumer会向NameServer查询某个Topic的路由信息(Topic位于哪个Broker) Topic路由信息 { "OrderTopicConf": "", "queueDatas": [{ "brokerName": "broker-3", "readQueueNums": 4, "writeQueueNums": 4, "perm": 6, "topicSynFlag": 0 }, { "brokerName": "broker-4", "readQueueNums": 4, "writeQueueNums": 4, "perm": 6, "topicSynFlag": 0 }], ... } 如上面所示,某个Topic位于broker-3和broker-4,每个Broker上有4个MessageQueue 那么如何保证部分NameServer实例宕机后,注册中心的功能仍然能够正常运转呢? 按照正常点的想法,肯定是NameServer分成Master和Slave,然后Master和Slave之间在加上数据同步,如果Master宕机了,只要进行主从切换即可。这种做法肯定没问题,毕竟Hadoop中的NameNode也就是这么做的。 RocketMQ中的实现要简单的多,每个NameServer相互独立,并且它们之间没有通信。Broker会向每一个NameServer、NameServer1、NameServer2等等发送心跳,心跳中包含有它所维护的每个Topic的信息),这样每个NameServer就都含有路由信息。 等到Producer和Consumer向NameServer查询路由信息时,它会尝试向每一个NameServer进行请求。 下面的代码萌叔使用的是rocketmq-client-go的代码,因为rocketmq-client-go比Java的代码更加明显。 internal/route.go#queryTopicRouteInfoFromServer func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, error) { request := &GetRouteInfoRequestHeader{ Topic: topic, } var ( response *remote.RemotingCommand err error ) ... // 遍历每一个NameServer for i := 0; i < s.Size(); i++ { rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) response, err = s.nameSrvClient.InvokeSync(ctx, s.getNameServerAddress(), rc) if err == nil { cancel() break } cancel() } if err != nil { rlog.Error("connect to namesrv failed.", map[string]interface{}{ "namesrv": s, "topic": topic, }) return nil, primitive.NewRemotingErr(err.Error()) } switch response.Code { case ResSuccess: if response.Body == nil { return nil, primitive.NewMQClientErr(response.Code, response.Remark) } routeData := &TopicRouteData{} err = routeData.decode(string(response.Body)) if err != nil { rlog.Warning("decode TopicRouteData error: %s", map[string]interface{}{ rlog.LogKeyUnderlayError: err, "topic": topic, }) return nil, err } return routeData, nil case ResTopicNotExist: return nil, ErrTopicNotExist default: return nil, primitive.NewMQClientErr(response.Code, response.Remark) } }

December 7, 2021 · 1 min

rocketmq分享

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 注意:文中使用的部分图,误将ConsumeQueue写成了ConsumerQueue 1.简介 RocketMQ是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。RocketMQ是2012年阿里巴巴开源的第三代分布式消息中间件,2016年11月21日,阿里巴巴向Apache软件基金会捐赠了RocketMQ;第二年2月20日,Apache软件基金会宣布Apache RocketMQ成为顶级项目。 2.架构 正常情况,写和读都走Master,Master如果宕机,读可以走Slave 在 RocketMQ 4.5 版本之前,RocketMQ 只有 Master/Slave 一种部署方式,虽然这种模式可以提供一定的高可用性但也存在比较大的缺陷。为了实现新的高可用多副本架构,RockeMQ 最终选用了基于 Raft 协议的 commitlog 存储库 DLedger。 2.1 四种角色 2.1.1 NameServer 存储元数据 topic -> broker 无状态 接收来自broker的心跳 检查与borker的通讯是否过期 Topic路由信息 { "OrderTopicConf": "", "queueDatas": [{ "brokerName": "broker-3", "readQueueNums": 4, "writeQueueNums": 4, "perm": 6, "topicSynFlag": 0 }, { "brokerName": "broker-4", "readQueueNums": 4, "writeQueueNums": 4, "perm": 6, "topicSynFlag": 0 }], "brokerDatas": [{ "cluster": "Default_Cluster", "brokerName": "broker-4", "brokerAddrs": { "0": "192.168.12.123:10911", "1": "192.168.12.127:10911" } }, { "cluster": "Default_Cluster", "brokerName": "broker-3", "brokerAddrs": { "1": "192.168.12.220:10911", "0": "192.168.12.12:10911" } }] } 2.1.2 Producter 有发往broker的心跳(Master) ...

November 25, 2021 · 3 min