当前位置:首页 > 文章列表 > 数据库 > MySQL > MySQL数据库增量日志解析工具 Canal 实战

MySQL数据库增量日志解析工具 Canal 实战

来源:SegmentFault 2023-01-14 21:15:39 0浏览 收藏

数据库小白一枚,正在不断学习积累知识,现将学习到的知识记录一下,也是将我的所得分享给大家!而今天这篇文章《MySQL数据库增量日志解析工具 Canal 实战》带大家来了解一下MySQL数据库增量日志解析工具 Canal 实战,希望对大家的知识积累有所帮助,从而弥补自己的不足,助力实战开发!

简介

canal,阿里开源工具,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

应用场景

  • 数据库实时备份
  • 业务cache刷新
  • 索引构建和实时维护,例:将商品数据推送到es中构建倒排索引
  • 带业务逻辑的增量数据处理,例:增量数据推送到第三方平台

官网

https://github.com/alibaba/canal

原理

image.png
  1. MySQL
    master
    将数据写入
    binlog
  2. canal
    master
    发送
    dump
    协议
  3. master
    收到
    dump
    请求,推送
    binlog
    canal
  4. canal
    解析
    binlog
    ,可讲数据投递到
    MQ
    系统中,目前支持
    kafka
    RocketMQ

安装

配置mysql

建议的mysql版本是5.7.x
mysql8.0.x见官方说明https://github.com/alibaba/ca...

修改mysql配置文件

my.cnf

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1

新增用户并授权,测试的话可以直接使用root用户

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '123456'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

配置canal

查看是否安装java

➜  canal-admin java -version
java version "1.8.0_251"
Java(TM) SE Runtime Environment (build 1.8.0_251-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.251-b08, mixed mode)

若没有安装,去oracle官网下载

1.8
版本(切勿选择高版本),选择适合自己系统的版本安装即可,mac系统下载的dmg,直接点击安装,不再累述image.png

访问release页面,下载最新稳定版1.14

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

解压到指定目录

mkdir /tmp/canal
tar zxvf canal.deployer-1.1.4.tar.gz  -C /tmp/canal

进入到canal目录,查看文件

  • bin目录:运行命令
  • conf目录:配置文件目录
  • logs目录:记录运行log
  • lib目录:jar包

➜  ll                                  
total 0
drwxr-xr-x   7 jiao  staff   224B  6  3 17:17 bin
drwxr-xr-x   9 jiao  staff   288B  6  3 20:12 conf
drwxr-xr-x  83 jiao  staff   2.6K  6  1 17:54 lib
drwxr-xr-x   7 jiao  staff   224B  6  3 17:05 logs
drwxr-xr-x   7 jiao  staff   224B  5 29 17:43 pierced

修改

instance
配置文件

vi conf/example/instance.properties

配置参数详解:https://github.com/alibaba/ca...

我这里修改了数据连接地址,用户和密码,以及同步规则:只同步test数据库的sc_user表

## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.dbUsername = canal  
canal.instance.dbPassword = 123456
#主库binlog文件路径
#canal.instance.standby.journal.name =
#主库binlog偏移量
#canal.instance.standby.position =
canal.instance.filter.regex = test\\.sc_user

修改

Server
配置文件
vim conf/canal.properties

#tcp bind ip,设置投递到tcp时需设置
canal.ip = 192.168.101.47
#register ip to zookeeper
canal.register.ip = 192.168.101.47
canal.port = 11111
canal.metrics.pull.port = 11112
#canal instance user/passwd
#canal.user = canal
#canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

#canal-admin需设置,先不设置 
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

#zk设置,单机的canal可不设置
canal.zkServers = 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
#flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
#tcp, kafka, RocketMQ,投递到哪
canal.serverMode = RocketMQ
#flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
##memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
##memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
##meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

##detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

#support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
#mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

#network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

#binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

#binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

#binlog ddl isolation
canal.instance.get.ddl.isolation = false

#parallel parser config
canal.instance.parser.parallel = true
##concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
##disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

#table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
#dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
#purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
#########         destinations        #############
#################################################
#使用的instance,可设置上面定义的example
canal.destinations = example
#conf root dir
canal.conf.dir = ../conf
#auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########              MQ              #############
##################################################
#rocketmq地址设置
canal.mq.servers = 192.168.101.47:9876
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
#Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
#aliyun mq namespace
#canal.mq.namespace =

##################################################
#########     Kafka Kerberos Info    #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

测试tcp

下载canal php客户端: https://github.com/xingwenge/canal-php

修改

src/sample/client.php
文件

<?php namespace xingwenge\canal_php\sample;

use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\Fmt;

require_once __DIR__ . '/../../vendor/autoload.php';

ini_set('display_errors', 'On');
error_reporting(E_ALL);

try {
    $host   = '192.168.101.47';//canal server地址
    $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET);
    # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);

    $client->connect($host, 11111);
    $client->checkValid('canal','E3619321C1A937C46A0D8BD1DAC39F93B27D4458');//认证信息,对于canal.properties配置的canal.user 和canal.passwd
//    $client->subscribe("1001", "example", ".*\\..*");
    $client->subscribe("1001", "example", "test.sc_user"); # 设置过滤规则

    while (true) {
        $message = $client->get(100);
        if ($entries = $message->getEntries()) {
            foreach ($entries as $entry) {
                Fmt::println($entry);
            }
        }
        sleep(1);
    }

    $client->disConnect();
} catch (\Exception $e) {
    echo $e->getMessage(), PHP_EOL;
}

运行

canal

sh bin/startup.sh

查看
log
是否有错误信息
tail -n 50 -f logs/example/example.log

输出一下内容说明
canal
已正常启动,
canal
已成功连接
mysql
准备发送
dump
指令同步数据

2020-06-03 19:30:47.625 [destination = example , address = /127.0.0.1:3307 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=303179,serverId=1,gtid=,timestamp=1591183839000] cost : 5ms , the next step is binlog dump

运行

php client.php
,并在数据库
sc_user
表中修改一条数据,可以看到
client
输出了数据库修改的内容
`================> binlog[mysql-bin.000001 : 208118],name[wcc-scm-service,sc_status_history], eventType: 2

-------> before
status_history_id : 2543 update= false
record_id : 662 update= false
status_type : 151 update= false
old_status : 158 update= false
new_status : 153 update= false
opt_type : SELLER_REVOKE update= false
user_id : 282 update= false
user_name : 加工厂1 update= false
remark : update= false
created : 2020-05-28 18:40:02 update= false
-------> after
status_history_id : 2543 update= false
record_id : 662 update= false
status_type : 151 update= false
old_status : 158 update= false
new_status : 153 update= false
opt_type : SELLER_REVOKE update= false
user_id : 282 update= false
user_name : 加工厂12 update= true
remark : update= false
created : 2020-05-28 18:40:02 update= false
TSocket: Could not read 4 bytes from 192.168.101.47:11111`

测试RocketMQ

安装RocketMQ

这里直接在docker里安装

1.安装server

docker run -d -p 9876:9876 --name rocketmq-server -e "MAX\_POSSIBLE\_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv

2.新增配置文件
echo "brokerIP1=192.168.101.47" > broker.properties

3.安装
broker
/path/broker.properties
替换成第2步文件路径
docker run -d -p 10911:10911 -p 10909:10909 -v /path/broker.properties:/opt/rocketmq-4.4.0/bin/broker.properties --name rocketmq-broker --link rocketmq-server -e "NAMESRV\_ADDR=rocketmq-server:9876" -e "MAX\_POSSIBLE\_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c broker.properties

4.安装RocketMQ Web
docker run -e "JAVA\_OPTS=-Drocketmq.namesrv.addr=192.168.101.47:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng -d

访问Rocket管理页面
http://localhost:8080/

image.png

修改配置文件

修改

canal.properties

投递到
tcp
修改成投递到
RocketMQ

`canal.serverMode = RocketMQ
canal.mq.servers = 192.168.101.47:9876`

运行canal

sh bin/startup.sh

查看log文件,排查是否有错误
tail -n 50 -f logs/canal/canal.log

tail -n 50 -f logs/example/example.log

顺利的话,在RocketMQ中可以查看到,新增的Topic和Message,RocketMQ可以在manage页面直接查看messge内容

image.png
image.png

后记

在安装和使用canal的时候还是遇到了一些

,比如
java高版本报错
投递到消息队列失败
等,需要耐心排查log日志,分析原因,canal也用到了很多技术栈[zookeeper、kafka、RocketMQ],后续将会进一步去深入研究。

今天关于《MySQL数据库增量日志解析工具 Canal 实战》的内容就介绍到这里了,是不是学起来一目了然!想要了解更多关于mysql的内容请关注golang学习网公众号!

版本声明
本文转载于:SegmentFault 如有侵犯,请联系study_golang@163.com删除
mysql基本约定与命名规范mysql基本约定与命名规范
上一篇
mysql基本约定与命名规范
MySQL的开发必会的sql语句
下一篇
MySQL的开发必会的sql语句
查看更多
最新文章
查看更多
课程推荐
  • 前端进阶之JavaScript设计模式
    前端进阶之JavaScript设计模式
    设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
    543次学习
  • GO语言核心编程课程
    GO语言核心编程课程
    本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
    516次学习
  • 简单聊聊mysql8与网络通信
    简单聊聊mysql8与网络通信
    如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
    500次学习
  • JavaScript正则表达式基础与实战
    JavaScript正则表达式基础与实战
    在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
    487次学习
  • 从零制作响应式网站—Grid布局
    从零制作响应式网站—Grid布局
    本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
    485次学习
查看更多
AI推荐
  • ChatExcel酷表:告别Excel难题,北大团队AI助手助您轻松处理数据
    ChatExcel酷表
    ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
    3167次使用
  • Any绘本:开源免费AI绘本创作工具深度解析
    Any绘本
    探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
    3380次使用
  • 可赞AI:AI驱动办公可视化智能工具,一键高效生成文档图表脑图
    可赞AI
    可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
    3409次使用
  • 星月写作:AI网文创作神器,助力爆款小说速成
    星月写作
    星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
    4513次使用
  • MagicLight.ai:叙事驱动AI动画视频创作平台 | 高效生成专业级故事动画
    MagicLight
    MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
    3789次使用
微信登录更方便
  • 密码登录
  • 注册账号
登录即同意 用户协议隐私政策
返回登录
  • 重置密码