当前位置:首页 > 文章列表 > 数据库 > MySQL > 如何用gpss实现MySQL到Greenplum的增量同步

如何用gpss实现MySQL到Greenplum的增量同步

来源:SegmentFault 2023-01-11 08:29:42 0浏览 收藏

亲爱的编程学习爱好者,如果你点开了这篇文章,说明你对《如何用gpss实现MySQL到Greenplum的增量同步》很感兴趣。本篇文章就来给大家详细解析一下,主要介绍一下MySQL、数据库、增量、同步、Greenplum,希望所有认真读完的童鞋们,都有实质性的提高。

​数据同步一般分为两种方式:全量和增量。增量数据是一类典型的流数据,基于日志的增量同步几乎已经是所有数据库的标配,它可以减少常规ETL工作对系统带来的影响,并大大降低数据的延迟。作为Greenplum的流计算引擎,Greenplum Stream Server(gpss)能将不同源端的增量数据同步到Greenplum中。为更好的支持这一应用场景,即将发布的gpss 1.3.6 对增量同步的功能做了增强。

Greenplum Stream Server(简称gpss),是Greenplum的下一代数据加载解决方案,相比于gpfdist,GPSS会提供流数据支持及API接口,有更好的扩展性,支持更丰富的功能,并开放更细粒度的任务控制接口。在即将发布gpss 1.3.6 中,对增量同步所做的的功能增强包括:

  • 可以根据指定的递增排序字段,确保最新的消息生效
  • Merge可支持insert,update和delete三种操作

本文将以MySQL为例,简要介绍下gpss如何实现向Greenplum的增量同步。

1测试环境

  • MySQL 8.0.13
  • Maxwell 1.25.0
  • Kafka 2.2.2
  • Greenplum 6.4.0
  • GPSS 1.3.6

我们要完成的工作是:

  • 通过Maxwell监听MySQL中binlog的增量变化(略)
  • 将增量数据以json的格式发送到kafka中(略)
  • 利用gpss解析kafka中的json消息
  • 将变化的数据更新到Greenplum的目标表中

MySQSL和Maxwell的配置和使用,本文将不做深入介绍,大家可以自行访问文章链接阅读学习,访问相关文章请点击文章底部的“阅读原文”。

2 测试数据简介

测试使用的表在MySQL中定义如下:

​create table t_update_delete_0 (k1 decimal,
               k2 text,
               v1 decimal,
               v2 decimal,
               v3 text,
               c1 decimal,
               c2 text);

其中 k1 和 k2 列为键,用来唯一标识一条记录, v1, v2, v3 为每次更新的数据。

在源端分别对这个表进行了insert,update和delete操作,每个语句为单独的transaction。

Insert语句为:

insert into t_update_delete_0 (k1,k2,v1,v2,v3,c1,c2) 
        values (1,'k_1', 1, 3, 'v_1', 1, 'c1');

Update语句为:

update t_update_delete_0 
      set v1=100,v2=300,v3='v_100' 
      where k1='1' and k2='k_1';

Delete语句为:

delete from t_update_delete_0 where k1='1' and k2='k_1';

3 Kafka的消息格式

Maxwell可以将捕获到binlog解析为json格式并发送到kafka,不同的操作生成的Kafka消息有细微的区别。为了将这些消息正确的恢复到Greenplum中,我们先对这三种类型的消息进行简单的分析。

Insert时生成的消息示例如下:

{
  "database": "test",
  "table": "t_update_delete_0",
  "type": "insert",
  "ts": 1586956781,
  "xid": 1398209,
  "commit": true,
  "data": {
    "k1": 41,
    "k2": "k_41",
    "v1": 818,
    "v2": 2454,
    "v3": "v_818",
    "c1": 41,
    "c2": "c_41"
  }
}

database和table表示源表的表名,ts和xid字段用于表示消息的顺序,type和data表示执行的操作及对应的数据。这些是所有消息类型通用的。

Delete生成的消息如下,type为"delete",同时data中包含了完整的内容。

{
  "database": "test",
  "table": "t_update_delete_0",
  "type": "delete",
  "ts": 1586956781,
  "xid": 1398195,
  "commit": true,
  "data": {
    "k1": 44,
    "k2": "k_44",
    "v1": 744,
    "v2": 2232,
    "v3": "v_744",
    "c1": 44,
​    "c2": "c_44"
  }
}

Update除了包含新数据外,还包含了更新之前的数据(old),这里我们只需要新数据就够了。

{
  "database": "test",
  "table": "t_update_delete_0",
  "type": "update",
  "ts": 1586956707,
  "xid": 1281915,
  "commit": true,
  "data": {
    "k1": 99,
    "k2": "k_99",
    "v1": 798,
    "v2": 2394,
    "v3": "v_798",
    "c1": 99,
    "c2": "c_99"
  },
  "old": {
    "v1": 800,
    "v2": 2400,
    "v3": "v_800"
  }
}

根据生成的消息,我们需要执行如下操作:

  • 根据ts和xid对数据进行排序
  • 根据k1和k2进行匹配
  • 对type为delete的列执行删除操作
  • 对其它type类型执行Merge(upsert)操作

4 执行gpss的Kafka JOB

Greenplum中的定义包含了排序的字段,用来区分消息更新的先后顺序,定义如下:

create table t_update_delete_0 (k1 decimal,
               k2 text,
               v1 decimal,
               v2 decimal,
               v3 text,
               c1 decimal,
               c2 text,
               ts decimal,
               xid decimal,
               del_mark boolean);

根据数据同步的需求,gpss需要的yaml配置文件如下:

DATABASE: test
USER: gpadmin
HOST: mdw
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: kafkahost:9092
        TOPIC: test
      VALUE:
        COLUMNS:
          - NAME: c1
            TYPE: json
        FORMAT: json
      ERROR_LIMIT: 100
   OUTPUT:
      MODE: MERGE
      MATCH_COLUMNS:
        - k1
        - k2
      UPDATE_COLUMNS:
        - v1
        - v2
        - v3
      ORDER_COLUMNS:
        - ts
        - xid
      DELETE_CONDITION: del_mark
      TABLE: t_update_delete_0
      MAPPING:
         k1 : (c1->'data'->>'k1')::decimal
         k2 : (c1->'data'->>'k2')::text
         v1 : (c1->'data'->>'v1')::decimal
         v2 : (c1->'data'->>'v2')::decimal
         v3 : (c1->'data'->>'v3')::text
         c1 : (c1->'data'->>'c1')::decimal
         c2 : (c1->'data'->>'c2')::text
         ts : (c1->>'ts')::decimal
         xid: (c1->>'xid')::decimal
         del_mark: (c1->>'type')::text = 'delete'
   COMMIT:
      MINIMAL_INTERVAL: 2000

几个主要的配置含义如下:

  • ORDER_COLUMNS:递增排序的字段,每个batch中,gpss会使用`ORDER_COLUMNS`最大的消息内容对目标表进行操作。
  • DELETE_CONDITION:软删除标记,gpss会删除包含`DELETE_CONDITION`字段的匹配记录
  • MATCH_COLUMNS:记录的标识,也就是键(candidate key)
  • UPDATE_COLUMNS:需要更新的列

概括下来,gpss执行的步骤为:

  1. 在一个batch中,针对MATCH_COLUMNS相同的所有记录,先根据ORDER_COLUMNS去重
  2. 目标表中存在MATCH_COLUMNS匹配的记录时,根据UPDATE_CONDITION或者DELETE_CONDITION执行更新或者删除操作
  3. 目标表中不存在时匹配记录时,执行插入操作。

(由于有去重操作,为保证不丢失数据,在UPDATE时,Kafka的消息中需要包含整行的数据,而不仅仅是更新部分的数据。)

配置文件准备好后,我们通过gpkafka来执行加载:

gpkafka load mysql.yaml

gpkafka便会从kafka中拉取对应的消息,按照设定的操作将Kafka中的增量数据同步到目标表中。

5 小结

本文简单介绍了如何用gpss从MySQL进行增量同步,其它数据库(例如Oracle,SQL Server等)也都可以利用类似的方案实现同步。不同的消息类型需要不同的处理逻辑,gpss的配置文件中有很多可以进行后处理的部分,更详细的内容可以参考官方文档:https://gpdb.docs.pivotal.io/...。由于源端系统的多样性,gpss的增量复制仍有很多需要完善的地方。在gpss后续版本我们会持续增强相关功能,例如一对多(一个topic到多个目标表)的同步,自动依据topic offset排序等;欢迎大家使用,反馈,指导。也欢迎大家前往askGP(ask.greenplum.cn)交流。

获得Greenplum更多干货内容,欢迎前往Greenplum中文社区网站

image

今天关于《如何用gpss实现MySQL到Greenplum的增量同步》的内容介绍就到此结束,如果有什么疑问或者建议,可以在golang学习网公众号下多多回复交流;文中若有不正之处,也希望回复留言以告知!

版本声明
本文转载于:SegmentFault 如有侵犯,请联系study_golang@163.com删除
你插入MySQL的数据真的存到表里了么?你插入MySQL的数据真的存到表里了么?
上一篇
你插入MySQL的数据真的存到表里了么?
聊聊maxwell的BinlogConnectorDiagnostic
下一篇
聊聊maxwell的BinlogConnectorDiagnostic
查看更多
最新文章
查看更多
课程推荐
  • 前端进阶之JavaScript设计模式
    前端进阶之JavaScript设计模式
    设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
    542次学习
  • GO语言核心编程课程
    GO语言核心编程课程
    本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
    508次学习
  • 简单聊聊mysql8与网络通信
    简单聊聊mysql8与网络通信
    如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
    497次学习
  • JavaScript正则表达式基础与实战
    JavaScript正则表达式基础与实战
    在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
    487次学习
  • 从零制作响应式网站—Grid布局
    从零制作响应式网站—Grid布局
    本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
    484次学习
查看更多
AI推荐
  • SEO标题魔匠AI:高质量学术写作平台,毕业论文生成与优化专家
    魔匠AI
    SEO摘要魔匠AI专注于高质量AI学术写作,已稳定运行6年。提供无限改稿、选题优化、大纲生成、多语言支持、真实参考文献、数据图表生成、查重降重等全流程服务,确保论文质量与隐私安全。适用于专科、本科、硕士学生及研究者,满足多语言学术需求。
    16次使用
  • PPTFake答辩PPT生成器:一键生成高效专业的答辩PPT
    PPTFake答辩PPT生成器
    PPTFake答辩PPT生成器,专为答辩准备设计,极致高效生成PPT与自述稿。智能解析内容,提供多样模板,数据可视化,贴心配套服务,灵活自主编辑,降低制作门槛,适用于各类答辩场景。
    30次使用
  • SEO标题Lovart AI:全球首个设计领域AI智能体,实现全链路设计自动化
    Lovart
    SEO摘要探索Lovart AI,这款专注于设计领域的AI智能体,通过多模态模型集成和智能任务拆解,实现全链路设计自动化。无论是品牌全案设计、广告与视频制作,还是文创内容创作,Lovart AI都能满足您的需求,提升设计效率,降低成本。
    32次使用
  • 美图AI抠图:行业领先的智能图像处理技术,3秒出图,精准无误
    美图AI抠图
    美图AI抠图,依托CVPR 2024竞赛亚军技术,提供顶尖的图像处理解决方案。适用于证件照、商品、毛发等多场景,支持批量处理,3秒出图,零PS基础也能轻松操作,满足个人与商业需求。
    37次使用
  • SEO标题PetGPT:智能桌面宠物程序,结合AI对话的个性化陪伴工具
    PetGPT
    SEO摘要PetGPT 是一款基于 Python 和 PyQt 开发的智能桌面宠物程序,集成了 OpenAI 的 GPT 模型,提供上下文感知对话和主动聊天功能。用户可高度自定义宠物的外观和行为,支持插件热更新和二次开发。适用于需要陪伴和效率辅助的办公族、学生及 AI 技术爱好者。
    37次使用
微信登录更方便
  • 密码登录
  • 注册账号
登录即同意 用户协议隐私政策
返回登录
  • 重置密码