如何使用Kafka订阅数据库的实时Binlog

原创 吴就业 124 0 2020-11-13

本文为博主原创文章,未经博主允许不得转载。

本文链接:https://wujiuye.com/article/c8d8123682a34f65ba1e8d446e2d9eda

作者:吴就业
链接:https://wujiuye.com/article/c8d8123682a34f65ba1e8d446e2d9eda
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。

订阅Binlog的目的在于,实现实时的缓存更新、处理复杂逻辑数据实时同步到Elasticsearch/其它库-表等业务场景。

本篇内容包括:

一种在应用层实现监听SQL的方式

笔者之前写过关于在应用中利用Mybatis插件和SQL解析工具实现监听SQL从而更新数据的文章,并且将这一功能整合到了个人的开源项目(easymulti-datasource-spring-boot-starter)中,支持监听事务,支持实时消费监听到的SQL,也可以通过给事务监听器注册回调接口方式,在事务提交时才开始消费监听到的SQL

使用easymulti-datasource-spring-boot-starter也能在应用层面轻松实现实时SQL订阅,但这种方式也存在弊端,虽然监听到SQL后也是异步消费,但拦截SQL、分析SQL,本身也会有点性能损耗。而如果是有多个应用修改同一个表的情况,那么就需要每个应用都写一遍消费的代码。

如果可以在更底层,直接订阅MysqlBinlog,效率会比在应用层实现高得多。

预备知识:关于Mysql事务的两阶段提交与Binlog

Binlog用于记录数据库执行的写入操作信息,以二进制的形式保存在磁盘中。

BinlogMysql的逻辑日志,并且由Server层进行记录,无论使用何种存储引擎,Mysql数据库都会记录Binlog日志。

Mysql只有在事务提交时才会记录Biglog,并且事务在提交时,Biglog还只是记录在内存中,然后才通过配置的刷盘策略写入到文件中。

Mysql通过sync_binlog参数控制Biglog的刷盘时机,取值范围是0-N

毫无疑问,sync_binlog最安全的是设置是1,这也是MySQL 5.7.7之后版本的默认值。

通常我们提到的Mysql事务的两阶段提交都与InnoDB存储引擎有关。

Mysql事务分两个阶段提交,第一阶段由存储引擎预写记录,如InnoDB存储引擎写Redolog,此阶段Binlog不作任何操作;第二阶段首先是写Binlog,然后再由存储引擎完成事务的提交工作,如写入commit日记、释放锁等。

当第二阶段的写Binlog成功后,MySQL就会认为事务已经提交并且持久化了,所以在这一步Binlog就已经可以发送给订阅者了。如果在写完Binlog后,存储引擎还没有完成提交的事务,刚好在这个时刻数据库崩溃,那么重启后依然能根据Binlog正确恢复该事务。如果在写Binlog这一步完成之前,任何操作的失败都会引起事务回滚。

所以,如果是直接订阅Binlog,我们并不需要关心事务最终是提交了还是回滚了,在事务未提交之前,我们都订阅不到该事务中执行的任何SQL的日记。

想要了解更多,推荐阅读文章:MySQL · 原理介绍 · 再议MySQL的故障恢复》http://mysql.taobao.org/monthly/2018/12/04/

预备知识:关于Kafka

数据存储问题

Kafka集群保留所有发布的记录,无论它们是否已被消费,可通过配置保留期限参数来控制消息的保留时长。如果保留策略设置为2天,一条记录发布后两天内,可以随时被消费,两天过后这条记录会被抛弃并释放磁盘空间。

offset消费偏移量

偏移量由消费者所控制,由消费者在消费记录后commit一个新的偏移量,kafka会为消费者存储这个偏移量,以便于后续继续消费,kafka会按group + topic + partition存储偏移量。当然,也可以自行存储,关于自行存储偏移量需要注意的问题后续会提到。

由于kafkagroup + topic + partition存储偏移量,这同时也对应另一个问题:”同一个分组内,一个topic的每个partition都只能有一个消费者消费,但一个消费者可以同时消费多个partition。”

由于offset由消费者控制,所以消费者可以采用任何顺序来消费记录,也就是说,一个topic的任一消费者都可以重置到一个旧的偏移量,从而重新处理过去的数据,也可以跳过最近的记录,从当前位置开始消费。

消费者

一个KafkaConsumer实例并不一定就等于一个消费者。

subscribe模式下,一个KafkaConsumer实例等于一个消费者。假设只有一个分区,开启多个KafkaConsumer,那么将会有一个消费者处于空闲状态,即这个线程每次调用该KafkaConsumer实例的poll方法都会一直返回空,拉不到任何消息,直到当前正在消费的KafkaConsumer长连接掉线后,重平衡后空闲的消费者才会拉取到记录。

这也证实了这句话:Kafka实现消费的方式是将日志中的分区划分到每一个消费者实例上,以便在任何时间,每个消费者都是某个分区的唯一消费者。

subscribe模式下,与其说一个KafkaConsumer等于一个消费者,不如说,一个连接(Socket)等于一个消费者。

而在assign模式下,如果多个KafkaConsumer订阅的都是指定的topic和分区(并且同组),那么这些KafkaConsumer拉取的都会是同一个分区的记录。这里只是举例说明,不要这样用,否则会重复消费记录,两个线程交叉提交(commit)偏移量(offset)也会出问题。

消费者组

通常情况下,每个topic都会有一些消费组,一个消费组就是一个逻辑订阅者。

例如:

topic:用户注册
group 1:短信推送服务订阅者
group 2:邮件推送服务订阅者

group1group2是逻辑订阅者,但每个逻辑订阅者下面都可以有多个消费者。

同一个组内的消费者数量不要超过topicpartition数量,因为超出partition数量的消费者不会被分配到partition,也就是会处于空闲状态(见”消费者”下的描述);

维护消费者组中的消费关系由Kafka协议动态处理,当有新的消费者加入组时,新加入的消费者将从组中其他成员处接管一些partition分区,当一个消费者消失时,该消费者拥有的分区将被重新分配给其它剩余的消费者。

还有一点,在同一个分组下,如果一个topic的每个分区当前都有一个消费者正在消费,新加入的消费者将会替代一个正在消费的消费者,接管被替代的消费者消费的分区。

阿里云数据传输服务DTS-数据订阅

阿里云数据传输服务DTS支持MySQLDRDSBinlog实时订阅。

我们可以不必使用官方提供的SDK订阅Binlog,而只需要使用kafka客户端,使用kafkaAPI实现Binlog订阅。

官方文档:使用Kafka客户端消费订阅数据https://help.aliyun.com/document_detail/121239.html?spm=a2c4g.11186623.6.785.6d4d6d2aIOqQQm

官方提供的DEMOsubscribe_examplehttps://github.com/LioRoger/subscribe_example,该demo龙玄大佬提供。

我们选择基于官方demo[subscribe_example/javaimpl]构建Mysql Binlog实时订阅服务(试用阶段),而不是重复造轮子。但我们对源码做了部分修改,保留消息反序列化、MetaStoreCheckpoint特性。其中MetaStoreCheckpoint是这个demo最值得学习的地方。

官方DEMO的消费模型:生产->消费模型

DEMO只开启一个消费者,这个消费者负责订阅消息,并将订阅到的消息放入一个阻塞队列(LinkedBlockingQueue)中,这个阻塞队列的默认大小设置为512

另外开启一个真正消费消息的线程,从该阻塞队列中读取消息并调用RecordListenerconsume方法消费,在 RecordListener消费完消息后,将该消息的offset包装成一个检查点(Checkpoint),将该检查点设置为最新的检查点,另外会有一个定时任务每5秒提交一次最新的检查点,即提交offset

kafka消费者每次都有可能拉取到一批消息,并且这些消息是按发布顺序排好序的。因为topic的一个分区只能被一个消费者消费,而消息在分区中本就按消息的发布顺序排好序的。

DEMO中,消费者将订阅到的消息放入阻塞队列也是按顺序放入,当队列满时会阻塞等待,因此只需要确保按顺序消费阻塞队列中的消息并提交offset

如果不按顺序消费阻塞队列中的消息会怎样?

假设多个线程并行无顺序的消费拉取到的消息,那么就无法确保offset被正确提交,可能会导致部分消息重复消费。

在不严格要求每条消息都必须正确无异常地被消费的情况下,我们可以使用多线程消费,提升消息的消费速度。比如,消费阻塞队列中消息的线程只负责从阻塞队列获取消息,并负责解析,其它例如更新缓存等行为放到异步线程池中去执行,只要成功放入异步线程池,就更新Checkpointoffset),继续消费后面的消息。

官方DEMO提供的MetaStore与Checkpoint特性

Checkpoint用于记录分组内的一个topic的某个分区当前实际消费到的位置(偏移量:offset)。

/**
 * 安全检查点(即:记录消费位置)
 */
public class Checkpoint {
    // 分区信息
    private final TopicPartition topicPartition;
    private final long timeStamp;
    private final long offset;
    public Checkpoint(TopicPartition topicPartition, long timeStamp, long offset) {
        this.topicPartition = topicPartition;
        this.timeStamp = timeStamp;
        this.offset = offset;
    }
}

MetaStore则用于存储Checkpoint,或者说是提交偏移量。

public interface MetaStore<V> {
    Future<V> serializeTo(TopicPartition topicPartition, String group, V value);
    V deserializeFrom(TopicPartition topicPartition, String group);
}

DEMO提供了两个实现类:KafkaMetaStoreLocalFileMetaStore。其中LocalFileMetaStore实现的就是使用本地文件存储消费的分区的偏移量,KafkaMetaStore则是调用KafkaConsumercommitAsync方法异步提交偏移量,也就是说让kafka存储偏移量。

需要注意的是,在subscribe模式下,不要使用LocalFileMetaStore。当消费者以集群方式部署时,节点重启后由于kafka的再平衡,该节点消费的分区可能与重启之前的分区不同,那么本地文件存储的消费偏移量就使用不上,会导致从头(配置的初始化消费位置)开始消费记录;而如果只是部署一个消费者服务,或者多个消费者是在一个进程内的,又或是使用assign模式,那么可以使用LocalFileMetaStore,但需要确保每次服务重启都存在偏移量文件,如果切换服务器部署,则需要将偏移量文件同步到新的服务器上。

为了省去不必要的麻烦,我们直接弃用LocalFileMetaStore,而使用KafkaMetaStore

public class KafkaMetaStore implements MetaStore<Checkpoint> {

    private volatile KafkaConsumer kafkaConsumer;
    //.....
    // 异步提交offset
    @Override
    public Future<Checkpoint> serializeTo(TopicPartition topicPartition, String group, Checkpoint value) {
        KafkaFutureImpl ret = new KafkaFutureImpl();
        if (null != kafkaConsumer) {
            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(value.getOffset(), String.valueOf(value.getTimeStamp()));
            // 异步提交(不能同步提交,否则影响RecordGenerator#run())
            // Notice: commitAsync is only put commit offset request to sending queue, the future  result will be driven by KafkaConsumer.poll() function
            // So if you only call this method but not poll, you may not wait offset commit call back
            kafkaConsumer.commitAsync(Collections.singletonMap(topicPartition, offsetAndMetadata), new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (null != exception) {
                        log.warn("KafkaMetaStore: Commit offset for group[" + group + "] topicPartition[" + topicPartition.toString() + "] " +
                                value.toString() + " failed cause " + exception.getMessage(), exception);
                        ret.completeExceptionally(exception);
                    } else {
                        log.debug("KafkaMetaStore:Commit offset success for group[{}] topicPartition [{}] {}", group, topicPartition, value);
                        ret.complete(value);
                    }
                }
            });
        } else {
            log.warn("KafkaMetaStore: kafka consumer not set, ignore report");
            ret.complete(value);
        }
        return ret;
    }
    // 从kafka获取当前分区的offset和时间戳
    @Override
    public Checkpoint deserializeFrom(TopicPartition topicPartition, String group) {
        if (null != kafkaConsumer) {
            OffsetAndMetadata offsetAndMetadata = kafkaConsumer.committed(topicPartition);
            if (null != offsetAndMetadata) {
                return new Checkpoint(topicPartition, Long.valueOf(offsetAndMetadata.metadata()), offsetAndMetadata.offset(), offsetAndMetadata.metadata());
            } else {
                return null;
            }
        } else {
            log.warn("KafkaMetaStore: kafka consumer not set, ignore fetch offset");
            throw new KafkaException("KafkaMetaStore: kafka consumer not set, ignore fetch offset for group[" + group + "] and tp [" + topicPartition + "]");
        }
    }
}

使用官方DEMO需要注意的地方

如果topic的某个分区从未被消费过,那么在首次启动消费者时,需要配置初始化消费位置,可以使用时间戳,也可以使用offset定位到想要消费的位置。

如果分区被消费过,那么就可以在消费者启动/重启时,先获取最后消费的位置,然后再从最后消费的位置开始消费。但由于offset是定时每5秒才提交一次,所以获取到的offset并不能代表实际消费的偏移量,所以每次重起都会有小部分记录被重新消费,这需要我们自行确保消息的幂等性消费。

官方DEMO默认使用LocalFileMetaStore,替换MetaStore只需要修改RecordGenerator#getConsumerWrap方法,代码如下:

public class RecordGenerator{
    private ConsumerWrap getConsumerWrap(String message) {
        // KafkaConsumer包装器
        ConsumerWrap kafkaConsumerWrap = getConsumerWrap();
        // 不建议使用LocalFileMetaStore存储(特别是部署到k8s上),否则将消费者部署到其它服务器后,需要将localCheckpointStore文件也要同步过去才可以
        // metaStoreCenter.registerStore(LOCAL_FILE_STORE_NAME, new LocalFileMetaStore(LOCAL_FILE_STORE_NAME));
        // 使用KafkaMetaStore
        metaStoreCenter.registerStore(KAFKA_STORE_NAME, new KafkaMetaStore(kafkaConsumerWrap.getRawConsumer()));
        // 从检查点存储器获取检查点(由于是每5秒提交一次,所以每次重起都会有小部分记录被重新消费)
        Checkpoint checkpoint = getCheckpoint();
        // 没有找到检查点,则使用配置的初始化检查点
        if (null == checkpoint || Checkpoint.INVALID_STREAM_CHECKPOINT == checkpoint) {
            checkpoint = initialCheckpoint; // 在配置文件中配置
            log.info("RecordGenerator: use initial checkpoint [{}] to start", checkpoint);
        } else {
            log.info("RecordGenerator: load checkpoint from checkpoint store success, current checkpoint [{}]", checkpoint);
        }
        //.......
    }
}

最后,由于阿里云数据传输服务DTS-数据订阅只会将日记提交到一个分区,即一个topic只有一个分区,这是为了确保能够按正确的顺序消费每一条sql日记。所以,我们没有必要使用subscribe模式,应该使用assign模式。

DefaultConsumerWrap封装了KafkaConsumer,使用assign模式在该类的assignTopic方法表现,代码如下。

public class DefaultConsumerWrap extends ConsumerWrap {
    
    private KafkaConsumer<byte[], byte[]> consumer;
	
    @Override
    public void assignTopic(TopicPartition topicPartition, Checkpoint checkpoint) {
        // KafkaConsumer
        consumer.assign(Collections.singletonList(topicPartition));
        log.info("RecordGenerator:  assigned for {} with checkpoint {}", topicPartition, checkpoint);
        // 设置消费位置
        setFetchOffsetByTimestamp(topicPartition, checkpoint);
    }
    
}

其中,assignTopic方法的第二个参数(Checkpoint)从MetaStore获取而来,或者是使用配置的初始化位置。在调用KafkaConsumer#assign方法之后,调用setFetchOffsetByTimestamp方法设置消费位置,后续就可以调用KafkaConsumer#poll方法拉取消息了。

setFetchOffsetByTimestamp方法实现如下,相比DEMO源码,我们做了点修改。

public class DefaultConsumerWrap extends ConsumerWrap {
 	
    @Override
    public void setFetchOffsetByOffset(TopicPartition topicPartition, Checkpoint checkpoint) {
        // 移动到指定位置继续消费
        consumer.seek(topicPartition, checkpoint.getOffset());
    }

    // recommended
    @Override
    public void setFetchOffsetByTimestamp(TopicPartition topicPartition, Checkpoint checkpoint) {
        // 优先使用偏移量
        if (checkpoint.getOffset() > 0) {
            setFetchOffsetByOffset(topicPartition, checkpoint);
            return;
        }
        long timeStamp = checkpoint.getTimeStamp();
        // 根据时间戳获取偏移量
        Map<TopicPartition, OffsetAndTimestamp> remoteOffset = consumer.offsetsForTimes(Collections.singletonMap(topicPartition, timeStamp));
        OffsetAndTimestamp toSet = remoteOffset.get(topicPartition);
        if (null == toSet) {
            throw new RuntimeException("RecordGenerator:seek timestamp for topic [" + topicPartition + "] with timestamp [" + timeStamp + "] failed");
        }
        // 移动到指定位置继续消费
        consumer.seek(topicPartition, toSet.offset());
    }
 }

关于avro序列化与反序列化

官方提供的demo,其中com.alibaba.dts.formats.avro这个package是由avroshcema编译而来的,我们也可以自行编译,具体实现如下:

1、执行命令编译avsc文件生成java代码

java -jar avro/avro-tools-1.8.2.jar compile -string schema avro/Record.avsc .

2、将生成的com.alibaba.dts.formats.avro这个package拷贝到当前工程根目录下面,当然,也可以封装到一个模块,在主模块中引入。

#后端

声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。

文章推荐

如何实现SSO单点登录

随着公司业务的发展,子系统越来越多,实现SSO单点登录的需求就愈加迫切。本篇介绍笔者如何实现SSO单点登录系统。

如何并行消费Kafka拉取的数据库Binlog,提升吞吐量

本篇介绍如何并行消费Kafka拉取的数据库Binlog,以及使用Kafka订阅Binlog字段值获取防坑指南(阿里云DTS)。

深入浅出反应式编程原理,反应式编程入门

反应式编程不适用于业务开发,特别是复杂业务系统的开发,这或许就是反应式编程从推出到现在依然不温不火的原因吧。

Spring Data R2DBC快速上手指南

本篇内容介绍如何使用r2dbc-mysql驱动程序包与mysql数据库建立连接、使用r2dbc-pool获取数据库连接、Spring-Data-R2DBC增删改查API、事务的使用,以及R2DBC Repository。

使用Spring WebFlux + R2DBC搭建消息推送服务

消息推送服务主要是处理同步给用户推送短信通知或是异步推送短信通知、微信模板消息通知等。本篇介绍如何使用Spring WebFlux + R2DBC搭建消息推送服务。

教你如何编写一个IDEA插件,并掌握核心知识点PSI

IDEA有着极强的扩展功能,它提供插件扩展支持,让开发者能够参与到IDEA生态建设中,为更多开发者提供便利、提高开发效率。我们常用的插件有Lombok、Mybatis插件,这些插件都大大提高了我们的开发效率。即便IDEA功能已经很强大,并且也已有很多的插件,但也不可能面面俱到,有时候我们需要自给自足。