引言在当今数据驱动的世界中,实时数据同步和变更追踪对于保持数据一致性和支持业务决策至关重要。MySQL 作为一款流行的开源关系型数据库管理系统,提供了二进制日志(Binlog)功能,记录所有对数据库结...
在当今数据驱动的世界中,实时数据同步和变更追踪对于保持数据一致性和支持业务决策至关重要。MySQL 作为一款流行的开源关系型数据库管理系统,提供了二进制日志(Binlog)功能,记录所有对数据库结构和内容的更改。通过监听 Binlog,可以实现实时数据同步、数据恢复、主从复制以及审计等关键功能。
MySQL Binlog 是一种事务安全的日志文件,从 MySQL 8.0 版本开始默认启用。它记录了所有对数据库结构和内容进行修改的操作,例如插入、更新和删除。Binlog 以事件的形式记录这些操作,每个事件包含了执行操作所需的全部信息。
对于 MySQL 5.x 版本,需要手动开启 Binlog。编辑配置文件 my.cnf 或者 my.ini,并在 [mysqld] 部分添加以下配置:
[mysqld]
log-bin=mysql-bin
server-id=1然后重启 MySQL 服务使设置生效。
Canal 是阿里巴巴开源的一个基于 MySQL Binlog 的增量订阅和消费组件。它支持多种数据源类型,包括 MySQL、MariaDB 等,并提供丰富的配置选项和扩展能力。
canal.properties 文件,配置 MySQL 数据库连接信息。public class CanalClient { public static void main(String[] args) { // 创建连接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "", "", "", "", "", 60000, 60000); // 连接 Canal connector.connect(); // 订阅数据库和表 connector.subscribe("mydatabase\\.mytable"); // 回滚到未处理的位置 connector.rollback(); // 持续读取 Binlog while (true) { try { // 获取一批 Binlog 数据 Message message = connector.getWithoutAck(1000, 1000, 10000); // 处理数据 processMessage(message); // 确认处理成功 connector.ack(message.getId()); } catch (Exception e) { e.printStackTrace(); // 回滚到未处理的位置 connector.rollback(); } } } private static void processMessage(Message message) { // 处理 Binlog 数据 }
}Debezium 是一个分布式平台,用于实时捕获数据库更改事件并将其转换为 Kafka 消息。它支持多种数据库类型,包括 MySQL、PostgreSQL、MongoDB 等。
public class DebeziumClient { public static void main(String[] args) { // 创建 Kafka 消费者 KafkaConsumer consumer = new KafkaConsumer<>(props); // 订阅 Kafka 主题 consumer.subscribe(Collections.singletonList("mydatabase.mytable")); // 持续读取 Kafka 消息 while (true) { // 获取一批 Kafka 消息 ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); // 处理消息 for (ConsumerRecord record : records) { processRecord(record); } } } private static void processRecord(ConsumerRecord record) { // 处理 Kafka 消息 }
} MySQL Bin