Fork me on GitHub

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc

1.引言

人们在潜意识里,总会觉得复杂且精巧的东西是好东西。但是这个复杂这个词在软件架构设计中,却不一定是好事情。因为过于精巧和复杂的系统往往意味着系统更难以维护,出现问题后,故障更难排查。萌叔在阅读和分享RocketMQ的过程中,发现它有很多设计非常的简单粗暴,堪称"暴力美学"的典范,同时又给人眼前一亮的感觉(还能这么玩)。

2. NameServer高可用

在RocketMQ的架构体系中,NameServer的作用类似于注册中心,Broker会周期性的向NameServer发送心跳,注册Topic信息。Producer和Consumer会向NameServer查询某个Topic的路由信息(Topic位于哪个Broker)
Topic路由信息

{
    "OrderTopicConf": "",
    "queueDatas": [{
        "brokerName": "broker-3",
        "readQueueNums": 4,
        "writeQueueNums": 4,
        "perm": 6,
        "topicSynFlag": 0
    }, {
        "brokerName": "broker-4",
        "readQueueNums": 4,
        "writeQueueNums": 4,
        "perm": 6,
        "topicSynFlag": 0
    }],
    ...
}

如上面所示,某个Topic位于broker-3和broker-4,每个Broker上有4个MessageQueue

那么如何保证部分NameServer实例宕机后,注册中心的功能仍然能够正常运转呢?

按照正常点的想法,肯定是NameServer分成Master和Slave,然后Master和Slave之间在加上数据同步,如果Master宕机了,只要进行主从切换即可。这种做法肯定没问题,毕竟Hadoop中的NameNode也就是这么做的。

RocketMQ中的实现要简单的多,每个NameServer相互独立,并且它们之间没有通信。Broker会向每一个NameServer、NameServer1、NameServer2等等发送心跳,心跳中包含有它所维护的每个Topic的信息),这样每个NameServer就都含有路由信息。

等到Producer和Consumer向NameServer查询路由信息时,它会尝试向每一个NameServer进行请求。

下面的代码萌叔使用的是rocketmq-client-go的代码,因为rocketmq-client-go比Java的代码更加明显。
internal/route.go#queryTopicRouteInfoFromServer

func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, error) {
    request := &GetRouteInfoRequestHeader{
        Topic: topic,
    }

    var (
        response *remote.RemotingCommand
        err      error
    )
   ...
   // 遍历每一个NameServer
    for i := 0; i < s.Size(); i++ { 
        rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
        ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
        response, err = s.nameSrvClient.InvokeSync(ctx, s.getNameServerAddress(), rc)

        if err == nil {
            cancel()
            break
        }
        cancel()
    }
    if err != nil {
        rlog.Error("connect to namesrv failed.", map[string]interface{}{
            "namesrv": s,
            "topic":   topic,
        })
        return nil, primitive.NewRemotingErr(err.Error())
    }

    switch response.Code {
    case ResSuccess:
        if response.Body == nil {
            return nil, primitive.NewMQClientErr(response.Code, response.Remark)
        }
        routeData := &TopicRouteData{}

        err = routeData.decode(string(response.Body))
        if err != nil {
            rlog.Warning("decode TopicRouteData error: %s", map[string]interface{}{
                rlog.LogKeyUnderlayError: err,
                "topic":                  topic,
            })
            return nil, err
        }
        return routeData, nil
    case ResTopicNotExist:
        return nil, ErrTopicNotExist
    default:
        return nil, primitive.NewMQClientErr(response.Code, response.Remark)
    }
}

微信公众号

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注