主题
Java 实时监听MySQL数据变化
框架一: Binlog4j
Binlog4j 是一个纯 Java 实现的 MySQL binlog 监听工具,它可以直接解析 MySQL 的 binlog,类似于 Canal,但比 Canal 更轻量,不需要额外的服务端配置,适合集成到 Java 项目中。
仓库地址:https://gitee.com/dromara/binlog4j
开始
1.配置 MySQL Binlog: 确保 MySQL 启用了 binlog 功能。
2.依赖配置:
xml
<!-- binlog支持 -->
<dependency>
<groupId>com.gitee.Jmysy</groupId>
<artifactId>binlog4j-spring-boot-starter</artifactId>
<version>1.9.1</version>
</dependency>
3.配置文件
yml
spring:
binlog4j:
database: 要监听的数据库(一个实例上有多个库)
redis-config: #redis配置
host: ip
port: 端口
password: 密码
client-configs:
master:
username: 数据库用户
password: 密码
host: 数据库ip
port: 端口
serverId: 1990
配置说明
- timeOffset 时间偏移量, 单位:毫秒
- serverId 编号
- redisConfig Redis 配置信息, 详见 RedisConfig
- inaugural 首次启动, 如果为 true 在启动时不再读取 Redis 记录
- persistence 是否启用持久化, 默认为 false
- strict 严格模式, 默认为 true
- mode 模式, 详见: BinlogClientMode
- username 数据库账户
- password 数据库密码
- host 数据库所在服务器 IP 地址
- port 数据库占用端口, 默认 3306
- hikariConfig 数据库连接池配置
4. java示例
java
@Slf4j
@BinlogSubscriber(clientName = "master")
public class BinlogPositionEventHandler implements IBinlogEventHandler {
@Override
public void onInsert(BinlogEvent event) {
log.info("===onInsert: {}", JSONUtil.toJsonStr(event));
// 处理插入事件
}
@Override
public void onUpdate(BinlogEvent event) {
log.info("===onUpdate: {}", JSONUtil.toJsonStr(event));
// 处理更新事件
}
@Override
public void onDelete(BinlogEvent event) {
log.info("===onDelete: {}", JSONUtil.toJsonStr(event));
// 处理删除事件
}
@Override
public boolean isHandle(String database, String table) {
log.info("===database: {}, table: {}", database, table);
// 这里可以添加额外的过滤逻辑,如果数据库和表名匹配,则返回 true
// return "mydatabase".equals(database) && "mytable".equals(table);
return true;
}
}
泛型参数
在 BinlogEvent 中 data 与 originalData 的 Class 类型为 Map<String, Object>, 为进一步降低使用的心智负担, IBinlogEventHandler 接口提供了泛型参数的支持, binlog4j 将依据泛型参数, 将 接收到的数据转换为 JavaBean。
java
public class UserBinlogEventHandler implements IBinlogEventHandler<User> {
@Override
public void onInsert(BinlogEvent<User> event) {
System.out.println("插入数据:" + event.getData());
}
@Override
public void onUpdate(BinlogEvent<User> event) {
System.out.println("修改数据:" + event.getData());
}
@Override
public void onDelete(BinlogEvent<User> event) {
System.out.println("删除数据:" + event.getData());
}
@Override
public boolean isHandle(String database, String table) {
return database.equals("pear-admin") && table.equals("sys_user");
}
}
使用案例: 统一监听处理
java
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import cn.hutool.core.date.StopWatch;
import cn.hutool.json.JSONUtil;
import com.gitee.Jmysy.binlog4j.core.BinlogEvent;
import com.gitee.Jmysy.binlog4j.core.IBinlogEventHandler;
import com.gitee.Jmysy.binlog4j.springboot.starter.annotation.BinlogSubscriber;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import site.morn.rest.RestBuilders;
import site.morn.rest.RestMessage;
import javax.annotation.Resource;
/**
* binlog事件处理器
* 连接数据的用户需要有binlog读权限
*
* @author zwmac
*/
@Slf4j
@BinlogSubscriber(clientName = "master")
public class MyBinlogEventHandler implements IBinlogEventHandler {
@Value("${spring.binlog4j.database:linkappdb}")
public String monitorDatabase;
@Resource
private ProgressWarnService progressWarnService;
@Override
public void onInsert(BinlogEvent binlogEvent) {
//log.info("数据库:" + binlogEvent.getDatabase());
//log.info("数据表:" + binlogEvent.getTable());
//log.info("插入数据:" + binlogEvent.getData());
//需要重新计算场景
//1、插入设置 app_progress_warn_config
//2、新增实际进度详情 app_progress_real_detail
//3、新增进度计划任务(子节点)app_progress_info
RestMessage restMessage = RestBuilders.successMessage();
CalProgressWarnVo calVo = new CalProgressWarnVo();
calVo.setType(1);
String tableName = binlogEvent.getTable();
if ("app_progress_warn_config".equals(tableName)) {
StopWatch sw = new StopWatch();
sw.start();
calVo.setDataType(1);
ProgressWarnConfig progressWarnConfig = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getData()), ProgressWarnConfig.class, true, CopyOptions.create().ignoreNullValue());
calVo.setNewData(progressWarnConfig);
restMessage = progressWarnService.calProgressWarn(calVo);
sw.stop();
log.info("新增[进度预警配置]数据触发binlog事件执行结果:{}-耗时:{}ms", restMessage.getCode(), sw.getTotalTimeMillis());
}
if ("app_progress_real_detail".equals(tableName)) {
StopWatch sw = new StopWatch();
sw.start();
calVo.setDataType(2);
ProgressRealDetail realDetail = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getData()), ProgressRealDetail.class, true, CopyOptions.create().ignoreNullValue());
calVo.setNewData(realDetail);
restMessage = progressWarnService.calProgressWarn(calVo);
sw.stop();
log.info("新增[实际进度详情]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());
}
if ("app_progress_info".equals(tableName)) {
StopWatch sw = new StopWatch();
sw.start();
Object progressInfoObj = binlogEvent.getData();
ProgressInfo progressInfo = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getData()), ProgressInfo.class, true, CopyOptions.create().ignoreNullValue());
if (progressInfo.getType() == 2) {
//计划任务才需要重新计算
calVo.setDataType(3);
calVo.setNewData(progressInfo);
restMessage = progressWarnService.calProgressWarn(calVo);
}
sw.stop();
log.info("新增[进度任务]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());
}
}
@Override
public void onUpdate(BinlogEvent binlogEvent) {
//log.info("数据库:" + binlogEvent.getDatabase());
//log.info("数据表:" + binlogEvent.getTable());
//log.info("原数据:" + binlogEvent.getOriginalData());
//log.info("新数据:" + binlogEvent.getData());
//需要重新计算场景
//1、设置表变更 app_progress_warn_config
//2、进度详情记录变更 app_progress_real_detail
//3、进度计划任务变更(计划开始时间、计划结束时间、工期)app_progress_info
RestMessage restMessage = null;
CalProgressWarnVo calVo = new CalProgressWarnVo();
calVo.setType(2);
String tableName = binlogEvent.getTable();
if ("app_progress_warn_config".equals(tableName)) {
StopWatch sw = new StopWatch();
sw.start();
calVo.setDataType(1);
ProgressWarnConfig oldConfig = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getOriginalData()), ProgressWarnConfig.class, true, CopyOptions.create().ignoreNullValue());
ProgressWarnConfig newConfig = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getData()), ProgressWarnConfig.class, true, CopyOptions.create().ignoreNullValue());
calVo.setNewData(newConfig);
calVo.setOldData(oldConfig);
restMessage = progressWarnService.calProgressWarn(calVo);
sw.stop();
log.info("修改[进度预警配置]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());
}
if ("app_progress_real_detail".equals(tableName)) {
StopWatch sw = new StopWatch();
sw.start();
calVo.setDataType(2);
ProgressRealDetail oldDetail = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getOriginalData()), ProgressRealDetail.class, true, CopyOptions.create().ignoreNullValue());
ProgressRealDetail newDetail = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getData()), ProgressRealDetail.class, true, CopyOptions.create().ignoreNullValue());
calVo.setNewData(newDetail);
calVo.setOldData(oldDetail);
restMessage = progressWarnService.calProgressWarn(calVo);
sw.stop();
log.info("修改[进度详情]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());
}
if ("app_progress_info".equals(tableName)) {
StopWatch sw = new StopWatch();
sw.start();
calVo.setDataType(3);
ProgressInfo oldInfo = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getOriginalData()), ProgressInfo.class, true, CopyOptions.create().ignoreNullValue());
ProgressInfo newInfo = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getData()), ProgressInfo.class, true, CopyOptions.create().ignoreNullValue());
calVo.setNewData(newInfo);
calVo.setOldData(oldInfo);
restMessage = progressWarnService.calProgressWarn(calVo);
sw.stop();
log.info("修改[进度计划任务]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());
}
}
@Override
public void onDelete(BinlogEvent binlogEvent) {
//log.info("数据库:" + binlogEvent.getDatabase());
//log.info("数据表:" + binlogEvent.getTable());
//log.info("删除数据:" + binlogEvent.getData());
//需要重新计算场景
//1、删除进度详情记录 app_progress_real_detail
//2、删除进度任务(子节点)app_progress_info
RestMessage restMessage = null;
CalProgressWarnVo calVo = new CalProgressWarnVo();
calVo.setType(3);
String tableName = binlogEvent.getTable();
if ("app_progress_real_detail".equals(tableName)) {
StopWatch sw = new StopWatch();
sw.start();
calVo.setDataType(2);
ProgressRealDetail oldDetail = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getOriginalData()), ProgressRealDetail.class, true, CopyOptions.create().ignoreNullValue());
calVo.setOldData(oldDetail);
restMessage = progressWarnService.calProgressWarn(calVo);
sw.stop();
log.info("删除[进度详情]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());
}
if ("app_progress_info".equals(tableName)) {
StopWatch sw = new StopWatch();
sw.start();
calVo.setDataType(3);
ProgressInfo oldInfo = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getOriginalData()), ProgressInfo.class, true, CopyOptions.create().ignoreNullValue());
calVo.setOldData(oldInfo);
restMessage = progressWarnService.calProgressWarn(calVo);
sw.stop();
log.info("删除[进度计划任务]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());
}
}
@Override
public boolean isHandle(String database, String table) {
//log.info("database:{},table:{}", database, table);
//只监控aep数据库
if (monitorDatabase.equals(database)) {
return true;
}
return false;
}
}
CalProgressWarnVo
java
import lombok.Data;
/**
* 计算进度预警参数Vo
*
* @author zwmac
*/
@Data
public class CalProgressWarnVo {
/**
* 类型:1新增,2变更,3删除
*/
private Integer type;
/**
* 数据类型:1进度预警配置,2进度详情,3进度计划任务
*/
private Integer dataType;
/**
* 旧数据
*/
private Object oldData;
/**
* 新数据
*/
private Object newData;
}
框架二: Canal
简介
canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。(数据库同步需要阿里的otter中间件,基于canal)
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
Github开源地址:https://github.com/alibaba/canal
工作原理
MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议。
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )。
- canal 解析 binary log 对象(原始为 byte 流)。
Canal在伪装成为目标MySQL的一个Slave节点后,获取到来自主节点的BinaryLog日志内容。那么作为BinaryLog消费者该如何使用canal监听得到的内容呢。Canal为我们提供了两种类型的方式,直接消费和投递。直接消费即使用Canal配套提供的客户端程序,即时消费Canal的监听内容。投递是指配置指定的MQ类型以及对应信息,Canal将会按照BinaryLog的条目投递到指定的MQ下,再交由MQ为各种消费形式提供数据消费。
java实现
配置 MySQL Binlog: 确保 MySQL 启用了 binlog 功能。你可以在 MySQL 的配置文件 my.cnf 中确认以下配置:
ini
[mysqld]
log-bin=mysql-bin # 启用 binlog
binlog-format=ROW # 使用 ROW 格式,确保捕获到具体的行数据变更
server-id=1 # 设置 MySQL 实例的 server-id
依赖配置:
xml
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
代码示例:
java
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import java.net.InetSocketAddress;
import java.util.List;
public class MySQLChangeListener {
public static void main(String[] args) {
// 连接到 Canal Server
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
"example", "", "");
try {
connector.connect();
connector.subscribe(".*\\..*"); // 监听所有数据库和表
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(100); // 拉取数据
long batchId = message.getId();
List<Entry> entries = message.getEntries();
if (batchId != -1 && entries.size() > 0) {
handleDataChange(entries);
}
connector.ack(batchId); // 确认处理完成
}
} finally {
connector.disconnect();
}
}
private static void handleDataChange(List<Entry> entries) {
for (Entry entry : entries) {
if (entry.getEntryType() == EntryType.ROWDATA) {
RowChange rowChange;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("Error parsing binlog row data", e);
}
EventType eventType = rowChange.getEventType();
for (RowData rowData : rowChange.getRowDatasList()) {
if (eventType == EventType.INSERT) {
System.out.println("INSERT detected");
System.out.println("New Row: " + rowData.getAfterColumnsList());
} else if (eventType == EventType.UPDATE) {
System.out.println("UPDATE detected");
System.out.println("Old Row: " + rowData.getBeforeColumnsList());
System.out.println("New Row: " + rowData.getAfterColumnsList());
} else if (eventType == EventType.DELETE) {
System.out.println("DELETE detected");
System.out.println("Deleted Row: " + rowData.getBeforeColumnsList());
}
}
}
}
}
}