
[The Road to Architecture 9] Learning and Using flink-cdc

2024/07/24

Small Talk

Today is day 50 since I left my job. Interviews haven't gone well, and I slept badly last night. A former colleague who also quit went off to Japan for a trip; I don't know what his plan is. Being away from work for a long stretch is genuinely comfortable, but job hunting is genuinely hard: applying, interviewing, reviewing, doing retrospectives, one step after another, and it's exhausting.

There was one interview I felt quite confident about, but late at night I checked and found I hadn't passed, with no idea why. To be honest, I wasn't necessarily going to take the offer anyway, but learning I'd failed was still a real blow. I wonder if I asked for too high a salary.

I still haven't told my family that I quit. When I told my older sister about the failed interview, she gave me a saying: "If this place won't keep me, somewhere else will." I think that's very well put.

Now let's look at today's topic: using flink-cdc.

Main Content

CDC stands for Change Data Capture, a technique that synchronizes incremental changes from a source database (Source) to one or more data destinations (Sink). During synchronization the data can also be processed, for example with grouping (GROUP BY) or multi-table joins (JOIN).

Typically, we use CDC to sync incremental database changes into other data stores, such as ES or ClickHouse.

Broadly speaking, CDC implementations fall into two modes: active querying and event receiving.

In the active-query mode, some column on the source table stores the last update timestamp or a version number; downstream consumers query repeatedly and compare against the last record they saw to decide whether data has changed and needs syncing. The upside is that it doesn't touch database internals, so the approach is quite portable. The downsides are that the business table has to be modified, latency is high, not every change is guaranteed to be captured, and the constant, frequent queries put significant pressure on the database. A rough polling sketch follows below.
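To make the active-query mode concrete, here is a rough JDBC polling sketch of my own (it assumes a hypothetical updated_at column and the MySQL JDBC driver on the classpath, neither of which appears later in this post):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;

public class PollingSketch {
    public static void main(String[] args) throws Exception {
        // High-water mark; a real implementation would persist this across restarts.
        Timestamp lastSeen = new Timestamp(0L);
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "root", "root")) {
            while (true) {
                try (PreparedStatement ps = conn.prepareStatement(
                        "SELECT id, name, description, updated_at FROM t_user " +
                        "WHERE updated_at > ? ORDER BY updated_at")) {
                    ps.setTimestamp(1, lastSeen);
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            System.out.println("changed row: " + rs.getInt("id"));
                            lastSeen = rs.getTimestamp("updated_at");
                        }
                    }
                }
                // Polling this often is exactly the database pressure noted above,
                // and a row updated twice between polls only shows its final state.
                Thread.sleep(1000L);
            }
        }
    }
}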

The event-receiving mode can be implemented with triggers or with logs (e.g. the transaction log, binary log, or write-ahead log). When a source table changes, the operation is recorded via a trigger attached to the table or via the binlog. Downstream consumers subscribe to these events through the database's low-level protocol and replay the change records to achieve synchronization. The upside is high real-time fidelity: every upstream change can be captured precisely. The downsides are that you have to deploy an event receiver and parser (e.g. Debezium or Canal), which carries some learning and operations cost, and support for less common databases is limited.

Flink CDC is an open-source project hosted by Apache. It captures incremental data by listening to the binlog, and it supports the usual databases: MySQL, MongoDB, TiDB, and so on.

Environment Setup

Here we use a Docker container to quickly spin up a MySQL instance.

services:
  mysql:
    image: 'mysql:8.0'
    environment:
      - 'MYSQL_ROOT_PASSWORD=root'
    ports:
      - '3306:3306'
    volumes:
      - mysql:/var/lib/mysql

volumes:
  mysql:

After the container starts there's a bit of prep work: change MySQL's time zone. By default, if the CDC application's time zone doesn't match MySQL's, change capture may lag; the job keeps throwing errors and never listens properly. The exact errors show up in the web UI. If you're curious, leave the MySQL time zone alone and try starting flink-cdc anyway.

mysql> show variables like '%time_zone%';
+------------------+--------+
| Variable_name    | Value  |
+------------------+--------+
| system_time_zone | UTC    |
| time_zone        | SYSTEM |
+------------------+--------+
2 rows in set (0.00 sec)

mysql> set persist time_zone='+8:00';
Query OK, 0 rows affected (0.00 sec)

mysql> show variables like '%time_zone%';
+------------------+--------+
| Variable_name    | Value  |
+------------------+--------+
| system_time_zone | UTC    |
| time_zone        | +08:00 |
+------------------+--------+
2 rows in set (0.00 sec)
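As an aside, instead of changing MySQL itself, the connector can be told which time zone the server runs in: the MySqlSource builder exposes a serverTimeZone option. A minimal sketch, based on my reading of the builder API:

// Hedged sketch: declare the server's time zone on the connector side so it
// matches MySQL; serverTimeZone must reflect what the server actually uses.
MySqlSource<String> source = MySqlSource.<String>builder()
        .hostname("localhost")
        .port(3306)
        .databaseList("test")
        .tableList("test.t_user")
        .username("root")
        .password("root")
        .serverTimeZone("UTC")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();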

Create a test table and add a few rows.

create table test.t_user
(
    id          int auto_increment primary key,
    name        varchar(255) not null,
    description varchar(255) null
);

INSERT INTO test.t_user (id, name, description) VALUES (1, 'alice', 'test');
INSERT INTO test.t_user (id, name, description) VALUES (2, 'bob', 'fuck');
INSERT INTO test.t_user (id, name, description) VALUES (3, 'c', 'moon');

Usage

Talk is cheap. Show me the code.

Here we use Maven to build a Java program that uses flink-cdc to listen for MySQL changes.

The pom.xml dependencies are as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.github.zer0e</groupId>
    <artifactId>flink-cdc-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <!-- Flink base connector classes -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.18.0</version>
        </dependency>
        <!-- Flink MySQL CDC connector -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>3.0.1</version>
        </dependency>
        <!-- Flink DataStream support -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.18.1</version>
        </dependency>
        <!-- Flink Java client -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.18.0</version>
        </dependency>
        <!-- Web UI support -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>1.18.0</version>
        </dependency>
        <!-- For reading and writing batch and streaming tables -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>1.18.0</version>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.5.6</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.13</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.32</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Create a logback.xml under the resources directory, otherwise the log output will include debug-level noise.

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>

Next, we create a MySink to process the captured data.

package com.github.zer0e;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MySink extends RichSinkFunction<String> {

    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println("mysql cdc: " + value);
    }
}

The main program entry point:

package com.github.zer0e;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CdcExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("test")
                .tableList("test.t_user")
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .includeSchemaChanges(true)
                .build();

        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT, 8081);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        // Checkpoints let a failed job roll back to the previous checkpoint and resume from there
        executionEnvironment.enableCheckpointing(5000);

        executionEnvironment.fromSource(source, WatermarkStrategy.<String>noWatermarks(), "Mysql")
                .addSink(new MySink());
        executionEnvironment.execute();
    }
}
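Since flink-runtime-web is on the classpath and RestOptions.PORT is set to 8081, the Flink web UI should also be reachable at http://localhost:8081 while the job runs; that's where the time-zone errors mentioned earlier show up.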

Start it up! If all goes well, the console prints something like this:

mysql cdc: {"before":null,"after":{"id":3,"name":"c","description":"moon1"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"t_user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1721806935503,"transaction":null}
mysql cdc: {"before":null,"after":{"id":2,"name":"bob","description":"fuck"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"t_user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1721806935503,"transaction":null}
mysql cdc: {"before":null,"after":{"id":1,"name":"alice","description":"test"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"t_user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1721806935501,"transaction":null}

The field to watch here is op, which corresponds to the database operation: c (create), d (delete), u (update), r (read). The snapshot rows above are all op "r"; an insert, a delete, and an update look like this:

mysql cdc: {"before":null,"after":{"id":4,"name":"d","description":"today"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1721807345000,"snapshot":"false","db":"test","sequence":null,"table":"t_user","server_id":1,"gtid":null,"file":"binlog.000004","pos":2490,"row":0,"thread":21,"query":null},"op":"c","ts_ms":1721807345604,"transaction":null}

mysql cdc: {"before":{"id":4,"name":"d","description":"today"},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1721807364000,"snapshot":"false","db":"test","sequence":null,"table":"t_user","server_id":1,"gtid":null,"file":"binlog.000004","pos":2788,"row":0,"thread":21,"query":null},"op":"d","ts_ms":1721807364426,"transaction":null}

mysql cdc: {"before":{"id":3,"name":"c","description":"moon1"},"after":{"id":3,"name":"c","description":"moon"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1721807381000,"snapshot":"false","db":"test","sequence":null,"table":"t_user","server_id":1,"gtid":null,"file":"binlog.000004","pos":3095,"row":0,"thread":21,"query":null},"op":"u","ts_ms":1721807381851,"transaction":null}

There isn't much else to it. If you'd rather not wade through that much JSON, you can map it onto entity classes.

That said, from my own attempts, doing the parsing entirely by yourself (i.e. a custom deserializer) is fairly hard; it's simpler to take the JSON as-is and parse it based on the table name.

package com.github.zer0e;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Message<T> {

    private T before;

    private T after;

    private Source source;

    private String op;

    @JsonProperty("ts_ms")
    private Long timestamp;

    @Data
    @ToString
    public static class Source {
        private String db;
        private String table;
    }
}
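The sink below maps messages onto a User entity, which the snippets here don't show; a minimal version matching the t_user table's columns would look like this:

package com.github.zer0e;

import lombok.Data;
import lombok.ToString;

// Minimal entity matching test.t_user (id, name, description).
@Data
@ToString
public class User {
    private Integer id;
    private String name;
    private String description;
}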
package com.github.zer0e;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MySink extends RichSinkFunction<String> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    static {
        // The CDC JSON carries far more fields than Message maps, so ignore unknowns
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println("mysql cdc: " + value);

        Message<User> userMessage = objectMapper.readValue(value, new TypeReference<Message<User>>() {
        });

        System.out.println("mysql cdc2: " + userMessage);
    }
}
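From here, branching on userMessage.getSource().getTable() is the natural way to route each table's records to its own entity type and downstream handling, which is the table-name parsing suggested above.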

Summary

This was a quick demo. Along the way I also looked at whether I could parse the raw data entirely myself, and it proved fairly difficult.

The other thing is that every restart re-reads all the existing source data. It does no harm here, but I wondered whether it can be turned off.
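If I'm reading the connector right, that full re-read is the startup snapshot phase (the op "r" records earlier), and the builder's startupOptions setting controls it. A sketch that starts from the current binlog position instead of snapshotting, assuming StartupOptions lives in com.ververica.cdc.connectors.mysql.table as in the 3.0.x connector:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

// Hedged sketch: StartupOptions.latest() skips the initial snapshot and only
// reads binlog events from now on; the default StartupOptions.initial()
// snapshots all existing rows first, which is the behavior described above.
MySqlSource<String> source = MySqlSource.<String>builder()
        .hostname("localhost")
        .port(3306)
        .databaseList("test")
        .tableList("test.t_user")
        .username("root")
        .password("root")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .startupOptions(StartupOptions.latest())
        .build();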

After that it's really just a matter of wiring this up to ES or other data stores, which is routine once you've written enough business code.

Author: Zer0e

Original link: https://re0.top/2024/07/24/devops9/

Published: July 24th 2024, 6:30:00 pm

Updated: July 24th 2024, 10:00:19 pm

License: This work is licensed under the Creative Commons Attribution-NonCommercial 4.0 International License.
