kafka消息重复消费排查

原创 吴就业 174 0 2022-02-15

本文为博主原创文章,未经博主允许不得转载。

本文链接:https://wujiuye.com/article/2ab52cf257194deaa66bc3588348e2f4

作者:吴就业
链接:https://wujiuye.com/article/2ab52cf257194deaa66bc3588348e2f4
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。

业务使用我们基础部门封装的kafka组件,在一次版本迭代中,我们引入了offset缓存,正是这个缓存,在某种条件触发下,会导致出现消息重复消费现象。

问题描述

业务方反馈,升级组件后,某个topic,出现过几次不规律的从很早之前的offset重复消费消息,现象表现如下图。

img

查看这些时间点范围的日记发现,都有出现重均衡的情况,所以首先就是怀疑重均衡导致的。

随后看了组件重均衡处理的相关代码,以及出现重复消费的时间点范围内的重均衡日记,锁定了猜测。

场景

此次排查的,出现重复消费现象的topic,只有一个分区,部署4个消费者节点,每个节点开启3个KafkaConsumer,假设给这些消费者编号为:

猜测可能的原因

img

从日记表现来看,猜测可能的原因如下:

只是猜测,当时各种尝试都无法重现出来,可能还受其它条件的影响,因此未被证实。

为什么要缓存offset?

目的是在重均衡时,失去某些分区消费权限的消费者,在重均衡转移后的消费者开始消费这些分区之前,先把最后消费的消息的offset提交,避免接管的消费者产生不必要的重复消费。

消费者每消费一条消息,就会将该消息的offset更新到Map缓存,key为TopicPartition,value为offset。

给消费者注册ConsumerRebalanceListener,在onPartitionsRevoked方法中,将失去消费权限的TopicPartition的offset提交。

正常情况下,不会出现“重均衡到了a0,然后不知道什么原因,a0没开始消费,又重均衡到了其它节点”这种情况,所以在测试组件时,没考虑到这种情况。

修复只需要在ConsumerRebalanceListener的onPartitionsAssigned方法中,移除缓存。

class AutoCommitOffsetRebalanceListener implements ConsumerRebalanceListener {

    private final String group;

    private final KafkaConsumer<String, byte[]> consumer;

    private final Map<TopicPartition, OffsetAndMetadata> offsetNewMap = new ConcurrentHashMap<>();

    public AutoCommitOffsetRebalanceListener(String group, KafkaConsumer<String, byte[]> consumer) {

        this.consumer = consumer;

        this.group = group;

    }

    // 每消费成功一条消息调用 
    void syncOffset(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        offsetNewMap.put(topicPartition,offsetAndMetadata);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Map<TopicPartition, OffsetAndMetadata> commitMap = new HashMap<>();
        for (TopicPartition topicPartition : partitions) {
            OffsetAndMetadata offsetAndMetadata = offsetNewMap.get(topicPartition);
            if (offsetAndMetadata != null) {
                commitMap.put(topicPartition, offsetAndMetadata.getMetadata());
            }
        }

        // 提交offset
        if (!CollectionUtils.isNullOrEmpty(commitMap)) {
            try {
                this.consumer.commitSync(commitMap);
                LogUtils.info("[onPartitionsRevoked] auto commitSync success. {}", commitMapToString(commitMap));
            } catch (CommitFailedException exception) {
                LogUtils.info("[onPartitionsRevoked] auto commitSync fail. {}, error msg:{}", commitMapToString(commitMap), exception.getMessage());
            }
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // fix cache bug on this
        for (TopicPartition tp : partitions) {
            offsetNewMap.remove(tp);
        }
        LogUtils.info("[onPartitionsAssigned] {}", partitionsToString(partitions));
    }

}

证实猜测

虽然本地debug各种模拟都未能复现,但从日记表现的现象来看,猜测的原因可能性非常大。

本地未能复现“重均衡到了a0,然后不知道什么原因,a0没开始消费,又重均衡到了其它节点”,可能是因为还有其它条件的影响。需要业务方确认消息消费已经做了幂等之后,在将问题修复后,配合升级一个版本,再验证。

升级后,指标数据、日记,都没有再出现重复消费情况,证实了前面提出的猜测。

指标:

img

日记:

img

img

#中间件

声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。

文章推荐

Dubbo之HTTP RPC vs Dubbo RPC性能压测

此次性能测试对比的是我们基于Dubbo扩展点自实现的Http rpc协议,与Dubbo原生Dubbo rpc协议的单次请求响应平均耗时、吞吐量。

Dubbo为什么会提供泛化调用这个功能

Dubbo的泛化调用功能就类似于Java语言提供的泛型功能,目的都是通用。那为什么需要泛化调用功能呢?

基于扩展点,为dubbo支持跨业务调用

很多规模稍大点的公司,内部都会有多个业务部门,这些业务部门都有自己的业务产品。每个业务部门开发的产品部署的环境物理上也都是相对隔离的,但这些业务部门之间可能存在合作关系,业务关联,因此就有了跨业务RPC调用的需求。

Quartz分布式调度原理

在同一时刻需要触发的Job只有少量的情况下,我们看不到Quartz的性能缺陷,在Job数量明显增加情况下,我们就会发现,调度延迟会有明显增加。尽管横向扩展节点,调度延迟也不会降低,且整体调度性能没有明显好转,反而更糟糕。

重构XXL-JOB,使用响应式编程实现异步RPC提升调度吞吐量

如果同一时刻需要下发几百个执行job的请求给执行器,使用这种阻塞的RPC,意味着需要开启几百个线程,使用几百个连接发送请求,而这几百个线程都需要阻塞等待响应,Job越多,需要的线程数就会越多,对调动中心的性能影响就越大。

重构支持多租户的XXL-JOB,如何实现多个逻辑集群的均衡选主

我们基于XXL-JOB的架构原理,重新架构设计了支持多租户横向扩展的分布式任务调度平台。本篇介绍如何实现多个逻辑集群(多个租户逻辑上是独立的集群)的均衡选主。