随便聊聊
今天是离职的50天,面试不咋顺利,昨晚也没睡好,有个离职的同事去日本玩了,不知道他是什么考虑。脱离工作久了舒服是确实舒服,但是找工作真的挺难的,不仅是投递,面试,复习,复盘,一环扣一环,而且还很累。
本来挺有把握的一场面试,结果大晚上查了一下发现面试不通过,也不知道啥原因,有一说一虽然我不一定会去,不过得知面试失效的消息打击还是比较大的,不知道是不是薪酬说高了。
离职还没有让家里人知道,把面试失败的消息跟老姐说后,她送了我一句话,此处不留爷,自有留爷处。我觉得说的太好了。
来看今天的flink-cdc的使用吧。
正文
flink-cdc介绍
CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如分组(GROUP BY)、多表的关联(JOIN)等。
一般情况下,我们使用CDC将数据库增量数据同步到不同的数据源中,比如ES, ClickHouse之类的。
通常来讲,CDC 分为主动查询和事件接收两种技术实现模式。
对于主动查询而言,通常会在数据源表的某个字段中,保存上次更新的时间戳或版本号等信息,然后下游通过不断的查询和与上次的记录做对比,来确定数据是否有变动,是否需要同步。这种方式优点是不涉及数据库底层特性,实现比较通用;缺点是要对业务表做改造,且实时性不高,不能确保跟踪到所有的变更记录,且持续的频繁查询对数据库的压力较大。
事件接收模式可以通过触发器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)来实现。当数据源表发生变动时,会通过附加在表上的触发器或者 binlog 等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。这种方式的优点是实时性高,可以精确捕捉上游的各种变动;缺点是部署数据库的事件接收和解析器(例如 Debezium、Canal 等),有一定的学习和运维成本,对一些冷门的数据库支持不够。
Flink-cdc是由apache托管的开源项目,使用监听bin-log的方式获取增量数据,并且支持常用的数据库,例如mysql,mongodb,tidb等等。
环境搭建
这里用docker容器快速启动一个mysql。
1 | services: |
容器启动后需要做点准备工作,修改mysql的时区。因为默认情况下cdc应用和mysql时区不一致的话,可能会导致数据监听有延迟。任务会一直在报错,无法正常监听。具体报错可以在webUI里看到。感兴趣可以不改mysql时区然后启动下flink-cdc试试。
1 | mysql> show variables like '%time_zone%'; |
创建一张测试表,再加点数据。
1 | create table test.t_user |
使用
Talk is cheap. Show me the code.
这里我们采用maven构建一个java程序,使用flink-cdc监听mysql变化。
pom文件依赖如下
1 | <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
在resource目录下新建logback.xml,否则日志会有debug级别。
1 |
|
然后我们创建出一个MySink作为监听数据的处理。
1 | package com.github.zer0e; |
主程序入口
1 | package com.github.zer0e; |
启动!顺利的话,启动后命令行就能输出如下的东西
1 | mysql cdc: {"before":null,"after":{"id":3,"name":"c","description":"moon1"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"t_user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1721806935503,"transaction":null} |
这里主要是关于op字段,其实是对应数据库的增删改查,即c(create), d(delete), u(update), r(read)。
1 | mysql cdc: {"before":null,"after":{"id":4,"name":"d","description":"today"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1721807345000,"snapshot":"false","db":"test","sequence":null,"table":"t_user","server_id":1,"gtid":null,"file":"binlog.000004","pos":2490,"row":0,"thread":21,"query":null},"op":"c","ts_ms":1721807345604,"transaction":null} |
其他的没什么好说的,如果不想读取这么多json数据的话,可以创建实体类映射下。
但是个人尝试下来,其实要自己做解析的话会比较困难,建议还是拿到json之后,根据表名称做下parse会相对简单。
1 |
|
1 | public class MySink extends RichSinkFunction<String> { |
总结
简单搞了一个demo,实际搞下来我还看了能否自己解析原始数据,比较困难。
其次是每次重启后都会读取到所有源数据,虽然不影响,但是我不知道这个能否关闭?
之后其实就是对接es等其他数据源了,这个业务写多了都会。