版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 引言 前几天突然收到线上rocketmq报警,rocketmq队列中的消息有堆积。
打开rocketmq的管理界面,发现broker-4:MessageQueue[4] 上没有消费者,所以消息一直在堆积。重启client,问题仍然无法解决。
查看client的日志,萌叔发现有如下可疑的日志
time="2023-07-01T23:03:28+08:00" level=error msg="fetch offset of mq from broker error" MessageQueue="MessageQueue [topic=account-to-redis, brokerName=broker-4, queueId=3]" consumerGroup=CG-account underlayError="broker response code: 22, remarks: Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first" “fetch offset of mq from broker error” 这应该就是导致broker-4:MessageQueue[4]一直无法被消费的原因
2. 解决问题 通过网上搜索,萌叔发现了参考资料1.rocketmq-client-go注册消费者组的问题 根据这份资料,此问题是rocketmq-client-go的一个bug。并已经在最新版本中被修复。
offset_store.go
func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *primitive.MessageQueue) (int64, error) { ... cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil) res, err := r.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second) if err != nil { return -1, err } // 增加判断逻辑 if res.Code == internal.ResQueryNotFount { return -1, nil // 从最小offset开始消费 } // 原先会从这里返回error if res.Code != internal.ResSuccess { return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark) } off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64) if err != nil { return -1, err } return off, nil } 也就是说当无法从broker 获取offset的时候,直接从最小Offset开始消费。
...