Skip to content

Java 实时监听MySQL数据变化

框架一: Binlog4j

Binlog4j 是一个纯 Java 实现的 MySQL binlog 监听工具,它可以直接解析 MySQL 的 binlog,类似于 Canal,但比 Canal 更轻量,不需要额外的服务端配置,适合集成到 Java 项目中。

仓库地址:https://gitee.com/dromara/binlog4j

开始

1.配置 MySQL Binlog: 确保 MySQL 启用了 binlog 功能。

2.依赖配置:

xml
<!-- binlog支持 -->
<dependency>
    <groupId>com.gitee.Jmysy</groupId>
    <artifactId>binlog4j-spring-boot-starter</artifactId>
    <version>1.9.1</version>
</dependency>

3.配置文件

yml
spring:  
  binlog4j:
    database: 要监听的数据库(一个实例上有多个库)
    redis-config: #redis配置
      host: ip
      port: 端口
      password: 密码
    client-configs:
      master:
        username: 数据库用户
        password: 密码
        host: 数据库ip
        port: 端口
        serverId: 1990

配置说明

  • timeOffset 时间偏移量, 单位:毫秒
  • serverId 编号
  • redisConfig Redis 配置信息, 详见 RedisConfig
  • inaugural 首次启动, 如果为 true 在启动时不再读取 Redis 记录
  • persistence 是否启用持久化, 默认为 false
  • strict 严格模式, 默认为 true
  • mode 模式, 详见: BinlogClientMode
  • username 数据库账户
  • password 数据库密码
  • host 数据库所在服务器 IP 地址
  • port 数据库占用端口, 默认 3306
  • hikariConfig 数据库连接池配置

4. java示例

java
@Slf4j
@BinlogSubscriber(clientName = "master")
public class BinlogPositionEventHandler implements IBinlogEventHandler {
    @Override
    public void onInsert(BinlogEvent event) {
        log.info("===onInsert: {}", JSONUtil.toJsonStr(event));
        // 处理插入事件
    }
    @Override
    public void onUpdate(BinlogEvent event) {
        log.info("===onUpdate: {}", JSONUtil.toJsonStr(event));
        // 处理更新事件
    }
    @Override
    public void onDelete(BinlogEvent event) {
        log.info("===onDelete: {}", JSONUtil.toJsonStr(event));
        // 处理删除事件
    }
    @Override
    public boolean isHandle(String database, String table) {
        log.info("===database: {}, table: {}", database, table);
        // 这里可以添加额外的过滤逻辑,如果数据库和表名匹配,则返回 true
        // return "mydatabase".equals(database) && "mytable".equals(table);
        return true;
    }
}

泛型参数

在 BinlogEvent 中 data 与 originalData 的 Class 类型为 Map<String, Object>, 为进一步降低使用的心智负担, IBinlogEventHandler 接口提供了泛型参数的支持, binlog4j 将依据泛型参数, 将 接收到的数据转换为 JavaBean。

java
public class UserBinlogEventHandler implements IBinlogEventHandler<User> {

    @Override
    public void onInsert(BinlogEvent<User> event) {
        System.out.println("插入数据:" + event.getData());
    }

    @Override
    public void onUpdate(BinlogEvent<User> event) {
        System.out.println("修改数据:" + event.getData());
    }

    @Override
    public void onDelete(BinlogEvent<User> event) {
        System.out.println("删除数据:" + event.getData());
    }

    @Override
    public boolean isHandle(String database, String table) {
        return database.equals("pear-admin") && table.equals("sys_user");
    }
}

使用案例: 统一监听处理

java
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import cn.hutool.core.date.StopWatch;
import cn.hutool.json.JSONUtil;
import com.gitee.Jmysy.binlog4j.core.BinlogEvent;
import com.gitee.Jmysy.binlog4j.core.IBinlogEventHandler;
import com.gitee.Jmysy.binlog4j.springboot.starter.annotation.BinlogSubscriber;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import site.morn.rest.RestBuilders;
import site.morn.rest.RestMessage;

import javax.annotation.Resource;

/**
 * binlog事件处理器
 * 连接数据的用户需要有binlog读权限
 *
 * @author zwmac
 */
@Slf4j
@BinlogSubscriber(clientName = "master")
public class MyBinlogEventHandler implements IBinlogEventHandler {

    @Value("${spring.binlog4j.database:linkappdb}")
    public String monitorDatabase;

    @Resource
    private ProgressWarnService progressWarnService;

    @Override
    public void onInsert(BinlogEvent binlogEvent) {
        //log.info("数据库:" + binlogEvent.getDatabase());
        //log.info("数据表:" + binlogEvent.getTable());
        //log.info("插入数据:" + binlogEvent.getData());

        //需要重新计算场景
        //1、插入设置 app_progress_warn_config
        //2、新增实际进度详情 app_progress_real_detail
        //3、新增进度计划任务(子节点)app_progress_info
        RestMessage restMessage = RestBuilders.successMessage();
        CalProgressWarnVo calVo = new CalProgressWarnVo();
        calVo.setType(1);
        String tableName = binlogEvent.getTable();
        if ("app_progress_warn_config".equals(tableName)) {
            StopWatch sw = new StopWatch();
            sw.start();
            calVo.setDataType(1);
            ProgressWarnConfig progressWarnConfig = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getData()), ProgressWarnConfig.class, true, CopyOptions.create().ignoreNullValue());
            calVo.setNewData(progressWarnConfig);
            restMessage = progressWarnService.calProgressWarn(calVo);
            sw.stop();
            log.info("新增[进度预警配置]数据触发binlog事件执行结果:{}-耗时:{}ms", restMessage.getCode(), sw.getTotalTimeMillis());
        }
        if ("app_progress_real_detail".equals(tableName)) {
            StopWatch sw = new StopWatch();
            sw.start();
            calVo.setDataType(2);
            ProgressRealDetail realDetail = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getData()), ProgressRealDetail.class, true, CopyOptions.create().ignoreNullValue());
            calVo.setNewData(realDetail);
            restMessage = progressWarnService.calProgressWarn(calVo);
            sw.stop();
            log.info("新增[实际进度详情]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());
        }
        if ("app_progress_info".equals(tableName)) {
            StopWatch sw = new StopWatch();
            sw.start();
            Object progressInfoObj = binlogEvent.getData();
            ProgressInfo progressInfo = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getData()), ProgressInfo.class, true, CopyOptions.create().ignoreNullValue());
            if (progressInfo.getType() == 2) {
                //计划任务才需要重新计算
                calVo.setDataType(3);
                calVo.setNewData(progressInfo);
                restMessage = progressWarnService.calProgressWarn(calVo);
            }
            sw.stop();
            log.info("新增[进度任务]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());
        }

    }

    @Override
    public void onUpdate(BinlogEvent binlogEvent) {
        //log.info("数据库:" + binlogEvent.getDatabase());
        //log.info("数据表:" + binlogEvent.getTable());
        //log.info("原数据:" + binlogEvent.getOriginalData());
        //log.info("新数据:" + binlogEvent.getData());
        //需要重新计算场景
        //1、设置表变更 app_progress_warn_config
        //2、进度详情记录变更 app_progress_real_detail
        //3、进度计划任务变更(计划开始时间、计划结束时间、工期)app_progress_info

        RestMessage restMessage = null;
        CalProgressWarnVo calVo = new CalProgressWarnVo();
        calVo.setType(2);
        String tableName = binlogEvent.getTable();
        if ("app_progress_warn_config".equals(tableName)) {
            StopWatch sw = new StopWatch();
            sw.start();
            calVo.setDataType(1);
            ProgressWarnConfig oldConfig = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getOriginalData()), ProgressWarnConfig.class, true, CopyOptions.create().ignoreNullValue());
            ProgressWarnConfig newConfig = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getData()), ProgressWarnConfig.class, true, CopyOptions.create().ignoreNullValue());
            calVo.setNewData(newConfig);
            calVo.setOldData(oldConfig);
            restMessage = progressWarnService.calProgressWarn(calVo);
            sw.stop();
            log.info("修改[进度预警配置]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());
        }
        if ("app_progress_real_detail".equals(tableName)) {
            StopWatch sw = new StopWatch();
            sw.start();
            calVo.setDataType(2);
            ProgressRealDetail oldDetail = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getOriginalData()), ProgressRealDetail.class, true, CopyOptions.create().ignoreNullValue());
            ProgressRealDetail newDetail = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getData()), ProgressRealDetail.class, true, CopyOptions.create().ignoreNullValue());
            calVo.setNewData(newDetail);
            calVo.setOldData(oldDetail);
            restMessage = progressWarnService.calProgressWarn(calVo);
            sw.stop();
            log.info("修改[进度详情]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());
        }
        if ("app_progress_info".equals(tableName)) {
            StopWatch sw = new StopWatch();
            sw.start();
            calVo.setDataType(3);
            ProgressInfo oldInfo = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getOriginalData()), ProgressInfo.class, true, CopyOptions.create().ignoreNullValue());
            ProgressInfo newInfo = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getData()), ProgressInfo.class, true, CopyOptions.create().ignoreNullValue());
            calVo.setNewData(newInfo);
            calVo.setOldData(oldInfo);
            restMessage = progressWarnService.calProgressWarn(calVo);
            sw.stop();
            log.info("修改[进度计划任务]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());
        }
    }

    @Override
    public void onDelete(BinlogEvent binlogEvent) {
        //log.info("数据库:" + binlogEvent.getDatabase());
        //log.info("数据表:" + binlogEvent.getTable());
        //log.info("删除数据:" + binlogEvent.getData());
        //需要重新计算场景
        //1、删除进度详情记录 app_progress_real_detail
        //2、删除进度任务(子节点)app_progress_info
        RestMessage restMessage = null;
        CalProgressWarnVo calVo = new CalProgressWarnVo();
        calVo.setType(3);
        String tableName = binlogEvent.getTable();
        if ("app_progress_real_detail".equals(tableName)) {
            StopWatch sw = new StopWatch();
            sw.start();
            calVo.setDataType(2);
            ProgressRealDetail oldDetail = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getOriginalData()), ProgressRealDetail.class, true, CopyOptions.create().ignoreNullValue());
            calVo.setOldData(oldDetail);
            restMessage = progressWarnService.calProgressWarn(calVo);
            sw.stop();
            log.info("删除[进度详情]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());
        }
        if ("app_progress_info".equals(tableName)) {
            StopWatch sw = new StopWatch();
            sw.start();
            calVo.setDataType(3);
            ProgressInfo oldInfo = BeanUtil.mapToBean(JSONUtil.parseObj(binlogEvent.getOriginalData()), ProgressInfo.class, true, CopyOptions.create().ignoreNullValue());
            calVo.setOldData(oldInfo);
            restMessage = progressWarnService.calProgressWarn(calVo);
            sw.stop();
            log.info("删除[进度计划任务]数据触发binlog事件执行结果:{}-耗时{}ms", restMessage.getCode(), sw.getTotalTimeMillis());
        }
    }

    @Override
    public boolean isHandle(String database, String table) {
        //log.info("database:{},table:{}", database, table);
        //只监控aep数据库
        if (monitorDatabase.equals(database)) {
            return true;
        }
        return false;
    }
}

CalProgressWarnVo

java
import lombok.Data;

/**
 * 计算进度预警参数Vo
 *
 * @author zwmac
 */
@Data
public class CalProgressWarnVo {
    /**
     * 类型:1新增,2变更,3删除
     */
    private Integer type;

    /**
     * 数据类型:1进度预警配置,2进度详情,3进度计划任务
     */
    private Integer dataType;

    /**
     * 旧数据
     */
    private Object oldData;

    /**
     * 新数据
     */
    private Object newData;

}

框架二: Canal

简介

canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。(数据库同步需要阿里的otter中间件,基于canal)

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

Github开源地址:https://github.com/alibaba/canal

canal

工作原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议。
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )。
  • canal 解析 binary log 对象(原始为 byte 流)。

Canal在伪装成为目标MySQL的一个Slave节点后,获取到来自主节点的BinaryLog日志内容。那么作为BinaryLog消费者该如何使用canal监听得到的内容呢。Canal为我们提供了两种类型的方式,直接消费和投递。直接消费即使用Canal配套提供的客户端程序,即时消费Canal的监听内容。投递是指配置指定的MQ类型以及对应信息,Canal将会按照BinaryLog的条目投递到指定的MQ下,再交由MQ为各种消费形式提供数据消费。

java实现

配置 MySQL Binlog: 确保 MySQL 启用了 binlog 功能。你可以在 MySQL 的配置文件 my.cnf 中确认以下配置:

ini
[mysqld]
log-bin=mysql-bin      # 启用 binlog
binlog-format=ROW      # 使用 ROW 格式,确保捕获到具体的行数据变更
server-id=1            # 设置 MySQL 实例的 server-id

依赖配置:

xml
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
</dependency>

代码示例:

java
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.*;

import java.net.InetSocketAddress;
import java.util.List;

public class MySQLChangeListener {

    public static void main(String[] args) {
        // 连接到 Canal Server
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), 
                "example", "", "");
        try {
            connector.connect();
            connector.subscribe(".*\\..*"); // 监听所有数据库和表
            connector.rollback();

            while (true) {
                Message message = connector.getWithoutAck(100); // 拉取数据
                long batchId = message.getId();
                List<Entry> entries = message.getEntries();

                if (batchId != -1 && entries.size() > 0) {
                    handleDataChange(entries);
                }
                connector.ack(batchId); // 确认处理完成
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void handleDataChange(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType() == EntryType.ROWDATA) {
                RowChange rowChange;
                try {
                    rowChange = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("Error parsing binlog row data", e);
                }

                EventType eventType = rowChange.getEventType();
                for (RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == EventType.INSERT) {
                        System.out.println("INSERT detected");
                        System.out.println("New Row: " + rowData.getAfterColumnsList());
                    } else if (eventType == EventType.UPDATE) {
                        System.out.println("UPDATE detected");
                        System.out.println("Old Row: " + rowData.getBeforeColumnsList());
                        System.out.println("New Row: " + rowData.getAfterColumnsList());
                    } else if (eventType == EventType.DELETE) {
                        System.out.println("DELETE detected");
                        System.out.println("Deleted Row: " + rowData.getBeforeColumnsList());
                    }
                }
            }
        }
    }
}

粤ICP备20009776号