Flink CDC
Fink CDC 是什么
Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。该工具使得用户能够以 YAML 配置文件的形式,优雅地定义其 ETL(Extract Transform Load)流程,并协助用户自动化生成定制化的 Flink 算子并且提交 Flink 作业。Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构表更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。
Flink CDC 深度集成并由 Apache Flink 驱动,提供以下核心功能:
- ✅ 端到端的数据集成框架
- ✅ 为数据集成的用户提供了易于构建作业的 API
- ✅ 支持在 Source 和 Sink 中处理多个表
- ✅ 整库同步
- ✅ 具备表结构变更自动同步的能力(Schema Evolution)
如何使用 Flink CDC
Flink CDC 提供了基于 YAML 格式的用户 API,更适合数据集成场景。以下是一个 YAML 文件的示例,它定义了一个数据管道(Pipeline),该 Pipeline 从 MySQL 捕获实时变更,并将它们同步到 Apache Doris:
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
server-id: 5400-5404
server-time-zone: UTC
sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
通过使用 flink-cdc.sh 提交 YAML 文件,一个 Flink 作业将会被编译并部署到指定的 Flink 集群。请参考核心概念以获取 Pipeline 支持的所有功能的完整文档说明。
核心概念
Data Pipeline
在 Flink CDC 中的事件以管道的方式从上游流到下游,整个 ETL 任务被称之为数据管道(Data Pipeline)。
参数 ,所以描述 Pipeline,需要有这几部分:
- 必须的
- source
- sink
- pipeline
- 可选的
- route
- transform
Data Source
Data Source 用于访问元数据,并从外部系统读取已更改的数据。Data Source 可以同时从多个表中读取数据。
参数
参数 | 含义 | optional/required |
---|---|---|
type | 数据源类型,比如 mysql | required |
name | 数据源名称 | optional |
configurations | 数据源相关配置: 连接配置、表属性 | optional |
Data Sink
Data Sink 用于请求 schema 变更和更改写入外部系统的数据,一个 Data Sink 可以同时写入多个表。
参数
参数 | 含义 | optional/required |
---|---|---|
type | 接收器类型,比如 doris、starrocks | required |
name | 接收器名称 | optional |
configurations | 接收器相关配置: 连接配置、表属性 | optional |
Table ID
当连接到外部系统时,需要和外部系统的存储对象建立映射关系。Table Id 代表这个映射关系。
Table Id 是三个元素组合:(namespace、schemaName、tableName),不同的外部如下:
data system | parts in tableId | String example |
---|---|---|
Oracle/PostgreSQL | database, schema, table | mydb.default.orders |
MySQL/Doris/StarRocks | database, table | mydb.orders |
Kafka | topic | orders |
Transform
Transform: 可以帮助用户删除、展开数据列,还可以过滤一些不必要的数据。
Route
Route: 源表和接收表的映射规则,最经典的是合并子数据库和子表,将多个上游源表路由到一张接收表。
参数
参数 | 含义 | optional/required |
---|---|---|
source-table | 源表,支持表达式 | required |
sink-table | 接收表,支持表达式 | required |
description | 路由规则描述 | optional |
连接器
Flink CDC 提供了多个 Source 和 Sink 连接器来与外部系统交互。可以直接使用这些连接器,只需将 JAR 文件添加到您的 Flink CDC 环境中,并在您的 YAML Pipeline 定义中指定所需的连接器。
支持的连接器
Connector | Supported Type | External System |
---|---|---|
Apache Doris | Sink | Apache Doris: 1.2.x, 2.x.x |
MySQL | Source | MySQL: 5.6, 5.7, 8.0.x RDS MySQL: 5.6, 5.7, 8.0.x PolarDB MySQL: 5.6, 5.7, 8.0.x Aurora MySQL: 5.6, 5.7, 8.0.x MariaDB: 10.x PolarDB X: 2.0.1 |
StarRocks | Sink | StarRocks: 2.x, 3.x |
MySQL (Source)
MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。 本文描述了如何设置 MySQL CDC Pipeline 连接器。
source:
type: mysql
name: MySQL Source
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: pass
pipeline:
name: MySQL to Doris Pipeline
parallelism: 4
Doris (Sink)
source:
type: values
name: ValuesSource
sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: ""
table.create.properties.replication_num: 1
pipeline:
parallelism: 1
StarRocks (Sink)
source:
type: mysql
name: MySQL Source
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: starrocks
name: StarRocks Sink
jdbc-url: jdbc:mysql://127.0.0.1:9030
load-url: 127.0.0.1:8030
username: root
password: pass
pipeline:
name: MySQL to StarRocks Pipeline
parallelism: 2
提交 Pipeline 到 Flink 集群
standalone
Standalone 模式是最简的发布模式。
- 首先在本地或者集群上安装 Flink,启动
cd /path/flink-*
、./bin/start-cluster.sh
关闭./bin/stop-cluster.sh
- 设置 Flink CDC
- 下载解压 tar 文件,
tar -xzf flink-cdc-*.tar.gz
共有4个目录 bin、lib、log、conf - 下载连接器(connector)解压到上面的 lib 目录
- 下载解压 tar 文件,
- 提交 Flink CDC 作业任务:就是提交一个 yaml 文件
cd /path/flink-cdc-*
./bin/flink-cdc.sh mysql-to-doris.yaml
Kubernetes
同上
YARN
同上