当前位置:首页 > 文章列表 > 数据库 > MySQL > MySQL数据库增量日志解析工具 Canal 实战

MySQL数据库增量日志解析工具 Canal 实战

来源:SegmentFault 2023-01-14 21:15:39 0浏览 收藏

数据库小白一枚,正在不断学习积累知识,现将学习到的知识记录一下,也是将我的所得分享给大家!而今天这篇文章《MySQL数据库增量日志解析工具 Canal 实战》带大家来了解一下MySQL数据库增量日志解析工具 Canal 实战,希望对大家的知识积累有所帮助,从而弥补自己的不足,助力实战开发!

简介

canal,阿里开源工具,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

应用场景

  • 数据库实时备份
  • 业务cache刷新
  • 索引构建和实时维护,例:将商品数据推送到es中构建倒排索引
  • 带业务逻辑的增量数据处理,例:增量数据推送到第三方平台

官网

https://github.com/alibaba/canal

原理

image.png
  1. MySQL
    master
    将数据写入
    binlog
  2. canal
    master
    发送
    dump
    协议
  3. master
    收到
    dump
    请求,推送
    binlog
    canal
  4. canal
    解析
    binlog
    ,可讲数据投递到
    MQ
    系统中,目前支持
    kafka
    RocketMQ

安装

配置mysql

建议的mysql版本是5.7.x
mysql8.0.x见官方说明https://github.com/alibaba/ca...

修改mysql配置文件

my.cnf

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1

新增用户并授权,测试的话可以直接使用root用户

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '123456'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

配置canal

查看是否安装java

➜  canal-admin java -version
java version "1.8.0_251"
Java(TM) SE Runtime Environment (build 1.8.0_251-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.251-b08, mixed mode)

若没有安装,去oracle官网下载

1.8
版本(切勿选择高版本),选择适合自己系统的版本安装即可,mac系统下载的dmg,直接点击安装,不再累述image.png

访问release页面,下载最新稳定版1.14

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

解压到指定目录

mkdir /tmp/canal
tar zxvf canal.deployer-1.1.4.tar.gz  -C /tmp/canal

进入到canal目录,查看文件

  • bin目录:运行命令
  • conf目录:配置文件目录
  • logs目录:记录运行log
  • lib目录:jar包

➜  ll                                  
total 0
drwxr-xr-x   7 jiao  staff   224B  6  3 17:17 bin
drwxr-xr-x   9 jiao  staff   288B  6  3 20:12 conf
drwxr-xr-x  83 jiao  staff   2.6K  6  1 17:54 lib
drwxr-xr-x   7 jiao  staff   224B  6  3 17:05 logs
drwxr-xr-x   7 jiao  staff   224B  5 29 17:43 pierced

修改

instance
配置文件

vi conf/example/instance.properties

配置参数详解:https://github.com/alibaba/ca...

我这里修改了数据连接地址,用户和密码,以及同步规则:只同步test数据库的sc_user表

## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.dbUsername = canal  
canal.instance.dbPassword = 123456
#主库binlog文件路径
#canal.instance.standby.journal.name =
#主库binlog偏移量
#canal.instance.standby.position =
canal.instance.filter.regex = test\\.sc_user

修改

Server
配置文件
vim conf/canal.properties

#tcp bind ip,设置投递到tcp时需设置
canal.ip = 192.168.101.47
#register ip to zookeeper
canal.register.ip = 192.168.101.47
canal.port = 11111
canal.metrics.pull.port = 11112
#canal instance user/passwd
#canal.user = canal
#canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

#canal-admin需设置,先不设置 
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

#zk设置,单机的canal可不设置
canal.zkServers = 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
#flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
#tcp, kafka, RocketMQ,投递到哪
canal.serverMode = RocketMQ
#flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
##memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
##memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
##meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

##detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

#support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
#mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

#network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

#binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

#binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

#binlog ddl isolation
canal.instance.get.ddl.isolation = false

#parallel parser config
canal.instance.parser.parallel = true
##concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
##disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

#table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
#dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
#purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
#########         destinations        #############
#################################################
#使用的instance,可设置上面定义的example
canal.destinations = example
#conf root dir
canal.conf.dir = ../conf
#auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########              MQ              #############
##################################################
#rocketmq地址设置
canal.mq.servers = 192.168.101.47:9876
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
#Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
#aliyun mq namespace
#canal.mq.namespace =

##################################################
#########     Kafka Kerberos Info    #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

测试tcp

下载canal php客户端: https://github.com/xingwenge/canal-php

修改

src/sample/client.php
文件

<?php namespace xingwenge\canal_php\sample;

use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\Fmt;

require_once __DIR__ . '/../../vendor/autoload.php';

ini_set('display_errors', 'On');
error_reporting(E_ALL);

try {
    $host   = '192.168.101.47';//canal server地址
    $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET);
    # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);

    $client->connect($host, 11111);
    $client->checkValid('canal','E3619321C1A937C46A0D8BD1DAC39F93B27D4458');//认证信息,对于canal.properties配置的canal.user 和canal.passwd
//    $client->subscribe("1001", "example", ".*\\..*");
    $client->subscribe("1001", "example", "test.sc_user"); # 设置过滤规则

    while (true) {
        $message = $client->get(100);
        if ($entries = $message->getEntries()) {
            foreach ($entries as $entry) {
                Fmt::println($entry);
            }
        }
        sleep(1);
    }

    $client->disConnect();
} catch (\Exception $e) {
    echo $e->getMessage(), PHP_EOL;
}

运行

canal

sh bin/startup.sh

查看
log
是否有错误信息
tail -n 50 -f logs/example/example.log

输出一下内容说明
canal
已正常启动,
canal
已成功连接
mysql
准备发送
dump
指令同步数据

2020-06-03 19:30:47.625 [destination = example , address = /127.0.0.1:3307 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=303179,serverId=1,gtid=,timestamp=1591183839000] cost : 5ms , the next step is binlog dump

运行

php client.php
,并在数据库
sc_user
表中修改一条数据,可以看到
client
输出了数据库修改的内容
`================> binlog[mysql-bin.000001 : 208118],name[wcc-scm-service,sc_status_history], eventType: 2

-------> before
status_history_id : 2543 update= false
record_id : 662 update= false
status_type : 151 update= false
old_status : 158 update= false
new_status : 153 update= false
opt_type : SELLER_REVOKE update= false
user_id : 282 update= false
user_name : 加工厂1 update= false
remark : update= false
created : 2020-05-28 18:40:02 update= false
-------> after
status_history_id : 2543 update= false
record_id : 662 update= false
status_type : 151 update= false
old_status : 158 update= false
new_status : 153 update= false
opt_type : SELLER_REVOKE update= false
user_id : 282 update= false
user_name : 加工厂12 update= true
remark : update= false
created : 2020-05-28 18:40:02 update= false
TSocket: Could not read 4 bytes from 192.168.101.47:11111`

测试RocketMQ

安装RocketMQ

这里直接在docker里安装

1.安装server

docker run -d -p 9876:9876 --name rocketmq-server -e "MAX\_POSSIBLE\_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv

2.新增配置文件
echo "brokerIP1=192.168.101.47" > broker.properties

3.安装
broker
/path/broker.properties
替换成第2步文件路径
docker run -d -p 10911:10911 -p 10909:10909 -v /path/broker.properties:/opt/rocketmq-4.4.0/bin/broker.properties --name rocketmq-broker --link rocketmq-server -e "NAMESRV\_ADDR=rocketmq-server:9876" -e "MAX\_POSSIBLE\_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c broker.properties

4.安装RocketMQ Web
docker run -e "JAVA\_OPTS=-Drocketmq.namesrv.addr=192.168.101.47:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng -d

访问Rocket管理页面
http://localhost:8080/

image.png

修改配置文件

修改

canal.properties

投递到
tcp
修改成投递到
RocketMQ

`canal.serverMode = RocketMQ
canal.mq.servers = 192.168.101.47:9876`

运行canal

sh bin/startup.sh

查看log文件,排查是否有错误
tail -n 50 -f logs/canal/canal.log

tail -n 50 -f logs/example/example.log

顺利的话,在RocketMQ中可以查看到,新增的Topic和Message,RocketMQ可以在manage页面直接查看messge内容

image.png
image.png

后记

在安装和使用canal的时候还是遇到了一些

,比如
java高版本报错
投递到消息队列失败
等,需要耐心排查log日志,分析原因,canal也用到了很多技术栈[zookeeper、kafka、RocketMQ],后续将会进一步去深入研究。

今天关于《MySQL数据库增量日志解析工具 Canal 实战》的内容就介绍到这里了,是不是学起来一目了然!想要了解更多关于mysql的内容请关注golang学习网公众号!

版本声明
本文转载于:SegmentFault 如有侵犯,请联系study_golang@163.com删除
mysql基本约定与命名规范mysql基本约定与命名规范
上一篇
mysql基本约定与命名规范
MySQL的开发必会的sql语句
下一篇
MySQL的开发必会的sql语句
查看更多
最新文章
查看更多
课程推荐
  • 前端进阶之JavaScript设计模式
    前端进阶之JavaScript设计模式
    设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
    543次学习
  • GO语言核心编程课程
    GO语言核心编程课程
    本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
    514次学习
  • 简单聊聊mysql8与网络通信
    简单聊聊mysql8与网络通信
    如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
    499次学习
  • JavaScript正则表达式基础与实战
    JavaScript正则表达式基础与实战
    在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
    487次学习
  • 从零制作响应式网站—Grid布局
    从零制作响应式网站—Grid布局
    本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
    484次学习
查看更多
AI推荐
  • SEO  AI Mermaid 流程图:自然语言生成,文本驱动可视化创作
    AI Mermaid流程图
    SEO AI Mermaid 流程图工具:基于 Mermaid 语法,AI 辅助,自然语言生成流程图,提升可视化创作效率,适用于开发者、产品经理、教育工作者。
    633次使用
  • 搜获客笔记生成器:小红书医美爆款内容AI创作神器
    搜获客【笔记生成器】
    搜获客笔记生成器,国内首个聚焦小红书医美垂类的AI文案工具。1500万爆款文案库,行业专属算法,助您高效创作合规、引流的医美笔记,提升运营效率,引爆小红书流量!
    640次使用
  • iTerms:一站式法律AI工作台,智能合同审查起草与法律问答专家
    iTerms
    iTerms是一款专业的一站式法律AI工作台,提供AI合同审查、AI合同起草及AI法律问答服务。通过智能问答、深度思考与联网检索,助您高效检索法律法规与司法判例,告别传统模板,实现合同一键起草与在线编辑,大幅提升法律事务处理效率。
    655次使用
  • TokenPony:AI大模型API聚合平台,一站式接入,高效稳定高性价比
    TokenPony
    TokenPony是讯盟科技旗下的AI大模型聚合API平台。通过统一接口接入DeepSeek、Kimi、Qwen等主流模型,支持1024K超长上下文,实现零配置、免部署、极速响应与高性价比的AI应用开发,助力专业用户轻松构建智能服务。
    724次使用
  • 迅捷AIPPT:AI智能PPT生成器,高效制作专业演示文稿
    迅捷AIPPT
    迅捷AIPPT是一款高效AI智能PPT生成软件,一键智能生成精美演示文稿。内置海量专业模板、多样风格,支持自定义大纲,助您轻松制作高质量PPT,大幅节省时间。
    619次使用
微信登录更方便
  • 密码登录
  • 注册账号
登录即同意 用户协议隐私政策
返回登录
  • 重置密码