现象
BI的同事发现某指标数据展示有问题,发现最近入库的数据缺失,然后反馈到DBA. 经DBA排查后发现原始数据缺少.
排查
- 之前笔者在休假,同事初步排查怀疑是消息阻塞导致.经过代码调整发版之后发现还是有情况发生.
- 笔者接手之后,在本地打印指定点位的消息,发现没有丢失消息的情况.(15分钟一条消息)
于是在线上系统中添加了打印指定点位的日志.(发版下班) - 第二天,查看日志发现有缺失情况,本地打印继续开启 发现没有复现
于是查询消费组,收集到一组 host(ip)
@Test
public void showGroupInfo() throws ExecutionException, InterruptedException {
String id = \"kafka消费组id\";
DescribeConsumerGroupsResult describeConsumerGroupsResult = admin.describeConsumerGroups(Collections.singleton(id));
final KafkaFuture<Map<String, ConsumerGroupDescription>> all = describeConsumerGroupsResult.all();
final Map<String, ConsumerGroupDescription> stringConsumerGroupDescriptionMap = all.get();
final Set<Map.Entry<String, ConsumerGroupDescription>> entries1 = stringConsumerGroupDescriptionMap.entrySet();
for (Map.Entry<String, ConsumerGroupDescription> stringConsumerGroupDescriptionEntry : entries1) {
final ConsumerGroupDescription value = stringConsumerGroupDescriptionEntry.getValue();
final Collection<MemberDescription> members = value.members();
for (MemberDescription member : members) {
String s1 = member.consumerId();
String host = member.host();
String s = member.clientId();
String s2 = member.assignment().toString();
System.out.printf(\"clientId[%s],memberId[%s],host[%s],assignment[%s]%n\", s, s1, host, s2);
}
}
}
- 找运维同事查看ip之后发现了另一个项目,拉取jar反编译之后发现配置文件中kafka 消费者 groupId 配置相同
- 反馈相关项目负责人
疑问
如何统一管理/监控 kafka group 划分 避免此类问题发生?
kafka groupId 是否要按照微服务项目来划分?
来源:https://www.cnblogs.com/zzzz-yh/p/16571672.html
本站部分图文来源于网络,如有侵权请联系删除。