Fork me on GitHub

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc

引言

前几天突然收到线上rocketmq报警,rocketmq队列中的消息有堆积。

打开rocketmq的管理界面,发现broker-4:MessageQueue[4] 上没有消费者,所以消息一直在堆积。重启client,问题仍然无法解决。

查看client的日志,萌叔发现有如下可疑的日志

time="2023-07-01T23:03:28+08:00" level=error 
msg="fetch offset of mq from broker error" 
MessageQueue="MessageQueue [topic=account-to-redis, brokerName=broker-4, queueId=3]"
consumerGroup=CG-account 
underlayError="broker response code: 22, remarks: Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first"

"fetch offset of mq from broker error" 这应该就是导致broker-4:MessageQueue[4]一直无法被消费的原因

2. 解决问题

通过网上搜索,萌叔发现了参考资料1.rocketmq-client-go注册消费者组的问题 根据这份资料,此问题是rocketmq-client-go的一个bug。并已经在最新版本中被修复。

offset_store.go

func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *primitive.MessageQueue) (int64, error) {
    ...
    cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)

    res, err := r.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
    if err != nil {
        return -1, err
    }

    // 增加判断逻辑
    if res.Code == internal.ResQueryNotFount {
        return -1, nil // 从最小offset开始消费
    }

    // 原先会从这里返回error
    if res.Code != internal.ResSuccess {
        return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)
    }

    off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)
    if err != nil {
        return -1, err
    }

    return off, nil
}

也就是说当无法从broker 获取offset的时候,直接从最小Offset开始消费。

升级rockect-client-go版本之后,问题解决。

3. 更多

Q:那么为什么笔者的client程序运行了这么长时间,还是一直反复的报

"broker response code: 22, remarks: Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first"

consumer group的信息一直没有同步给broker吗?

A:

3.1 发出创建consumer group的指令

在RocketMQ中,需要使用mqadmin创建 consumer group(详情见参考资料5)

mqadmin updateSubGroup -n 172.24.30.100:9876 -g consume-group-name  -c xdf-test1

UpdateSubGroupSubCommand.java

public void execute(final CommandLine commandLine, final Options options,
    RPCHook rpcHook) throws SubCommandException {
    DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);

    defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));

    try {
        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
        subscriptionGroupConfig.setConsumeBroadcastEnable(false);
        subscriptionGroupConfig.setConsumeFromMinEnable(false);

        // groupName
        subscriptionGroupConfig.setGroupName(commandLine.getOptionValue('g').trim());


        if (commandLine.hasOption('b')) { // 针对特定broker 都发送一次Command
            String addr = commandLine.getOptionValue('b').trim();

            defaultMQAdminExt.start();

            defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subscriptionGroupConfig);
            System.out.printf("create subscription group to %s success.%n", addr);
            System.out.printf("%s", subscriptionGroupConfig);
            return;

        } else if (commandLine.hasOption('c')) {
            String clusterName = commandLine.getOptionValue('c').trim();

            defaultMQAdminExt.start();
            Set<String> masterSet =
                CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
            for (String addr : masterSet) {  // 针对集群中的每一个broker(master)都发送一次Command
                try { 
                    defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subscriptionGroupConfig);
                    System.out.printf("create subscription group to %s success.%n", addr);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000 * 1);
                }
            }
            System.out.printf("%s", subscriptionGroupConfig);
            return;
        }

        ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
    } catch (Exception e) {
        throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
    } finally {
        defaultMQAdminExt.shutdown();
    }
}

最终使用MQClientAPIImpl.createSubscriptionGroup

发出Command

  public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config,
      final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException {
      RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);

      byte[] body = RemotingSerializable.encode(config);
      request.setBody(body);

      RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
          request, timeoutMillis);
      assert response != null;
      switch (response.getCode()) {
          case ResponseCode.SUCCESS: {
              return;
          }
          default:
              break;
      }
      throw new MQClientException(response.getCode(), response.getRemark());

  }

3.2 Broker收到Command的处理逻辑

SubscriptionGroupManager.java

public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
    Map<String, String> newAttributes = request(config);
    Map<String, String> currentAttributes = current(config.getGroupName());

    Map<String, String> finalAttributes = AttributeUtil.alterCurrentAttributes(
        this.subscriptionGroupTable.get(config.getGroupName()) == null,
        SubscriptionGroupAttributes.ALL,
        ImmutableMap.copyOf(currentAttributes),
        ImmutableMap.copyOf(newAttributes));

    config.setAttributes(finalAttributes);

        // 存储到缓存中
    SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
    if (old != null) {
        log.info("update subscription group config, old: {} new: {}", old, config);
    } else {
        log.info("create new subscription group, {}", config);
    }

    long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
    dataVersion.nextVersion(stateMachineVersion);

        // 持久化(默认以文件形式保存到磁盘)
    this.persist();
}

rocketMQ的broker端中,默认offset的是以json文件的形式持久化到磁盘文件中,

文件路径为${user.home}/store/config/consumerOffset.json。

其内容示例如下:

{
    "offsetTable": {
        "test-topic@test-group": {
            "0": 52116, 
            "1": 52088
        }
    }
}

当client向broker请求fetch offset of mq from broker, 如果lastOffset >=0,直接返回lastOffset

否则根据consumer group指定的策略ConsumeFromWhere,有所不同

  • CONSUME_FROM_LAST_OFFSET 从maxOffset开始消费
  • CONSUME_FROM_FIRST_OFFSET 从0开始消费
  • CONSUME_FROM_TIMESTAMP 根据时间戳请求查找offset,从对应的offset开始消费

4. 总结

看完代码,大家已经明白:UpdateSubGroupSubCommand 只会发给broker一次。如果发送的过程中网络出现异常,或者SubscriptionGroupManager执行失败,都可能导致consumer group的信息在broker端不存在。

参考资料

1.rocketmq-client-go注册消费者组的问题
2.push consumer can‘t consume all queues, and get error ... maybe this group consumer boot first"
3. fix query not found
4.rocketMQ--offset管理
5.官方运维管理命令mqadmin使用手册


微信公众号

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注