跳至主要內容

Flink

张启忻大约 14 分钟stream实时计算

Flink架构


Flink运行时有两种进程:JobManager、TaskManager,关系是1:N

。之后,收集进程报告。

JobManager/TaskManger可以多种方式启动: 直接机器上Standalone集群启动、容器中启动、资源框架管理(Yarn、K8S)启动

JobManager

  • Flink集群资源管理,主要是管理task slots,这是Flink的资源单位。可以管理各种资源提供者(Yarn、Kubernetes、standalone)提供的资源生成的task slots。
  • 提供一个REST接口,用来提交Flink应用程序、启动新的JobMaster。还包含一个WebUI展示Flink相关信息。
  • 负责管理单个JobGraph的执行。Flink作业都有各自的JobMaster。

TaskManager

。TaskManager资源调度的最小单位是task slot,task slot表示并发处理的task数量。

重要

一个task slot中是可以执行多个算子的

Tasks 和算子链

将算子的subtasks链接成tasks形成算子链。每个task由一条线程执行。链接成算子链的好处: 减少了线程切换的开销、减少延迟增加吞吐。算子链行为是可以配置的,参考算子链文档open in new window。下图中上边部分形成算子链只需要两条线程。下边部分原来的算子没有形成算子连,有5个subtask,需要5个并行线程。

Task Slots 和资源

每个 TaskManager 都是一个JVM进程,可以在单独的线程中执行一个或多个 subtask 。为了控制一个TaskManager接收多少个task,就有了task slots
每个 task slot 代表 TaskManager 中的一些资源,注意没有CPU资源隔离,往往指托管内存,多个 task slot 的内存隔离(目前为止仅分离托管内存)。
通过调整 task slot 的数量,用户可以定义 subtask 如何相互隔离。每个 TaskManager 有一个 slot,这意味着每个 task 组都在单独的 JVM 中运行(例如,可以在单独的容器中启动)。具有多个 slot 意味着意味着更多的 subtask 共享同一个 JVM 。

默认情况下,Flink 允许 subtask 共享 slot,即便它们是不同的 task 的 subtask,只要是来自同一作业即可。结果就是一个 slot 可以持有整个作业管道。允许 slot 共享的有两个主要优点:

  • Flink 集群所需的 task slot 和作业中使用的最大并行度恰好一样。无需计算程序总共包含多少个 task(具有不同的并行度)。
  • 容易获得更好的资源利用。如果没有 slot 共享,非密集 subtask(source/map())将阻塞和密集型 subtask(window)一样多的资源。通过 slot 共享,我们示例中的基本并行度从 2 增加到 6,可以充分利用分配的资源,同时确保繁重的 subtask 在 TaskManager 之间公平分配。

Flink集群类型

  • Session 集群 先创建一个Flink集群,在提交作业时创建会话Session提交作业,这样Flink集群和作业的对应关系就是1:N,缺点是作业之间的隔离性较差,当某个作业失败集群异常时会影响到其他作业的运行,优点是提前启动集群,只是在Session中提交作业,可以节省启动多次集群的时间和资源
  • Job 集群 每次提交作业都创建一个新的Flink集群,适合大型长时间运行的作业部署,优点是资源隔离性好,缺点是每次启动新集群需要时间和资源
  • Application 集群 把作业打成jar包,客户端将jar文件提交到集群(dispatcher, JM, yarn)。集群入口(ApplicationClusterEntryPoint)负责调用 main()方法来提取 JobGraph。例如,这允许你像在 Kubernetes 上部署任何其他应用程序一样部署 Flink 应用程序。因此,Flink Application 集群的寿命与 Flink 应用程序的寿命有关。

有状态流处理

什么是状态?

算子有状态就意味着
官网定义:While many operations in a dataflow simply look at one individual event at a time(for example an event parser), some operations remember information across multiple events(for example window operators). These operations are called stateful.

KeyedState

KeyedState 是内部维护的 key/value 存储。状态操作给它分区、把它和 Stream 一起分发,因此想要访问它只能在 Keyed Stream 上,也就是 Stream 在根据 key 分区之后,而且只有与当前事件的 key 关联。校准对齐了 Stream 的 keystate 的 key 可以保证所有的 state 更新都是本地操作。这样就可以保证没有事务开销的一致性,这种对齐还允许 Flink 重分配 state 和调整流分区。

通过快照状态进行容错

由 Flink 管理的 ,每个 ,另外 。Flink 定期获取。比如:分布式文件系统、内存堆。如果遇到故障 Flink 可以快速还原快照,恢复系统运行。
Flink 管理的 state backend 分为两类:一类是基于 内嵌的 key/value 存储在磁盘上;一类是基于 Java 堆实现的 存储在内存中,这其中有份两种,一种是,一种是直接

State Backends

名称Working State状态备份快照说明
RocksDBStateBackend本地磁盘(tmp dir)分布式文件系统全量 / 增量支持大于内存大小的状态
经验法则:比基于堆的后端慢10倍
FsStateBackendJVM Heap分布式文件系统全量快速,需要大的堆内存
受限制于 GC
MemoryStateBackendJVM HeapJobManager JVM Heap全量适用于小状态(本地)的测试和实验

Checkpoint Storeage

checkpoint是状态快照,状态后端也就是状态的存储由上面描述的是有那么两类三种,那么checkpoint的存储也就是上面说的那三种。但是具体在编码层面也就是上面说的两类分别是下面表格:

名称状态备份说明
FileSystemCheckpointStorage分布式文件系统支持非常大的状态大小
高度可靠
推荐用于生产部署
JobManagerCheckpointStorageJobManager JVM Heap适合小状态(本地)的测试和实验

Checkpointing

重要

开始要 checkpoint 时,task manager 就开始执行具体的 checkpoint 操作,也就是做状态快照,checkpoint coordinator ,这些 barriers 流过 jobGraph 标注着 checkpoint 前后的流部分,比如 checkpoint n 将包含每个 operator 的 state,这些 state 是对应的 operator 消费了严格在 checkpoint barrier n 之前的所有事件,并且不包含在此(checkpoint barrier n)后的任何事件后而生成的状态。

  1. Barriers 数据流屏障,Flink分布式快照一个核心元素就是流屏障。它是记录着 Operator State 的信息,每个 barriers 都有一个编号,并且它们是严格按顺序流入数据流的。
  2. Snapshotting Operator State 算子状态快照,Operators 在收到快照屏障那一刻就开始对自己的 state 做 snapshot,屏障之前的所有状态都要更新完成并且屏障之后的状态还未开始,有可能这样的 state 比较大,所以最好存储到外部的文件系统上,比如 HDFS/S3 等等,完成了快照之后,再把快照屏障发送到自己的输出流。
  3. Recovery 恢复,一但出现失败,Flink 就从最近完成的 checkpoint 上恢复状态数据,系统会重新部署整个分布式数据流,并且给每一个 operator 恢复 state 数据。

Savepoints

savepoint 和 checkpoint 类似,它是通过 checkpoint 机制把数据保存到状态后端 state backends,可以恢复所有保存下来的 checkpoint。与 checkpoint 不同的是

Exactly Once 精准一次 语义

  • At most once 最多一次,Flink不作 checkpoint,不从 checkpoint 中恢复
  • At least once 最少一次,Flink从 checkpoint 中恢复,不作 barrier 对齐,可能存在冗余。或者不作 checkpoint,但是利用三方可以重复发送数据。
  • Exactly once 精准一次,Flink确保精确一次性语义是通过对齐 barrier,从 checkpoint 中恢复
:满足两点,一点是可重复发送,一点是消费端保持幂等性。

及时流处理

及时流处理是状态流处理的一种扩展,扩展的方向是时间在计算中扮演着重要的角色。

Event Time、Ingestion Time、Processing Time

  • Event Time(事件时间):事件产生的时间
  • Ingestion Time(摄取时间):Flink 读取事件的记录时间
  • Processing Time(处理时间):Flink pipeline 中具体算子处理事件的时间

Watermarks

在流式计算里,延迟正确性是两个相悖的概念,想要相对的正确性就必须牺牲一点延迟,而在 flink 中,watermarks 可以看作是相对延迟的定义。

Event Time 和 Watermarks 的关系

Windows

Flink 把无界数据流分解成有界数据流,然后做聚合分析。这里的有界数据流就是窗口的概念。
Flink 的窗口操作主要有两个: 将事件分配给窗口(根据需要创建新的窗口对象),以及 处理窗口内的数据。
Flink 的窗口 API 还具有 Triggers 和 Evictors 的概念,Triggers 确定何时调用窗口函数,而 Evictors 则可以删除在窗口中收集的元素。 举一个简单的例子,我们一般这样使用键控事件流(基于 key 分组的输入事件流):

stream
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce|aggregate|process(<window function>);

您不是必须使用键控事件流(keyed stream),但是值得注意的是,如果不使用键控事件流,我们的程序就不能 并行 处理。

stream
    .windowAll(<window assigner>)
    .reduce|aggregate|process(<window function>);

窗口的分类、窗口分配器

时间窗口(Time Window)

  • 滚动时间窗口 窗口之间不会有重叠也不会有空隙
  • 滑动时间窗口 窗口之间不会有空隙,会有重叠
  • 会话窗口 窗口之间不会有重叠,会有空隙

基于时间的窗口分配器(包括会话时间)既可以处理 事件时间,也可以处理 处理时间。这两种基于时间的处理没有哪一个更好,我们必须折中使用。比如处理时间限制:无法正确处理历史数据、无法正确处理超过最大无序边界的数据、结果将是不确定的,优势是延迟低。

数量窗口(Count Window)

只有窗口内的事件数量达到要求,这些窗口才会触发计算。当然也可以实现上述说的 Triggers 达到自定义的效果。

Flink API总共分为四层,Flink API的根基是有状态的及时流处理 Stateful Stream Processing。有状态是为了容错,及时/实时是为了流式分析、窗口处理等。

  • 最底层是 抽象实现是 Process Functionopen in new window,并且 Process Function 被集成到了 DataStream API 中。允许用户在程序中自由地处理来自单流或多流的事件(数据),并提供具有全局一致性和容错保障的状态。此外,用户可以在此层抽象中注册事件时间(event time)和处理时间(processing time)回调方法,从而允许程序可以实现复杂计算。
  • 第二层是 实际上,许多应用程序不需要使用上面的有状态实时流处理 Process Function API ,而是使用 Core APIs 编程,包括 DataStream APIopen in new window(应用于有界/无界数据流场景)。例如各种形式的操作等。此层 API 中处理的数据类型在每种编程语言中都有其对应的类。
    Process Function 这类底层抽象和 DataStream API 的相互集成使得用户可以选择使用更底层的抽象 API 来实现自己的需求。 DataSet API 还额外提供了一些原语,比如循环/迭代(loop/iteration)操作。
  • 第三层是 Table API 是例如在流式数据场景下,它可以表示一张正在动态改变的表。Table APIopen in new window 遵循(扩展)关系模型:即表拥有 schema(类似于关系数据库中的 schema),并且 Table API 也提供了类似于关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等。Table API 程序是以声明的方式定义应执行的逻辑操作,而不是确切地指定程序应该执行的代码。尽管 Table API 使用起来很简洁并且可以由各种类型的用户自定义函数扩展功能,但还是比 Core API 的表达能力差。此外,Table API 程序在执行之前还会使用优化器中的优化规则对用户编写的表达式进行优化。表和 DataStream/DataSet 可以进行无缝切换,Flink 允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用。
  • 最顶层是 这层抽象在语义和程序表达式上都类似于 Table API,但是程序实现都是 SQL 查询表达式。SQLopen in new window 抽象

Flink处理大批量数据遇到去重问题解决办法

  1. 去重问题可以使用Set集合去重,但是大批量的数据会造成内存过大,达到一定程度后运行的内存已经不够数据的存储
  2. 利用外部组件进行去重,比如HBase,通过主键查询HBase来判断数据是否重复,但是也会存在一个问题,比如flink做一次checkpoint到出现问题这段时间内 其实已经出了一些数据,这样在HBase上就会记录这些数据的RowKey,flink程序恢复到checkpoint后继续处理,那么先查HBase发现已有RowKey就会忽视处理,这样 就造成了数据丢失的情况。
  3. 最好的办法是使用flink自身的 State Processor API,把处理数据的主键信息维护在Flink的State中,然后利用flink的checkpoint做异常恢复。 由于是去重场景,Flink三种后端状态存储选用 RocksDBStateBackend,将 TaskManager 状态信息记录到本地RocksDB种,之前程序处理的状态数据就保存在此, 状态类型这里使用 ValueState,open的时候从这个集合中get主键如果是null表示未处理, 反之就是处理过的。

Flink的背压机制