版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | http://vearne.cc

1.引言

人们在潜意识里,总会觉得复杂且精巧的东西是好东西。但是这个复杂这个词在软件架构设计中,却不一定是好事情。因为过于精巧和复杂的系统往往意味着系统更难以维护,出现问题后,故障更难排查。萌叔在阅读和分享RocketMQ的过程中,发现它有很多设计非常的简单粗暴,堪称"暴力美学"的典范,同时又给人眼前一亮的感觉(还能这么玩)。

2. NameServer高可用

在RocketMQ的架构体系中,NameServer的作用类似于注册中心,Broker会周期性的向NameServer发送心跳,注册Topic信息。Producer和Consumer会向NameServer查询某个Topic的路由信息(Topic位于哪个Broker) Topic路由信息

{
    "OrderTopicConf": "",
    "queueDatas": [{
        "brokerName": "broker-3",
        "readQueueNums": 4,
        "writeQueueNums": 4,
        "perm": 6,
        "topicSynFlag": 0
    }, {
        "brokerName": "broker-4",
        "readQueueNums": 4,
        "writeQueueNums": 4,
        "perm": 6,
        "topicSynFlag": 0
    }],
    ...
}

如上面所示,某个Topic位于broker-3和broker-4,每个Broker上有4个MessageQueue

那么如何保证部分NameServer实例宕机后,注册中心的功能仍然能够正常运转呢?

按照正常点的想法,肯定是NameServer分成Master和Slave,然后Master和Slave之间在加上数据同步,如果Master宕机了,只要进行主从切换即可。这种做法肯定没问题,毕竟Hadoop中的NameNode也就是这么做的。

RocketMQ中的实现要简单的多,每个NameServer相互独立,并且它们之间没有通信。Broker会向每一个NameServer、NameServer1、NameServer2等等发送心跳,心跳中包含有它所维护的每个Topic的信息),这样每个NameServer就都含有路由信息。

等到Producer和Consumer向NameServer查询路由信息时,它会尝试向每一个NameServer进行请求。

下面的代码萌叔使用的是rocketmq-client-go的代码,因为rocketmq-client-go比Java的代码更加明显。 internal/route.go#queryTopicRouteInfoFromServer

func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, error) {
	request := &GetRouteInfoRequestHeader{
		Topic: topic,
	}

	var (
		response *remote.RemotingCommand
		err      error
	)
   ...
   // 遍历每一个NameServer
	for i := 0; i < s.Size(); i++ { 
		rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
		response, err = s.nameSrvClient.InvokeSync(ctx, s.getNameServerAddress(), rc)

		if err == nil {
			cancel()
			break
		}
		cancel()
	}
	if err != nil {
		rlog.Error("connect to namesrv failed.", map[string]interface{}{
			"namesrv": s,
			"topic":   topic,
		})
		return nil, primitive.NewRemotingErr(err.Error())
	}

	switch response.Code {
	case ResSuccess:
		if response.Body == nil {
			return nil, primitive.NewMQClientErr(response.Code, response.Remark)
		}
		routeData := &TopicRouteData{}

		err = routeData.decode(string(response.Body))
		if err != nil {
			rlog.Warning("decode TopicRouteData error: %s", map[string]interface{}{
				rlog.LogKeyUnderlayError: err,
				"topic":                  topic,
			})
			return nil, err
		}
		return routeData, nil
	case ResTopicNotExist:
		return nil, ErrTopicNotExist
	default:
		return nil, primitive.NewMQClientErr(response.Code, response.Remark)
	}
}

微信公众号