当前位置:首页 > 文章列表 > 数据库 > MySQL > MySQL数据库增量日志解析工具 Canal 实战

MySQL数据库增量日志解析工具 Canal 实战

来源:SegmentFault 2023-01-14 21:15:39 0浏览 收藏

数据库小白一枚,正在不断学习积累知识,现将学习到的知识记录一下,也是将我的所得分享给大家!而今天这篇文章《MySQL数据库增量日志解析工具 Canal 实战》带大家来了解一下MySQL数据库增量日志解析工具 Canal 实战,希望对大家的知识积累有所帮助,从而弥补自己的不足,助力实战开发!

简介

canal,阿里开源工具,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

应用场景

  • 数据库实时备份
  • 业务cache刷新
  • 索引构建和实时维护,例:将商品数据推送到es中构建倒排索引
  • 带业务逻辑的增量数据处理,例:增量数据推送到第三方平台

官网

https://github.com/alibaba/canal

原理

image.png
  1. MySQL
    master
    将数据写入
    binlog
  2. canal
    master
    发送
    dump
    协议
  3. master
    收到
    dump
    请求,推送
    binlog
    canal
  4. canal
    解析
    binlog
    ,可讲数据投递到
    MQ
    系统中,目前支持
    kafka
    RocketMQ

安装

配置mysql

建议的mysql版本是5.7.x
mysql8.0.x见官方说明https://github.com/alibaba/ca...

修改mysql配置文件

my.cnf

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1

新增用户并授权,测试的话可以直接使用root用户

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '123456'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

配置canal

查看是否安装java

➜  canal-admin java -version
java version "1.8.0_251"
Java(TM) SE Runtime Environment (build 1.8.0_251-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.251-b08, mixed mode)

若没有安装,去oracle官网下载

1.8
版本(切勿选择高版本),选择适合自己系统的版本安装即可,mac系统下载的dmg,直接点击安装,不再累述image.png

访问release页面,下载最新稳定版1.14

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

解压到指定目录

mkdir /tmp/canal
tar zxvf canal.deployer-1.1.4.tar.gz  -C /tmp/canal

进入到canal目录,查看文件

  • bin目录:运行命令
  • conf目录:配置文件目录
  • logs目录:记录运行log
  • lib目录:jar包

➜  ll                                  
total 0
drwxr-xr-x   7 jiao  staff   224B  6  3 17:17 bin
drwxr-xr-x   9 jiao  staff   288B  6  3 20:12 conf
drwxr-xr-x  83 jiao  staff   2.6K  6  1 17:54 lib
drwxr-xr-x   7 jiao  staff   224B  6  3 17:05 logs
drwxr-xr-x   7 jiao  staff   224B  5 29 17:43 pierced

修改

instance
配置文件

vi conf/example/instance.properties

配置参数详解:https://github.com/alibaba/ca...

我这里修改了数据连接地址,用户和密码,以及同步规则:只同步test数据库的sc_user表

## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.dbUsername = canal  
canal.instance.dbPassword = 123456
#主库binlog文件路径
#canal.instance.standby.journal.name =
#主库binlog偏移量
#canal.instance.standby.position =
canal.instance.filter.regex = test\\.sc_user

修改

Server
配置文件
vim conf/canal.properties

#tcp bind ip,设置投递到tcp时需设置
canal.ip = 192.168.101.47
#register ip to zookeeper
canal.register.ip = 192.168.101.47
canal.port = 11111
canal.metrics.pull.port = 11112
#canal instance user/passwd
#canal.user = canal
#canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

#canal-admin需设置,先不设置 
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

#zk设置,单机的canal可不设置
canal.zkServers = 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
#flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
#tcp, kafka, RocketMQ,投递到哪
canal.serverMode = RocketMQ
#flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
##memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
##memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
##meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

##detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

#support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
#mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

#network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

#binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

#binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

#binlog ddl isolation
canal.instance.get.ddl.isolation = false

#parallel parser config
canal.instance.parser.parallel = true
##concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
##disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

#table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
#dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
#purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
#########         destinations        #############
#################################################
#使用的instance,可设置上面定义的example
canal.destinations = example
#conf root dir
canal.conf.dir = ../conf
#auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########              MQ              #############
##################################################
#rocketmq地址设置
canal.mq.servers = 192.168.101.47:9876
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
#Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
#aliyun mq namespace
#canal.mq.namespace =

##################################################
#########     Kafka Kerberos Info    #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

测试tcp

下载canal php客户端: https://github.com/xingwenge/canal-php

修改

src/sample/client.php
文件

connect($host, 11111);
    $client->checkValid('canal','E3619321C1A937C46A0D8BD1DAC39F93B27D4458');//认证信息,对于canal.properties配置的canal.user 和canal.passwd
//    $client->subscribe("1001", "example", ".*\\..*");
    $client->subscribe("1001", "example", "test.sc_user"); # 设置过滤规则

    while (true) {
        $message = $client->get(100);
        if ($entries = $message->getEntries()) {
            foreach ($entries as $entry) {
                Fmt::println($entry);
            }
        }
        sleep(1);
    }

    $client->disConnect();
} catch (\Exception $e) {
    echo $e->getMessage(), PHP_EOL;
}

运行

canal

sh bin/startup.sh

查看
log
是否有错误信息
tail -n 50 -f logs/example/example.log

输出一下内容说明
canal
已正常启动,
canal
已成功连接
mysql
准备发送
dump
指令同步数据

2020-06-03 19:30:47.625 [destination = example , address = /127.0.0.1:3307 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=303179,serverId=1,gtid=,timestamp=1591183839000] cost : 5ms , the next step is binlog dump

运行

php client.php
,并在数据库
sc_user
表中修改一条数据,可以看到
client
输出了数据库修改的内容
`================> binlog[mysql-bin.000001 : 208118],name[wcc-scm-service,sc_status_history], eventType: 2

-------> before
status_history_id : 2543 update= false
record_id : 662 update= false
status_type : 151 update= false
old_status : 158 update= false
new_status : 153 update= false
opt_type : SELLER_REVOKE update= false
user_id : 282 update= false
user_name : 加工厂1 update= false
remark : update= false
created : 2020-05-28 18:40:02 update= false
-------> after
status_history_id : 2543 update= false
record_id : 662 update= false
status_type : 151 update= false
old_status : 158 update= false
new_status : 153 update= false
opt_type : SELLER_REVOKE update= false
user_id : 282 update= false
user_name : 加工厂12 update= true
remark : update= false
created : 2020-05-28 18:40:02 update= false
TSocket: Could not read 4 bytes from 192.168.101.47:11111`

测试RocketMQ

安装RocketMQ

这里直接在docker里安装

1.安装server

docker run -d -p 9876:9876 --name rocketmq-server -e "MAX\_POSSIBLE\_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv

2.新增配置文件
echo "brokerIP1=192.168.101.47" > broker.properties

3.安装
broker
/path/broker.properties
替换成第2步文件路径
docker run -d -p 10911:10911 -p 10909:10909 -v /path/broker.properties:/opt/rocketmq-4.4.0/bin/broker.properties --name rocketmq-broker --link rocketmq-server -e "NAMESRV\_ADDR=rocketmq-server:9876" -e "MAX\_POSSIBLE\_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c broker.properties

4.安装RocketMQ Web
docker run -e "JAVA\_OPTS=-Drocketmq.namesrv.addr=192.168.101.47:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng -d

访问Rocket管理页面
http://localhost:8080/

image.png

修改配置文件

修改

canal.properties

投递到
tcp
修改成投递到
RocketMQ

`canal.serverMode = RocketMQ
canal.mq.servers = 192.168.101.47:9876`

运行canal

sh bin/startup.sh

查看log文件,排查是否有错误
tail -n 50 -f logs/canal/canal.log

tail -n 50 -f logs/example/example.log

顺利的话,在RocketMQ中可以查看到,新增的Topic和Message,RocketMQ可以在manage页面直接查看messge内容

image.png
image.png

后记

在安装和使用canal的时候还是遇到了一些

,比如
java高版本报错
投递到消息队列失败
等,需要耐心排查log日志,分析原因,canal也用到了很多技术栈[zookeeper、kafka、RocketMQ],后续将会进一步去深入研究。

今天关于《MySQL数据库增量日志解析工具 Canal 实战》的内容就介绍到这里了,是不是学起来一目了然!想要了解更多关于mysql的内容请关注golang学习网公众号!

版本声明
本文转载于:SegmentFault 如有侵犯,请联系study_golang@163.com删除
mysql基本约定与命名规范mysql基本约定与命名规范
上一篇
mysql基本约定与命名规范
MySQL的开发必会的sql语句
下一篇
MySQL的开发必会的sql语句
查看更多
最新文章
查看更多
课程推荐
  • 前端进阶之JavaScript设计模式
    前端进阶之JavaScript设计模式
    设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
    542次学习
  • GO语言核心编程课程
    GO语言核心编程课程
    本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
    508次学习
  • 简单聊聊mysql8与网络通信
    简单聊聊mysql8与网络通信
    如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
    497次学习
  • JavaScript正则表达式基础与实战
    JavaScript正则表达式基础与实战
    在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
    487次学习
  • 从零制作响应式网站—Grid布局
    从零制作响应式网站—Grid布局
    本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
    484次学习
查看更多
AI推荐
  • 笔灵AI生成答辩PPT:高效制作学术与职场PPT的利器
    笔灵AI生成答辩PPT
    探索笔灵AI生成答辩PPT的强大功能,快速制作高质量答辩PPT。精准内容提取、多样模板匹配、数据可视化、配套自述稿生成,让您的学术和职场展示更加专业与高效。
    14次使用
  • 知网AIGC检测服务系统:精准识别学术文本中的AI生成内容
    知网AIGC检测服务系统
    知网AIGC检测服务系统,专注于检测学术文本中的疑似AI生成内容。依托知网海量高质量文献资源,结合先进的“知识增强AIGC检测技术”,系统能够从语言模式和语义逻辑两方面精准识别AI生成内容,适用于学术研究、教育和企业领域,确保文本的真实性和原创性。
    23次使用
  • AIGC检测服务:AIbiye助力确保论文原创性
    AIGC检测-Aibiye
    AIbiye官网推出的AIGC检测服务,专注于检测ChatGPT、Gemini、Claude等AIGC工具生成的文本,帮助用户确保论文的原创性和学术规范。支持txt和doc(x)格式,检测范围为论文正文,提供高准确性和便捷的用户体验。
    30次使用
  • 易笔AI论文平台:快速生成高质量学术论文的利器
    易笔AI论文
    易笔AI论文平台提供自动写作、格式校对、查重检测等功能,支持多种学术领域的论文生成。价格优惠,界面友好,操作简便,适用于学术研究者、学生及论文辅导机构。
    40次使用
  • 笔启AI论文写作平台:多类型论文生成与多语言支持
    笔启AI论文写作平台
    笔启AI论文写作平台提供多类型论文生成服务,支持多语言写作,满足学术研究者、学生和职场人士的需求。平台采用AI 4.0版本,确保论文质量和原创性,并提供查重保障和隐私保护。
    35次使用
微信登录更方便
  • 密码登录
  • 注册账号
登录即同意 用户协议隐私政策
返回登录
  • 重置密码