rocketmq-client-go 消费MessageQueue消息失败
版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc
引言
前几天突然收到线上rocketmq报警,rocketmq队列中的消息有堆积。
打开rocketmq的管理界面,发现broker-4:MessageQueue[4]
上没有消费者,所以消息一直在堆积。重启client,问题仍然无法解决。
查看client的日志,萌叔发现有如下可疑的日志
time="2023-07-01T23:03:28+08:00" level=error
msg="fetch offset of mq from broker error"
MessageQueue="MessageQueue [topic=account-to-redis, brokerName=broker-4, queueId=3]"
consumerGroup=CG-account
underlayError="broker response code: 22, remarks: Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first"
"fetch offset of mq from broker error" 这应该就是导致broker-4:MessageQueue[4]
一直无法被消费的原因
2. 解决问题
通过网上搜索,萌叔发现了参考资料1.rocketmq-client-go注册消费者组的问题 根据这份资料,此问题是rocketmq-client-go的一个bug。并已经在最新版本中被修复。
offset_store.go
func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *primitive.MessageQueue) (int64, error) {
...
cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
res, err := r.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
if err != nil {
return -1, err
}
// 增加判断逻辑
if res.Code == internal.ResQueryNotFount {
return -1, nil // 从最小offset开始消费
}
// 原先会从这里返回error
if res.Code != internal.ResSuccess {
return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)
}
off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)
if err != nil {
return -1, err
}
return off, nil
}
也就是说当无法从broker
获取offset的时候,直接从最小Offset开始消费。
升级rockect-client-go版本之后,问题解决。
3. 更多
Q:那么为什么笔者的client程序运行了这么长时间,还是一直反复的报
"broker response code: 22, remarks: Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first"
consumer group的信息一直没有同步给broker吗?
A:
3.1 发出创建consumer group的指令
在RocketMQ中,需要使用mqadmin
创建 consumer group(详情见参考资料5)
mqadmin updateSubGroup -n 172.24.30.100:9876 -g consume-group-name -c xdf-test1
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setConsumeBroadcastEnable(false);
subscriptionGroupConfig.setConsumeFromMinEnable(false);
// groupName
subscriptionGroupConfig.setGroupName(commandLine.getOptionValue('g').trim());
if (commandLine.hasOption('b')) { // 针对特定broker 都发送一次Command
String addr = commandLine.getOptionValue('b').trim();
defaultMQAdminExt.start();
defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subscriptionGroupConfig);
System.out.printf("create subscription group to %s success.%n", addr);
System.out.printf("%s", subscriptionGroupConfig);
return;
} else if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
defaultMQAdminExt.start();
Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) { // 针对集群中的每一个broker(master)都发送一次Command
try {
defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subscriptionGroupConfig);
System.out.printf("create subscription group to %s success.%n", addr);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000 * 1);
}
}
System.out.printf("%s", subscriptionGroupConfig);
return;
}
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
最终使用MQClientAPIImpl.createSubscriptionGroup
发出Command
public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config,
final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
byte[] body = RemotingSerializable.encode(config);
request.setBody(body);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
3.2 Broker收到Command的处理逻辑
public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
Map<String, String> newAttributes = request(config);
Map<String, String> currentAttributes = current(config.getGroupName());
Map<String, String> finalAttributes = AttributeUtil.alterCurrentAttributes(
this.subscriptionGroupTable.get(config.getGroupName()) == null,
SubscriptionGroupAttributes.ALL,
ImmutableMap.copyOf(currentAttributes),
ImmutableMap.copyOf(newAttributes));
config.setAttributes(finalAttributes);
// 存储到缓存中
SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
if (old != null) {
log.info("update subscription group config, old: {} new: {}", old, config);
} else {
log.info("create new subscription group, {}", config);
}
long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
dataVersion.nextVersion(stateMachineVersion);
// 持久化(默认以文件形式保存到磁盘)
this.persist();
}
rocketMQ的broker端中,默认offset的是以json文件的形式持久化到磁盘文件中,
文件路径为${user.home}/store/config/consumerOffset.json。
其内容示例如下:
{
"offsetTable": {
"test-topic@test-group": {
"0": 52116,
"1": 52088
}
}
}
当client向broker请求fetch offset of mq from broker
, 如果lastOffset
>=0,直接返回lastOffset
否则根据consumer group指定的策略ConsumeFromWhere,有所不同
- CONSUME_FROM_LAST_OFFSET 从maxOffset开始消费
- CONSUME_FROM_FIRST_OFFSET 从0开始消费
- CONSUME_FROM_TIMESTAMP 根据时间戳请求查找offset,从对应的offset开始消费
4. 总结
看完代码,大家已经明白:UpdateSubGroupSubCommand
只会发给broker
一次。如果发送的过程中网络出现异常,或者SubscriptionGroupManager执行失败,都可能导致consumer group的信息在broker
端不存在。
参考资料
1.rocketmq-client-go注册消费者组的问题
2.push consumer can‘t consume all queues, and get error ... maybe this group consumer boot first"
3. fix query not found
4.rocketMQ--offset管理
5.官方运维管理命令mqadmin使用手册