跳至主要內容

Flink CDC

张启忻大约 4 分钟FlinkCDC

Fink CDC 是什么

Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。该工具使得用户能够以 YAML 配置文件的形式,优雅地定义其 ETL(Extract Transform Load)流程,并协助用户自动化生成定制化的 Flink 算子并且提交 Flink 作业。Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构表更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。
Flink CDC 深度集成并由 Apache Flink 驱动,提供以下核心功能:

  • ✅ 端到端的数据集成框架
  • ✅ 为数据集成的用户提供了易于构建作业的 API
  • ✅ 支持在 Source 和 Sink 中处理多个表
  • ✅ 整库同步
  • ✅ 具备表结构变更自动同步的能力(Schema Evolution)

Flink CDC 提供了基于 YAML 格式的用户 API,更适合数据集成场景。以下是一个 YAML 文件的示例,它定义了一个数据管道(Pipeline),该 Pipeline 从 MySQL 捕获实时变更,并将它们同步到 Apache Doris:

source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app_db.\.*
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2

通过使用 flink-cdc.sh 提交 YAML 文件,一个 Flink 作业将会被编译并部署到指定的 Flink 集群。请参考核心概念以获取 Pipeline 支持的所有功能的完整文档说明。

核心概念

Data Pipeline

在 Flink CDC 中的事件以管道的方式从上游流到下游,整个 ETL 任务被称之为数据管道(Data Pipeline)。
参数 ,所以描述 Pipeline,需要有这几部分:

  • 必须的
    • source
    • sink
    • pipeline
  • 可选的
    • route
    • transform

Data Source

Data Source 用于访问元数据,并从外部系统读取已更改的数据。Data Source 可以同时从多个表中读取数据。
参数

参数含义optional/required
type数据源类型,比如 mysqlrequired
name数据源名称optional
configurations数据源相关配置: 连接配置、表属性optional

Data Sink

Data Sink 用于请求 schema 变更和更改写入外部系统的数据,一个 Data Sink 可以同时写入多个表。
参数

参数含义optional/required
type接收器类型,比如 doris、starrocksrequired
name接收器名称optional
configurations接收器相关配置: 连接配置、表属性optional

Table ID

当连接到外部系统时,需要和外部系统的存储对象建立映射关系。Table Id 代表这个映射关系。
Table Id 是三个元素组合:(namespace、schemaName、tableName),不同的外部如下:

data systemparts in tableIdString example
Oracle/PostgreSQLdatabase, schema, tablemydb.default.orders
MySQL/Doris/StarRocksdatabase, tablemydb.orders
Kafkatopicorders

Transform

Transform: 可以帮助用户删除、展开数据列,还可以过滤一些不必要的数据。

Route

Route: 源表和接收表的映射规则,最经典的是合并子数据库和子表,将多个上游源表路由到一张接收表。
参数

参数含义optional/required
source-table源表,支持表达式required
sink-table接收表,支持表达式required
description路由规则描述optional

连接器

Flink CDC 提供了多个 Source 和 Sink 连接器来与外部系统交互。可以直接使用这些连接器,只需将 JAR 文件添加到您的 Flink CDC 环境中,并在您的 YAML Pipeline 定义中指定所需的连接器。
支持的连接器

ConnectorSupported TypeExternal System
Apache DorisSinkApache Doris: 1.2.x, 2.x.x
MySQLSourceMySQL: 5.6, 5.7, 8.0.x
RDS MySQL: 5.6, 5.7, 8.0.x
PolarDB MySQL: 5.6, 5.7, 8.0.x
Aurora MySQL: 5.6, 5.7, 8.0.x
MariaDB: 10.x
PolarDB X: 2.0.1
StarRocksSinkStarRocks: 2.x, 3.x

MySQL (Source)

MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。 本文描述了如何设置 MySQL CDC Pipeline 连接器。

source:
   type: mysql
   name: MySQL Source
   hostname: 127.0.0.1
   port: 3306
   username: admin
   password: pass
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: doris
  name: Doris Sink
  fenodes: 127.0.0.1:8030
  username: root
  password: pass

pipeline:
   name: MySQL to Doris Pipeline
   parallelism: 4

Doris (Sink)

source:
   type: values
   name: ValuesSource

sink:
   type: doris
   name: Doris Sink
   fenodes: 127.0.0.1:8030
   username: root
   password: ""
   table.create.properties.replication_num: 1

pipeline:
   parallelism: 1

StarRocks (Sink)

source:
   type: mysql
   name: MySQL Source
   hostname: 127.0.0.1
   port: 3306
   username: admin
   password: pass
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  load-url: 127.0.0.1:8030
  username: root
  password: pass

pipeline:
   name: MySQL to StarRocks Pipeline
   parallelism: 2

standalone

Standalone 模式是最简的发布模式。

  • 首先在本地或者集群上安装 Flink,启动 cd /path/flink-*./bin/start-cluster.sh 关闭./bin/stop-cluster.sh
  • 设置 Flink CDC
    • 下载解压 tar 文件,tar -xzf flink-cdc-*.tar.gz 共有4个目录 bin、lib、log、conf
    • 下载连接器(connector)解压到上面的 lib 目录
  • 提交 Flink CDC 作业任务:就是提交一个 yaml 文件 cd /path/flink-cdc-* ./bin/flink-cdc.sh mysql-to-doris.yaml

Kubernetes

同上

YARN

同上