阿里云Flink社区贡献Flink从入门到精通.pdf
http://www.100md.com
2020年11月4日
![]() |
| 第1页 |
![]() |
| 第4页 |
![]() |
| 第20页 |
![]() |
| 第22页 |
![]() |
| 第47页 |
![]() |
| 第97页 |
参见附件(18526KB,204页)。
Flink是可以运行在多种不同的环境中的,例如,它可以通过单进程多线程的方式直接运行,从而提供调试的能力。它也可以运行在Yarn或者K8S这种资源管理系统上面,也可以在各种云环境中执行。

综述
本文主要介绍Flink Runtime的作业执行的核心机制。首先介绍Flink Runtime的整体架构以及Job的基本执行流程,然后介绍在这个过程,Flink是怎么进行资源管理、作业调度以及错误恢复的。最后,本文还将简要介绍Flink Runtime层当前正在进行的一些工作。
Flink Runtime整体架构
Flink的整体架构。Flink是可以运行在多种不同的环境中的,例如,它可以通过单进程多线程的方式直接运行,从而提供调试的能力。它也可以运行在Yarn或者K8S这种资源管理系统上面,也可以在各种云环境中执行。
错误恢复
在Flink作业的执行过程中,除正常执行的流程外,还有可能由于环境等原因导致各种类型的错误。整体上来说,错误可能分为两大类:Task执行出现错误或Flink集群的Master出现错误。由于错误不可避免,为了提高可用性,Flink需要提供自动错误恢复机制来进行重试。
对于第一类Task执行错误,Flink提供了多种不同的错误恢复策略。如图8所示,第一种策略是Restart-all,即直接重启所有的Task。对于Flink的流任务,由于Flink提供了Checkpoint机制,因此当任务重启后可以直接从上次的Checkpoint开始继续执行。因此这种方式更适合于流作业。第二类错误恢复策略是Restart-individual,它只适用于Task之间没有数据传输的情况。这种情况下,我们可以直接重启出错的任务。
阿里云Flink社区贡献Flink从入门到精通截图




开发者社区 阿里云实时计算
实时计算交流钉钉群 Flink社区微信公众号
扫一扫二维码图案,关注我吧Apache Flink 进阶(一):Runtime 核心机制剖析 4
Apache Flink 进阶(二):时间属性深度解析 18
Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践 30
Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 41
Apache Flink 进阶(五):数据类型和序列化 60
Apache Flink 进阶(六):Flink 作业执行深度解析 71
Apache Flink 进阶(七):网络流控及反压剖析 88
Apache Flink 进阶(八):详解 Metrics 原理与实战 112
Apache Flink 进阶(九):Flink Connector 开发 125
Apache Flink 进阶(十):Flink State 最佳实践 141
Apache Flink 进阶(十一):TensorFlow On Flink 149
Apache Flink 进阶(十二):深度探索 Flink SQL 159
Apache Flink 进阶(十三):Python API 应用实践 181
目录Apache Flink 进阶(一):Runtime 核心机制剖析
作者:高赟(云骞)
阿里巴巴技术专家
简介:Flink 的整体架构如图 1 所示。Flink 是可以运行在多种不同的环境中的,例如,它可以通过单进程多线程的方式直接运行,从而提供调试的能力。它也可以运
行在 Yarn 或者 K8S 这种资源管理系统上面,也可以在各种云环境中执行。
1. 综述
本文主要介绍 Flink Runtime 的作业执行的核心机制。首先介绍 Flink Runtime
的整体架构以及 Job 的基本执行流程,然后介绍在这个过程,Flink 是怎么进行资源
管理、作业调度以及错误恢复的。最后,本文还将简要介绍 Flink Runtime 层当前正
在进行的一些工作。
2. Flink Runtime 整体架构
Flink 的整体架构如图 1 所示。Flink 是可以运行在多种不同的环境中的,例如,它可以通过单进程多线程的方式直接运行,从而提供调试的能力。它也可以运行在
Yarn 或者 K8S 这种资源管理系统上面,也可以在各种云环境中执行。Apache Flink 进阶(一):Runtime 核心机制剖析 < 5
图1 Flink 的整体架构,其中 Runtime 层对不同的执行环境提供了一套统一的分布式执行引擎
针对不同的执行环境,Flink 提供了一套统一的分布式作业执行引擎,也就是
Flink Runtime 这层。Flink 在 Runtime 层之上提供了 DataStream 和 DataSet 两
套 API,分别用来编写流作业与批作业,以及一组更高级的 API 来简化特定作业的
编写。本文主要介绍 Flink Runtime 层的整体架构。
Flink Runtime 层的主要架构如图 2 所示,它展示了一个 Flink 集群的基本结
构。Flink Runtime 层的整个架构主要是在 FLIP-6 中实现的,整体来说,它采用了
标准 master-slave 的结构,其中左侧白色圈中的部分即是 master,它负责管理整
个集群中的资源和作业;而右侧的两个 TaskExecutor 则是 Slave,负责提供具体的
资源并实际执行作业。6 > Apache Flink 进阶(一):Runtime 核心机制剖析
图2 Flink 集群的基本结构。Flink Runtime 层采用了标准的 master-slave 架构
其中,Master 部分又包含了三个组件,即 Dispatcher、ResourceManager
和 JobManager。其中,Dispatcher 负责接收用户提供的作业,并且负责为这个
新提交的作业拉起一个新的 JobManager 组件。ResourceManager 负责资源的
管理,在整个 Flink 集群中只有一个 ResourceManager。JobManager 负责管理
作业的执行,在一个 Flink 集群中可能有多个作业同时执行,每个作业都有自己的
JobManager 组件。这三个组件都包含在 AppMaster 进程中。
基于上述结构,当用户提交作业的时候,提交脚本会首先启动一个 Client进程
负责作业的编译与提交。它首先将用户编写的代码编译为一个 JobGraph,在这个过
程,它还会进行一些检查或优化等工作,例如判断哪些 Operator 可以 Chain 到同一
个 Task 中。然后,Client 将产生的 JobGraph 提交到集群中执行。此时有两种情
况,一种是类似于 Standalone 这种 Session 模式,AM 会预先启动,此时 Client
直接与 Dispatcher 建立连接并提交作业即可。另一种是 Per-Job 模式,AM 不会
预先启动,此时 Client 将首先向资源管理系统 (如Yarn、K8S)申请资源来启动
AM,然后再向 AM 中的 Dispatcher 提交作业。
当作业到 Dispatcher 后,Dispatcher 会首先启动一个 JobManager 组件,然
后 JobManager 会向 ResourceManager 申请资源来启动作业中具体的任务。这Apache Flink 进阶(一):Runtime 核心机制剖析 < 7
时根据 Session 和 Per-Job 模式的区别, TaskExecutor 可能已经启动或者尚未启
动。如果是前者,此时 ResourceManager 中已有记录了 TaskExecutor 注册的资
源,可以直接选取空闲资源进行分配。否则,ResourceManager 也需要首先向外
部资源管理系统申请资源来启动 TaskExecutor,然后等待 TaskExecutor 注册相
应资源后再继续选择空闲资源进程分配。目前 Flink 中 TaskExecutor 的资源是通过
Slot 来描述的,一个 Slot 一般可以执行一个具体的 Task,但在一些情况下也可以执
行多个相关联的 Task,这部分内容将在下文进行详述。ResourceManager 选择到
空闲的 Slot 之后,就会通知相应的 TM “将该 Slot 分配分 JobManager XX ”,然
后 TaskExecutor 进行相应的记录后,会向 JobManager 进行注册。JobManager
收到 TaskExecutor 注册上来的 Slot 后,就可以实际提交 Task 了。
TaskExecutor 收到 JobManager 提交的 Task 之后,会启动一个新的线程来
执行该 Task。Task 启动后就会开始进行预先指定的计算,并通过数据 Shuffle 模块
互相交换数据。
以上就是 Flink Runtime 层执行作业的基本流程。可以看出,Flink 支持两种不
同的模式,即 Per-job 模式与 Session 模式。如图 3 所示,Per-job 模式下整个
Flink 集群只执行单个作业,即每个作业会独享 Dispatcher 和 ResourceManager
组件。此外,Per-job 模式下 AppMaster 和 TaskExecutor 都是按需申请的。因
此,Per-job 模式更适合运行执行时间较长的大作业,这些作业对稳定性要求较
高,并且对申请资源的时间不敏感。与之对应,在 Session 模式下,Flink 预先启动
AppMaster 以及一组 TaskExecutor,然后在整个集群的生命周期中会执行多个作
业。可以看出,Session 模式更适合规模小,执行时间短的作业。8 > Apache Flink 进阶(一):Runtime 核心机制剖析
图3 Flink Runtime 支持两种作业执行的模式
3. 资源管理与作业调度
本节对 Flink 中资源管理与作业调度的功能进行更深入的说明。实际上,作业调
度可以看做是对资源和任务进行匹配的过程。如上节所述,在 Flink 中,资源是通过
Slot 来表示的,每个 Slot 可以用来执行不同的 Task。而在另一端,任务即 Job 中
实际的 Task,它包含了待执行的用户逻辑。调度的主要目的就是为了给 Task 找到
匹配的 Slot。逻辑上来说,每个 Slot 都应该有一个向量来描述它所能提供的各种资
源的量,每个 Task 也需要相应的说明它所需要的各种资源的量。但是实际上在 1.9
之前,Flink 是不支持细粒度的资源描述的,而是统一的认为每个 Slot 提供的资源和
Task 需要的资源都是相同的。从 1.9 开始,Flink 开始增加对细粒度的资源匹配的支
持的实现,但这部分功能目前仍在完善中。
作业调度的基础是首先提供对资源的管理,因此我们首先来看下 Flink 中资源
管理的实现。如上文所述,Flink 中的资源是由 TaskExecutor 上的 Slot 来表示
的。如图 4 所示,在 ResourceManager 中,有一个子组件叫做 SlotManager,它
维护了当前集群中所有 TaskExecutor 上的 Slot 的信息与状态,如该 Slot 在哪个
TaskExecutor 中,该 Slot 当前是否空闲等。当 JobManger 来为特定 Task 申请
资源的时候,根据当前是 Per-job 还是 Session 模式,ResourceManager 可能
会去申请资源来启动新的 TaskExecutor。当 TaskExecutor 启动之后,它会通过
服务发现找到当前活跃的 ResourceManager 并进行注册。在注册信息中,会包含Apache Flink 进阶(一):Runtime 核心机制剖析 < 9
该 TaskExecutor中所有 Slot 的信息。 ResourceManager 收到注册信息后,其中
的 SlotManager 就会记录下相应的 Slot 信息。当 JobManager 为某个 Task 来申
请资源时, SlotManager 就会从当前空闲的 Slot 中按一定规则选择一个空闲的 Slot
进行分配。当分配完成后,如第 2 节所述,RM 会首先向 TaskManager 发送 RPC
要求将选定的 Slot 分配给特定的 JobManager。TaskManager 如果还没有执行过
该 JobManager 的 Task 的话,它需要首先向相应的 JobManager 建立连接,然
后发送提供 Slot 的 RPC 请求。在 JobManager 中,所有 Task 的请求会缓存到
SlotPool 中。当有 Slot 被提供之后,SlotPool 会从缓存的请求中选择相应的请求并
结束相应的请求过程。
图4 Flink 中资源管理功能各模块交互关系
当 Task 结束之后,无论是正常结束还是异常结束,都会通知 JobManager 相
应的结束状态,然后在 TaskManager 端将 Slot 标记为已占用但未执行任务的状
态。JobManager 会首先将相应的 Slot 缓存到 SlotPool 中,但不会立即释放。这
种方式避免了如果将 Slot 直接还给 ResourceManager,在任务异常结束之后需10 > Apache Flink 进阶(一):Runtime 核心机制剖析
要重启时,需要立刻重新申请 Slot 的问题。通过延时释放,Failover 的 Task 可以
尽快调度回原来的 TaskManager,从而加快 Failover 的速度。当 SlotPool 中缓
存的 Slot 超过指定的时间仍未使用时,SlotPool 就会发起释放该 Slot 的过程。与
申请 Slot 的过程对应,SlotPool 会首先通知 TaskManager 来释放该 Slot,然后
TaskExecutor 通知 ResourceManager 该 Slot 已经被释放,从而最终完成释放的
逻辑。
除了正常的通信逻辑外,在 ResourceManager 和 TaskExecutor 之间还存在
定时的心跳消息来同步 Slot 的状态。在分布式系统中,消息的丢失、错乱不可避免,这些问题会在分布式系统的组件中引入不一致状态,如果没有定时消息,那么组件无
法从这些不一致状态中恢复。此外,当组件之间长时间未收到对方的心跳时,就会认
为对应的组件已经失效,并进入到 Failover 的流程。
在 Slot 管理基础上,Flink 可以将 Task 调度到相应的 Slot 当中。如上文所
述,Flink 尚未完全引入细粒度的资源匹配,默认情况下,每个 Slot 可以分配给一个
Task。但是,这种方式在某些情况下会导致资源利用率不高。如图 5 所示,假如 A、B、C 依次执行计算逻辑,那么给 A、B、C 分配分配单独的 Slot 就会导致资源利用
率不高。为了解决这一问题,Flink 提供了 Share Slot 的机制。如图 5 所示,基于
Share Slot,每个 Slot 中可以部署来自不同 JobVertex 的多个任务,但是不能部署
来自同一个 JobVertex 的 Task。如图 5所示,每个 Slot 中最多可以部署同一个 A、B 或 C 的 Task,但是可以同时部署 A、B 和 C 的各一个 Task。当单个 Task 占用
资源较少时,Share Slot 可以提高资源利用率。 此外,Share Slot 也提供了一种简
单的保持负载均衡的方式。Apache Flink 进阶(一):Runtime 核心机制剖析 < 11
图5 Flink Share Slot 示例。使用 Share Slot 可以在每个 Slot 中部署来自不同 JobVertex 的多个 Task
基于上述 Slot 管理和分配的逻辑,JobManager 负责维护作业中 Task 执
行的状态。如上文所述,Client 端会向 JobManager 提交一个 JobGraph,它
代表了作业的逻辑结构。JobManager 会根据 JobGraph 按并发展开,从而得到
JobManager 中关键的 ExecutionGraph。ExecutionGraph 的结构如图 5 所示,与 JobGraph 相比,ExecutionGraph 中对于每个 Task 与中间结果等均创建了对
应的对象,从而可以维护这些实体的信息与状态。12 > Apache Flink 进阶(一):Runtime 核心机制剖析
图6 Flink 中的 JobGraph 与 ExecutionGraph。ExecutionGraph 是 JobGraph 按并发展开所形
成的,它是 JobMaster 中的核心数据结构
在一个 Flink Job 中是包含多个 Task 的,因此另一个关键的问题是在 Flink 中
按什么顺序来调度 Task。如图 7 所示,目前 Flink 提供了两种基本的调度逻辑,即
Eager 调度与 Lazy From Source。Eager 调度如其名子所示,它会在作业启动时
申请资源将所有的 Task 调度起来。这种调度算法主要用来调度可能没有终止的流作
业。与之对应,Lazy From Source 则是从 Source 开始,按拓扑顺序来进行调度。
简单来说,Lazy From Source 会先调度没有上游任务的 Source 任务,当这些任
务执行完成时,它会将输出数据缓存到内存或者写入到磁盘中。然后,对于后续的任
务,当它的前驱任务全部执行完成后,Flink 就会将这些任务调度起来。这些任务会
从读取上游缓存的输出数据进行自己的计算。这一过程继续进行直到所有的任务完成
计算。Apache Flink 进阶(一):Runtime 核心机制剖析 < 13
图7 Flink 中两种基本的调度策略。其中 Eager 调度适用于流作业,而 Lazy From Source 适用
于批作业
4. 错误恢复
在 Flink 作业的执行过程中,除正常执行的流程外,还有可能由于环境等原因导
致各种类型的错误。整体上来说,错误可能分为两大类:Task 执行出现错误或 Flink
集群的 Master 出现错误。由于错误不可避免,为了提高可用性,Flink 需要提供自
动错误恢复机制来进行重试。
对于第一类 Task 执行错误,Flink 提供了多种不同的错误恢复策略。如图 8
所示,第一种策略是 Restart-all,即直接重启所有的 Task。对于 Flink 的流任
务,由于 Flink 提供了 Checkpoint 机制,因此当任务重启后可以直接从上次的
Checkpoint 开始继续执行。因此这种方式更适合于流作业。第二类错误恢复策略是
Restart-individual,它只适用于 Task 之间没有数据传输的情况。这种情况下,我
们可以直接重启出错的任务。14 > Apache Flink 进阶(一):Runtime 核心机制剖析
图8 Restart-all 错误恢复策略示例。该策略会直接重启所有的 Task
图9 Restart-individual 错误恢复策略示例。该策略只适用于 Task之间不需要数据传输的作业,对于这种作业可以只重启出现错误的 TaskApache Flink 进阶(一):Runtime 核心机制剖析 < 15
由于 Flink 的批作业没有 Checkpoint 机制,因此对于需要数据传输的作业,直接重启所有 Task 会导致作业从头计算,从而导致一定的性能问题。为了增强对
Batch 作业,Flink 在 1.9 中引入了一种新的 Region-Based 的 Failover 策略。在
一个 Flink 的 Batch 作业中 Task 之间存在两种数据传输方式,一种是 Pipeline 类
型的方式,这种方式上下游 Task 之间直接通过网络传输数据,因此需要上下游同
时运行;另外一种是 Blocking 类型的试,如上节所述,这种方式下,上游的 Task
会首先将数据进行缓存,因此上下游的 Task 可以单独执行。基于这两种类型的传
输,Flink 将 ExecutionGraph 中使用 Pipeline 方式传输数据的 Task 的子图叫做
Region,从而将整个 ExecutionGraph 划分为多个子图。可以看出,Region 内的
Task 必须同时重启,而不同 Region 的 Task 由于在 Region 边界存在 Blocking 的
边,因此,可以单独重启下游 Region 中的 Task。
基于这一思路,如果某个 Region 中的某个 Task 执行出现错误,可以分两种
情况进行考虑。如图 8 所示,如果是由于 Task 本身的问题发生错误,那么可以只
重启该 Task 所属的 Region 中的 Task,这些 Task 重启之后,可以直接拉取上游
Region 缓存的输出结果继续进行计算。
另一方面,如图如果错误是由于读取上游结果出现问题,如网络连接中断、缓存
上游输出数据的 TaskExecutor 异常退出等,那么还需要重启上游 Region 来重新
产生相应的数据。在这种情况下,如果上游 Region 输出的数据分发方式不是确定性
的(如 KeyBy、Broadcast 是确定性的分发方式,而 Rebalance、Random 则不
是,因为每次执行会产生不同的分发结果),为保证结果正确性,还需要同时重启上
游 Region 所有的下游 Region。16 > Apache Flink 进阶(一):Runtime 核心机制剖析
图 10 Region-based 错误恢复策略示例一。如果是由于下游任务本身导致的错误,可以只重启下
游对应的 Region
图 11 Region-based 错误恢复策略示例二。如果是由于上游失败导致的错误,那么需要同时重启
上游的 Region 和下游的 Region。实际上,如果下游的输出使用了非确定的数据分割方式,为了保持数据一致性,还需要同时重启所有上游 Region 的下游 Region
除了 Task 本身执行的异常外,另一类异常是 Flink 集群的 Master 进行发生异
常。目前 Flink 支持启动多个 Master 作为备份,这些 Master 可以通过 ZK 来进行
选主,从而保证某一时刻只有一个 Master 在运行。当前活路的 Master 发生异常
时,某个备份的 Master 可以接管协调的工作。为了保证 Master 可以准确维护作业
的状态,Flink 目前采用了一种最简单的实现方式,即直接重启整个作业。实际上,Apache Flink 进阶(一):Runtime 核心机制剖析 < 17
由于作业本身可能仍在正常运行,因此这种方式存在一定的改进空间。
5. 未来展望
Flink目前仍然在Runtime部分进行不断的迭代和更新。目前来看,Flink未来
可能会在以下几个方式继续进行优化和扩展:
● 更完善的资源管理:从 1.9 开始 Flink 开始了对细粒度资源匹配的支持。基于
细粒度的资源匹配,用户可以为 TaskExecutor 和 Task 设置实际提供和使用
的 CPU、内存等资源的数量,Flink 可以按照资源的使用情况进行调度。这一
机制允许用户更大范围的控制作业的调度,从而为进一步提高资源利用率提供
了基础。
● 统一的 Stream 与 Batch:Flink 目前为流和批分别提供了 DataStream 和
DataSet 两套接口,在一些场景下会导致重复实现逻辑的问题。未来 Flink 会
将流和批的接口都统一到 DataStream 之上。
● 更灵活的调度策略:Flink 从 1.9 开始引入调度插件的支持,从而允许用户来
扩展实现自己的调度逻辑。未来 Flink 也会提供更高性能的调度策略的实现。
● Master Failover 的优化:如上节所述,目前 Flink 在 Master Failover 时
需要重启整个作业,而实际上重启作业并不是必须的逻辑。Flink 未来会对
Master failover 进行进一步的优化来避免不必要的作业重启。Apache Flink 进阶(二):时间属性深度解析
作者:崔星灿,Apache Flink Committer
整理:沙晟阳(成阳),阿里巴巴技术专家
本文根据 Apache Flink 进阶篇系列直播课程整理而成,由 Apache Flink
Committer 崔星灿分享,阿里巴巴技术专家沙晟阳(成阳)整理。文章将对 Flink 的
时间属性及原理进行深度解析。
Tips:文末可回顾全部基础篇及进阶篇系列教程。
1. 前言
Flink 的 API 大体上可以划分为三个层次:处于最底层的 ProcessFunction、中
间一层的 DataStream API 和最上层的 SQLTable API,这三层中的每一层都非常
依赖于时间属性。时间属性是流处理中最重要的一个方面,是流处理系统的基石之
一,贯穿这三层 API。在 DataStream API 这一层中因为封装方面的原因,我们能
够接触到时间的地方不是很多,所以我们将重点放在底层的 ProcessFunction 和最
上层的 SQLTable API。Apache Flink 进阶(二):时间属性深度解析 < 19
2. Flink 时间语义
在不同的应用场景中时间语义是各不相同的,Flink 作为一个先进的分布式流处
理引擎,它本身支持不同的时间语义。其核心是 Processing Time 和 Event Time
(Row Time),这两类时间主要的不同点如下表所示:
Processing Time 是来模拟我们真实世界的时间,其实就算是处理数据的节点
本地时间也不一定是完完全全的真实世界的时间,所以说它是用来模拟真实世界的时
间。而 Event Time 是数据世界的时间,即我们要处理的数据流世界里的时间。关
于他们的获取方式,Process Time 是通过直接去调用本地机器的时间,而 Event
Time 则是根据每一条处理记录所携带的时间戳来判定。
这两种时间在 Flink 内部的处理以及用户的实际使用方面,难易程度都是不同
的。相对而言的 Processing Time 处理起来更加的简单,而 Event Time 要更麻
烦一些。而在使用 Processing Time 的时候,我们得到的处理结果(或者说流处理
应用的内部状态)是不确定的。而因为在 Flink 内部对 Event Time 做了各种保障,使用 Event Time 的情况下,无论重放数据多少次,都能得到一个相对确定可重现
的结果。
因此在判断应该使用 Processing Time 还是 Event Time 的时候,可以遵循
一个原则:当你的应用遇到某些问题要从上一个 checkpoint 或者 savepoint 进行20 > Apache Flink 进阶(二):时间属性深度解析
重放,是不是希望结果完全相同。如果希望结果完全相同,就只能用 Event Time;
如果接受结果不同,则可以用 Processing Time。Processing Time 的一个常见的
用途是,根据现实时间来统计整个系统的吞吐,比如要计算现实时间一个小时处理了
多少条数据,这种情况只能使用 Processing Time。
2.1 时间的特性
时间的一个重要特性是:时间只能递增,不会来回穿越。 在使用时间的时
候我们要充分利用这个特性。假设我们有这么一些记录,然后我们来分别看一下
Processing Time 还有 Event Time 对于时间的处理。
● 对于 Processing Time,因为我们是使用的是本地节点的时间(假设这个节
点的时钟同步没有问题),我们每一次取到的 Processing Time 肯定都是递增
的,递增就代表着有序,所以说我们相当于拿到的是一个有序的数据流。
● 而在用 Event Time 的时候因为时间是绑定在每一条的记录上的,由于网络延
迟、程序内部逻辑、或者其他一些分布式系统的原因,数据的时间可能会存在
一定程度的乱序,比如上图的例子。在 Event Time 场景下,我们把每一个记
录所包含的时间称作 Record Timestamp。如果 Record Timestamp 所得到Apache Flink 进阶(二):时间属性深度解析 < 21
的时间序列存在乱序,我们就需要去处理这种情况。
如果单条数据之间是乱序,我们就考虑对于整个序列进行更大程度的离散化。简
单地讲,就是把数据按照一定的条数组成一些小批次,但这里的小批次并不是攒够多
少条就要去处理,而是为了对他们进行时间上的划分。经过这种更高层次的离散化之
后,我们会发现最右边方框里的时间就是一定会小于中间方框里的时间,中间框里的
时间也一定会小于最左边方框里的时间。
这个时候我们在整个时间序列里插入一些类似于标志位的特殊的处理数据,这些特殊的处理数据叫做 watermark。一个 watermark 本质上就代表了这个
watermark 所包含的 timestamp 数值,表示以后到来的数据已经再也没有小于或
等于这个时间的了。22 > Apache Flink 进阶(二):时间属性深度解析
3. Timestamp 和 Watermark 行为概览
接下来我们重点看一下 Event Time 里的 Record Timestamp(简写成 time-
stamp)和 watermark 的一些基本信息。绝大多数的分布式流计算引擎对于数据都是
进行了 DAG 图的抽象,它有自己的数据源,有处理算子,还有一些数据汇。数据在
不同的逻辑算子之间进行流动。watermark 和 timestamp 有自己的生命周期,接下
来我会从 watermark 和 timestamp 的产生、他们在不同的节点之间的传播、以及在
每一个节点上的处理,这三个方面来展开介绍。
3.1 Timestamp 分配和 Watermark 生成
Flink 支持两种 watermark 生成方式。第一种是在 SourceFunction 中产生,相当于把整个的 timestamp 分配和 watermark 生成的逻辑放在流处理应用的源头。
我们可以在 SourceFunction 里面通过这两个方法产生 watermark:
● 通过 collectWithTimestamp 方法发送一条数据,其中第一个参数就是我
们要发送的数据,第二个参数就是这个数据所对应的时间戳;也可以调用
emitWatermark 去产生一条 watermark,表示接下来不会再有时间戳小于等
于这个数值记录。
● 另 外, 有 时 候 我 们 不 想 在 SourceFunction 里 生 成 timestamp 或 者 Apache Flink 进阶(二):时间属性深度解析 < 23
watermark,或者说使用的 SourceFunction 本身不支持,我们还可以
在 使 用 DataStream API 的 时 候 指 定, 调 用 的 DataStream.assign-
TimestampsAndWatermarks 这个方法,能够接收不同的 timestamp 和
watermark 的生成器。
总体上而言生成器可以分为两类:第一类是定期生成器;第二类是根据一些在流
处理数据流中遇到的一些特殊记录生成的。
两者的区别主要有三个方面,首先定期生成是现实时间驱动的,这里的“定期
生成”主要是指 watermark(因为 timestamp 是每一条数据都需要有的),即定期
会调用生成逻辑去产生一个 watermark。而根据特殊记录生成是数据驱动的,即是
否生成 watermark 不是由现实时间来决定,而是当看到一些特殊的记录就表示接下
来可能不会有符合条件的数据再发过来了,这个时候相当于每一次分配 Timestamp
之后都会调用用户实现的 watermark 生成方法,用户需要在生成方法中去实现
watermark 的生成逻辑。
大家要注意的是就是我们在分配 timestamp 和生成 watermark 的过程,虽然
在 SourceFunction 和 DataStream 中都可以指定,但是还是建议生成的工作越靠
近 DataSource 越好。这样会方便让程序逻辑里面更多的 operator 去判断某些数据
是否乱序。Flink 内部提供了很好的机制去保证这些 timestamp 和 watermark 被
正确地传递到下游的节点。24 > Apache Flink 进阶(二):时间属性深度解析
3.2 Watermark 传播
具体的传播策略基本上遵循这三点。
● 首先,watermark 会以广播的形式在算子之间进行传播。比如说上游的算子
连接了三个下游的任务,它会把自己当前的收到的 watermark 以广播的形式
传到下游。
● 第二,如果在程序里面收到了一个 Long.MAX_VALUE 这个数值的 water-
mark,就表示对应的那一条流的一个部分不会再有数据发过来了,它相当于就
是一个终止的标志。
● 第三,对于单流而言,这个策略比较好理解,而对于有多个输入的算子,watermark 的计算就有讲究了,一个原则是:单输入取其大,多输入取小。
举个例子,假设这边蓝色的块代表一个算子的一个任务,然后它有三个输入,分
别是 W1、W2、W3,这三个输入可以理解成任何输入,这三个输入可能是属于同一
个流,也可能是属于不同的流。然后在计算 watermark 的时候,对于单个输入而言
是取他们的最大值,因为我们都知道 watermark 应该遵循一个单调递增的一个原则。
对于多输入,它要统计整个算子任务的 watermark 时,就会取这三个计算出来的
watermark 的最小值。即一个多个输入的任务,它的 watermark 受制于最慢的那条
输入流。Apache Flink 进阶(二):时间属性深度解析 < 25
这一点类似于木桶效应,整个木桶中装的水会受制于最矮的那块板。
watermark 在传播的时候有一个特点是,它的传播是幂等的。多次收到相同的
watermark,甚至收到之前的 watermark 都不会对最后的数值产生影响,因为对于
单个输入永远是取最大的,而对于整个任务永远是取一个最小的。
同时我们可以注意到这种设计其实有一个局限,具体体现在它没有区分你这个输
入是一条流多个 partition 还是来自于不同的逻辑上的流的 JOIN。对于同一个流的不
同 partition,我们对他做这种强制的时钟同步是没有问题的,因为一开始就把一条流
拆散成不同的部分,但每一个部分之间共享相同的时钟。
但是如果算子的任务是在做类似于 JOIN 操作,那么要求两个输入的时钟强制同
步其实没有什么道理的,因为完全有可能是把一条离现在时间很近的数据流和一个离
当前时间很远的数据流进行 JOIN,这个时候对于快的那条流,因为它要等慢的那条
流,所以说它可能就要在状态中去缓存非常多的数据,这对于整个集群来说是一个很
大的性能开销。
3.3 ProcessFunction
在正式介绍 watermark 的处理之前,先简单介绍 ProcessFunction,因为
watermark 在任务里的处理逻辑分为内部逻辑和外部逻辑。外部逻辑其实就是通过
ProcessFunction 来体现的,如果你需要使用 Flink 提供的时间相关的 API 的话就
只能写在 ProcessFunction 里。
ProcessFunction 和时间相关的功能主要有三点:
● 第一点,根据你当前系统使用的时间语义不同,你可以去获取当前你正在处理
这条记录的 Record Timestamp,或者当前的 Processing Time。
● 第二点,它可以获取当前算子的时间,可以把它理解成当前的 watermark。
● 第三点,为了在 ProcessFunction 中去实现一些相对复杂的功能,允许注
册一些 timer(定时器)。比如说在 watermark 达到某一个时间点的时候就触26 > Apache Flink 进阶(二):时间属性深度解析
发定时器,所有的这些回调逻辑也都是由用户来提供,涉及到如下三个方法,registerEventTimeTimer、registerProcessingTimeTimer 和 onTimer。
在 onTimer 方法中就需要去实现自己的回调逻辑,当条件满足时回调逻辑就
会被触发。
一个简单的应用是,我们在做一些时间相关的处理的时候,可能需要缓存一部
分数据,但这些数据不能一直去缓存下去,所以需要有一些过期的机制,我们可以通
过 timer 去设定这么一个时间,指定某一些数据可能在将来的某一个时间点过期,从
而把它从状态里删除掉。所有的这些和时间相关的逻辑在 Flink 内部都是由自己的
Time Service(时间服务)完成的。
3.4 Watermark 处理
一个算子的实例在收到 watermark 的时候,首先要更新当前的算子时间,这样
的话在 ProcessFunction 里方法查询这个算子时间的时候,就能获取到最新的时间。
第二步它会遍历计时器队列,这个计时器队列就是我们刚刚说到的 timer,你可以同
时注册很多 timer,Flink 会把这些 Timer 按照触发时间放到一个优先队列中。第三
步 Flink 得到一个时间之后就会遍历计时器的队列,然后逐一触发用户的回调逻辑。
通过这种方式,Flink 的某一个任务就会将当前的 watermark 发送到下游的其他任务
实例上,从而完成整个 watermark 的传播,从而形成一个闭环。Apache Flink 进阶(二):时间属性深度解析 < 27
4. Table API 中的时间
下面我们来看一看 TableSQL API 中的时间。为了让时间参与到 TableSQL
这一层的运算中,我们需要提前把时间属性放到表的 schema 中,这样的话我们才
能够在 SQL 语句或者 Table 的逻辑表达式里面使用时间去完成需求。
4.1 Table 中指定时间列
其实之前社区就怎么在 TableSQL 中去使用时间这个问题做过一定的讨论,是
把获取当前 Processing Time 的方法是作为一个特殊的 UDF,还是把这一个列物
化到整个的 schema 里面,最终采用了后者。我们这里就分开来讲一讲 Processing
Time 和 Event Time 在使用的时候怎么在 Table 中指定。
对于 Processing Time,我们知道要得到一个 Table 对象(或者注册一个
Table)有两种手段:
● 可以从一个 DataStream 转化成一个 Table;
● 直接通过 TableSource 去生成这么一个 Table;
对于第一种方法而言,我们只需要在你已有的这些列中(例子中 f1 和 f2 就是两
个已有的列),在最后用“列名.proctime”这种写法就可以把最后的这一列注册为一
个 Processing Time,以后在写查询的时候就可以去直接使用这一列。如果 Table
是通过 TableSource 生成的,就可以通过实现这一个 DefinedRowtimeAttributes 28 > Apache Flink 进阶(二):时间属性深度解析
接口,然后就会自动根据你提供的逻辑去生成对应的 Processing Time。
相对而言,在使用 Event Time 时则有一个限制,因为 Event Time 不像
Processing Time 那样是随拿随用。如果要从 DataStream 去转化得到一个 Table,必须要提前保证原始的 DataStream 里面已经存在了 Record Timestamp 和
watermark。如果想通过 TableSource 生成的,也一定要保证要接入的数据里面存
在一个类型为 long 或者 timestamp 的时间字段。
具体来说,如果要从 DataStream 去注册一个表,和 proctime 类似,只需要加
上“列名 .rowtime”就可以。需要注意的是,如果要用 Processing Time,必须保
证要新加的字段是整个 schema 中的最后一个字段,而 Event Time 的时候其实可
以去替换某一个已有的列,然后 Flink 会自动的把这一列转化成需要的 rowtime 这个
类型。
如果是通过 TableSource 生成的,只需要实现 DefinedRowtimeAttributes
接口就可以了。需要说明的一点是,在 DataStream API 这一侧其实不支持同时
存在多个 Event Time(rowtime),但是在 Table 这一层理论上可以同时存在多个
rowtime。因为 DefinedRowtimeAttributes 接口的返回值是一个对于 rowtime 描述
的 List,即其实可以同时存在多个 rowtime 列,在将来可能会进行一些其他的改进,或者基于去做一些相应的优化。
4.2 时间列和 Table 操作Apache Flink 进阶(二):时间属性深度解析 < 29
指定完了时间列之后,当我们要真正去查询时就会涉及到一些具体的操作。这里
我列举的这些操作都是和时间列紧密相关,或者说必须在这个时间列上才能进行的。
比如说“Over 窗口聚合”和“Group by 窗口聚合”这两种窗口聚合,在写 SQL 提
供参数的时候只能允许你在这个时间列上进行这种聚合。第三个就是时间窗口聚合,你在写条件的时候只支持对应的时间列。最后就是排序,我们知道在一个无尽的数据
流上对数据做排序几乎是不可能的事情,但因为这个数据本身到来的顺序已经是按照
时间属性来进行排序,所以说如果要对一个 DataStream 转化成 Table 进行排序
的话,只能是按照时间列进行排序,当然同时也可以指定一些其他的列,但是时间列
这个是必须的,并且必须放在第一位。
为什么说这些操作只能在时间列上进行?
因为我们有的时候可以把到来的数据流就看成是一张按照时间排列好的一张表,而我们任何对于表的操作,其实都是必须在对它进行一次顺序扫描的前提下完成的。
大家都知道数据流的特性之一就是一过性,某一条数据处理过去之后,将来其实不太
好去访问它。当然因为 Flink 中内部提供了一些状态机制,我们可以在一定程度上去
弱化这个特性,但是最终还是不能超越的,限制状态不能太大。所有这些操作为什么
只能在时间列上进行,因为这个时间列能够保证我们内部产生的状态不会无限的增长
下去,这是一个最终的前提。Apache Flink 进阶(三):
Checkpoint 原理剖析与应用实践
作者:唐云 ( 茶干 )
阿里巴巴高级研发工程师
大家好,今天我将跟大家分享一下 Flink 里面的 Checkpoint,共分为四个部分。
首先讲一下 Checkpoint 与 state 的关系,然后介绍什么是 state,第三部分介绍如
何在 Flink 中使用 state,第四部分则介绍 Checkpoint 的执行机制。
Checkpoint 与 state 的关系
Checkpoint 是从 source 触发到下游所有节点完成的一次全局操作。下图
可以有一个对 Checkpoint 的直观感受,红框里面可以看到一共触发了 569K 次
Checkpoint,然后全部都成功完成,没有 fail 的。
state 其实就是 Checkpoint 所做的主要持久化备份的主要数据,看下图的具
体数据统计,其 state 也就 9kb 大小 。Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践 < 31
什么是 state
我们接下来看什么是 state。先看一个非常经典的 word count 代码,这段代码
会去监控本地的 9000 端口的数据并对网络端口输入进行词频统计,我们本地行动
netcat,然后在终端输入 hello world,执行程序会输出什么?
答案很明显,(hello, 1) 和 (word,1)
那么问题来了,如果再次在终端输入 hello world,程序会输入什么?
答案其实也很明显,(hello, 2) 和 (world, 2)。为什么 Flink 知道之前已
经处理过一次 hello world,这就是 state 发挥作用了,这里是被称为 keyed state
存储了之前需要统计的数据,所以帮助 Flink 知道 hello 和 world 分别出现过一次。
回顾一下刚才这段 word count 代码。keyby 接口的调用会创建 keyed stream
对 key 进行划分,这是使用 keyed state 的前提。在此之后,sum 方法会调用内置32 > Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践
的 StreamGroupedReduce 实现。
什么是 keyed state
对于 keyed state,有两个特点:
● 只能应用于 KeyedStream 的函数与操作中,例如 Keyed UDF, window
state
● keyed state 是已经分区 划分好的,每一个 key 只能属于某一个 keyed
state
对于如何理解已经分区的概念,我们需要看一下 keyby 的语义,大家可以看到
下图左边有三个并发,右边也是三个并发,左边的词进来之后,通过 keyby 会进行
相应的分发。例如对于 hello word,hello 这个词通过 hash 运算永远只会到右下方
并发的 task 上面去。Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践 < 33
什么是 operator state
● 又称为 non-keyed state,每一个 operator state 都仅与一个 operator 的实
例绑定。
● 常见的 operator state 是 source state,例如记录当前 source 的 offset
再看一段使用 operator state 的 word count 代码:
这里的fromElements 会调用FromElementsFunction的类,其中就使用了34 > Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践
类型为 list state 的 operator state。根据 state 类型做一个分类如下图:
除了从这种分类的角度,还有一种分类的角度是从 Flink 是否直接接管:
● Managed State:由 Flink 管理的 state,刚才举例的所有 state 均是
managed state
● Raw State:Flink 仅提供 stream 可以进行存储数据,对 Flink 而言 raw
state 只是一些 bytes
在实际生产中,都只推荐使用 managed state,本文将围绕该话题进行讨论。
如何在 Flink 中使用 state
下图就前文 word count 的 sum 所使用的 StreamGroupedReduce 类为例讲
解了如何在代码中使用 keyed state:Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践 < 35
下图则对 word count 示例中的 FromElementsFunction 类进行详解并分享
如何在代码中使用 operator state:36 > Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践
Checkpoint 的执行机制
在介绍 Checkpoint 的执行机制前,我们需要了解一下 state 的存储,因为
state 是 Checkpoint 进行持久化备份的主要角色。
Statebackend 的分类
下图阐释了目前 Flink 内置的三类 state backend,其中MemoryStateBackend
和 FsStateBackend 在运行时都是存储在 java heap 中的,只有在执行 Check-
point 时,FsStateBackend 才会将数据以文件格式持久化到远程存储上。而
RocksDBStateBackend则借用了 RocksDB(内存磁盘混合的 LSM DB)对 state
进行存储。
对于 HeapKeyedStateBackend,有两种实现:
● 支持异步 Checkpoint(默认):存储格式 CopyOnWriteStateMap
● 仅支持同步 Checkpoint:存储格式 NestedStateMap
特别在 MemoryStateBackend 内使用HeapKeyedStateBackend时,Check-
point 序列化数据阶段默认有最大 5 MB 数据的限制Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践 < 37
对于 RocksDBKeyedStateBackend,每个 state 都存储在一个单独的 column
family 内,其中 keyGroup,Key 和 Namespace 进行序列化存储在 DB 作为 key。
Checkpoint 执行机制详解
本小节将对 Checkpoint 的执行流程逐步拆解进行讲解,下图左侧是 Check-
point Coordinator,是整个 Checkpoint 的发起者,中间是由两个 source,一个
sink 组成的 Flink 作业,最右侧的是持久化存储,在大部分用户场景中对应 HDFS。
1. 第一步,Checkpoint Coordinator 向所有 source 节点 trigger Check-
point;。
2. 第二步,source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamp-
ort 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才
会执行相应的 Checkpoint。38 > Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践
3. 第三步,当 task 完成 state 备份后,会将备份数据的地址(state handle)
通知给 Checkpoint coordinator。
4. 第四步,下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行本
地快照,这里特地展示了 RocksDB incremental Checkpoint 的流程,首
先 RocksDB 会全量刷数据到磁盘上(红色大三角表示),然后 Flink 框架会
从中选择没有上传的文件进行持久化备份(紫色小三角)。Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践 < 39
5. 同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返
回通知 Coordinator。
6. 最后,当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认
为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Check-
point meta 文件。
Checkpoint 的 EXACTLY_ONCE 语义
为了实现 EXACTLY ONCE 语义,Flink 通过一个 input buffer 将在对齐阶段
收到的数据缓存起来,等对齐完成之后再进行处理。而对于 AT LEAST ONCE 语40 > Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践
义,无需缓存收集到的数据,会对后续直接处理,所以导致 restore 时,数据可能会
被多次处理。下图是官网文档里面就 Checkpoint align 的示意图:
需要特别注意的是,Flink 的 Checkpoint 机制只能保证 Flink 的计算过程可以
做到 EXACTLY ONCE,端到端的 EXACTLY ONCE 需要 source 和 sink 支持。
Savepoint 与 Checkpoint 的区别
作业恢复时,二者均可以使用,主要区别如下:
Savepoint Externalized Checkpoint
用户通过命令触发,由用户管理其创建与删除 Checkpoint 完成时,在用户给定的外部持久化
存储保存
标准化格式存储,允许作业升级或者配置变更 当作业 FAILED(或者 CANCELED)时,外部
存储的 Checkpoint 会保留下来
用户在恢复时需要提供用于恢复作业状态的
savepoint 路径
用户在恢复时需要提供用于恢复的作业状态的
Checkpoint 路径Apache Flink 进阶(四):
Flink on YarnK8s 原理剖析及实践
作者:周凯波(宝牛)
阿里巴巴技术专家
本文根据 Apache Flink 进阶篇系列直播课程整理而成,由阿里巴巴技术专家
周凯波(宝牛)分享,主要介绍 Flink on Yarn K8s 的原理及应用实践,文章将从
Flink 架构、Flink on Yarn 原理及实践、Flink on Kubernetes 原理剖析三部分内容
进行分享并对 Flink on YarnKubernetes 中存在的部分问题进行了解答。
Flink 架构概览
Flink 架构概览 -Job
用户通过 DataStream API、DataSet API、SQL 和 Table API 编写 Flink 任
务,它会生成一个 JobGraph。JobGraph 是由 source、map、keyBywindow
apply 和 Sink 等算子组成的。当 JobGraph 提交给 Flink 集群后,能够以 Local、Standalone、Yarn 和 Kubernetes 四种模式运行。42 > Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践
Flink 架构概览 -JobManager
JobManager 的功能主要有:
● 将 JobGraph 转换成 Execution Graph,最终将 Execution Graph 拿来运
行;
● Scheduler 组件负责 Task 的调度;
● Checkpoint Coordinator 组件负责协调整个任务的 Checkpoint,包括
Checkpoint 的开始和完成;
● 通过 Actor System 与 TaskManager 进行通信;
● 其它的一些功能,例如 Recovery Metadata,用于进行故障恢复时,可以从
Metadata 里面读取数据。Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 < 43
Flink 架构概览 -TaskManager
TaskManager 是负责具体任务的执行过程,在 JobManager 申请到资源之后
开始启动。TaskManager 里面的主要组件有:
● Memory IO Manager,即内存 IO 的管理;
● Network Manager,用来对网络方面进行管理;
● Actor system,用来负责网络的通信;
TaskManager 被分成很多个 TaskSlot,每个任务都要运行在一个 TaskSlot
里面,TaskSlot 是调度资源里的最小单位。44 > Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践
在介绍 Yarn 之前先简单的介绍一下 Flink Standalone 模式,这样有助于更好
地了解 Yarn 和 Kubernetes 架构。
● 在 Standalone 模式下,Master 和 TaskManager 可以运行在同一台机器
上,也可以运行在不同的机器上。
● 在 Master 进程中,Standalone ResourceManager 的作用是对资源进行
管理。当用户通过 Flink Cluster Client 将 JobGraph 提交给 Master 时,JobGraph 先经过 Dispatcher。
● 当 Dispatcher 收到客户端的请求之后,生成一个 JobManager。接着
JobManager 进程向 Standalone ResourceManager 申请资源,最终再启
动 TaskManager。
● TaskManager 启动之后,会有一个注册的过程,注册之后 JobManager 再
将具体的 Task 任务分发给这个 TaskManager 去执行。
以上就是一个 Standalone 任务的运行过程。Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 < 45
Flink 运行时相关组件
接下来总结一下 Flink 的基本架构和它在运行时的一些组件,具体如下:
● Client:用户通过 SQL 或者 API 的方式进行任务的提交,提交后会生成一个
JobGraph。
● JobManager:JobManager 接受到用户的请求之后,会对任务进行调度,并且申请资源启动 TaskManager。
● TaskManager: 它 负 责 一 个 具 体 Task 的 执 行。TaskManager 向
JobManager 进行注册,当 TaskManager 接收到 JobManager 分配的任务
之后,开始执行具体的任务。
Flink on Yarn 原理及实践
Yarn 架构原理 -总览
Yarn 模式在国内使用比较广泛,基本上大多数公司在生产环境中都使用过 Yarn
模式。首先介绍一下 Yarn 的架构原理,因为只有足够了解 Yarn 的架构原理,才能
更好的知道 Flink 是如何在 Yarn 上运行的。46 > Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践
Yarn 的架构原理如上图所示,最重要的角色是 ResourceManager,主要用来
负责整个资源的管理,Client 端是负责向 ResourceManager 提交任务。
用户在 Client 端提交任务后会先给到 Resource Manager。Resource Man-
ager 会启动 Container,接着进一步启动 Application Master,即对 Master 节点
的启动。当 Master 节点启动之后,会向 Resource Manager 再重新申请资源,当
Resource Manager 将资源分配给 Application Master 之后,Application Master
再将具体的 Task 调度起来去执行。
Yarn 架构原理 -组件
Yarn 集群中的组件包括:
● ResourceManager (RM):ResourceManager (RM) 负责处理客户端请
求、启动 监控 ApplicationMaster、监控 NodeManager、资源的分配与调
度,包含 Scheduler 和 Applications Manager。
● ApplicationMaster (AM):ApplicationMaster (AM) 运行在 Slave 上,负
责数据切分、申请资源和分配、任务监控和容错。
● NodeManager (NM):NodeManager (NM) 运行在 Slave 上,用于单节
点资源管理、AMRM 通信以及汇报状态。
● Container:Container 负责对资源进行抽象,包括内存、CPU、磁盘,网络
等资源。Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 < 47
Yarn 架构原理 -交互
以在 Yarn 上运行 MapReduce 任务为例来讲解下 Yarn 架构的交互原理:
● 首先,用户编写 MapReduce 代码后,通过 Client 端进行任务提交。
● ResourceManager 在接收到客户端的请求后,会分配一个 Container 用来
启动 ApplicationMaster,并通知 NodeManager 在这个 Container 下启动
ApplicationMaster。
● ApplicationMaster 启动后,向 ResourceManager 发起注册请求。接着
ApplicationMaster 向 ResourceManager 申请资源。根据获取到的资源,和相关的 NodeManager 通信,要求其启动程序。
● 一个或者多个 NodeManager 启动 MapReduce Task。
● NodeManager 不断汇报 MapReduce Task 状态和进展给 Application-
Master。
● 当所有 MapReduce Task 都完成时,ApplicationMaster 向 ResourceM-
anager 汇报任务完成,并注销自己。48 > Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践
Flink on Yarn-Per Job
Flink on Yarn 中的 Per Job 模式是指每次提交一个任务,然后任务运行完成之
后资源就会被释放。在了解了 Yarn 的原理之后,Per Job 的流程也就比较容易理解
了,具体如下:
● 首先 Client 提交 Yarn App,比如 JobGraph 或者 JARs。
● 接 下 来 Yarn 的 ResourceManager 会 申 请 第 一 个 Container。 这 个
Container 通过 Application Master 启动进程,Application Master 里面运
行的是 Flink 程序,即 Flink-Yarn ResourceManager 和 JobManager。
● 最后 Flink-Yarn ResourceManager 向 Yarn ResourceManager 申请资
源。当分配到资源后,启动 TaskManager。TaskManager 启动后向 Flink-
Yarn ResourceManager 进行注册,注册成功后 JobManager 就会分配具
体的任务给 TaskManager 开始执行。Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 < 49
Flink on Yarn-Session
在 Per Job 模式中,执行完任务后整个资源就会释放,包括 JobManager、TaskManager 都全部退出。而 Session 模式则不一样,它的 Dispatcher 和 Resource-
Manager 是可以复用的。Session 模式下,当 Dispatcher 在收到请求之后,会
启动 JobManager(A),让 JobManager(A) 来完成启动 TaskManager,接着会
启动 JobManager(B) 和对应的 TaskManager 的运行。当 A、B 任务运行完成
后,资源并不会释放。Session 模式也称为多线程模式,其特点是资源会一直存在
不会释放,多个 JobManager 共享一个 Dispatcher,而且还共享 Flink-YARN
ResourceManager。
Session 模式和 Per Job 模式的应用场景不一样。Per Job 模式比较适合那种
对启动时间不敏感,运行时间较长的任务。Seesion 模式适合短时间运行的任务,一
般是批处理任务。若用 Per Job 模式去运行短时间的任务,那就需要频繁的申请资
源,运行结束后,还需要资源释放,下次还需再重新申请资源才能运行。显然,这种
任务会频繁启停的情况不适用于 Per Job 模式,更适合用 Session 模式。
Yarn 模式特点
Yarn 模式的优点有:50 > Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践
● 资源的统一管理和调度。Yarn 集群中所有节点的资源(内存、CPU、磁
盘、网络等)被抽象为 Container。计算框架需要资源进行运算任务时需要
向 Resource Manager 申请 Container,Yarn 按照特定的策略对资源进行
调度和进行 Container 的分配。Yarn 模式能通过多种任务调度策略来利用
提高集群资源利用率。例如 FIFO Scheduler、Capacity Scheduler、Fair
Scheduler,并能设置任务优先级。
● 资源隔离。Yarn 使用了轻量级资源隔离机制 Cgroups 进行资源隔离以避免相
互干扰,一旦 Container 使用的资源量超过事先定义的上限值,就将其杀死。
● 自动 failover 处理。例如 Yarn NodeManager 监控、Yarn Application-
Manager 异常恢复。
Yarn 模式虽然有不少优点,但是也有诸多缺点,例如运维部署成本较高,灵活
性不够。
Flink on Yarn 实践
关于 Flink on Yarn 的实践在社区官网上面有很多课程,例如:《Flink 安装部
署、环境配置及运行应用程序》 和 《客户端操作》都是基于 Yarn 进行讲解的,这里
就不再赘述。
社区官网:
https:ververica.cndevelopersflink-training-course1
Flink on Kubernetes 原理剖析
Kubernetes 是 Google 开源的容器集群管理系统,其提供应用部署、维
护、扩展机制等功能,利用 Kubernetes 能方便地管理跨机器运行容器化的应用。
Kubernetes 和 Yarn 相比,相当于下一代的资源管理系统,但是它的能力远远不止
这些。Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 < 51
Kubernetes- 基本概念
Kubernetes(k8s)中的 Master 节点,负责管理整个集群,含有一个集群的
资源数据访问入口,还包含一个 Etcd 高可用键值存储服务。Master 中运行着 API
Server,Controller Manager 及 Scheduler 服务。
Node 为集群的一个操作单元,是 Pod 运行的宿主机。Node 节点里包含一个
agent 进程,能够维护和管理该 Node 上的所有容器的创建、启停等。Node 还含
有一个服务端 kube-proxy,用于服务发现、反向代理和负载均衡。Node 底层含有
docker engine,docker 引擎主要负责本机容器的创建和管理工作。
Pod 运行于 Node 节点上,是若干相关容器的组合。在 K8s 里面 Pod 是创建、调度和管理的最小单位。
Kubernetes- 架构图52 > Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践
Kubernetes 的架构如图所示,从这个图里面能看出 Kubernetes 的整个运行
过程。
● API Server 相当于用户的一个请求入口,用户可以提交命令给 Etcd,这时会
将这些请求存储到 Etcd 里面去。
● Etcd 是一个键值存储,负责将任务分配给具体的机器,在每个节点上的
Kubelet 会找到对应的 container 在本机上运行。
● 用户可以提交一个 Replication Controller 资源描述,Replication Controller
会监视集群中的容器并保持数量;用户也可以提交 service 描述文件,并由
kube proxy 负责具体工作的流量转发。
Kubernetes- 核心概念
Kubernetes 中比较重要的概念有:
● Replication Controller (RC) 用来管理 Pod 的副本。RC 确保任何时候
Kubernetes 集群中有指定数量的 pod 副本 (replicas) 在运行, 如果少于指定
数量的 pod 副本,RC 会启动新的 Container,反之会杀死多余的以保证数量
不变。
● Service 提供了一个统一的服务访问入口以及服务代理和发现机制
● Persistent Volume(PV) 和 Persistent Volume Claim(PVC) 用于数据的持
久化存储。
● ConfigMap 是指存储用户程序的配置文件,其后端存储是基于 Etcd。Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 < 53
Flink on Kubernetes- 架构
Flink on Kubernetes 的架构如图所示,Flink 任务在 Kubernetes 上运行的步
骤有:
● 首先往 Kubernetes 集群提交了资源描述文件后,会启动 Master 和 Worker
的 container。
● Master Container 中会启动 Flink Master Process,包含 Flink-Container
ResourceManager、JobManager 和 Program Runner。
● Worker Container 会启动 TaskManager,并向负责资源管理的 Re-
sourceManager 进行注册,注册完成之后,由 JobManager 将具体的任务
分给 Container,再由 Container 去执行。
● 需要说明的是,在 Flink 里的 Master 和 Worker 都是一个镜像,只是脚本的54 > Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践
命令不一样,通过参数来选择启动 master 还是启动 Worker。
Flink on Kubernetes-JobManager
JobManager 的执行过程分为两步 :
● 首先,JobManager 通过 Deployment 进行描述,保证 1 个副本的 Con-
tainer 运行 JobManager,可以定义一个标签,例如 flink-jobmanager。
● 其次,还需要定义一个 JobManager Service,通过 service name 和 port
暴露 JobManager 服务,通过标签选择对应的 pods。
Flink on Kubernetes-TaskManager
TaskManager 也是通过 Deployment 来进行描述,保证 n 个副本的 Contain-
er 运行 TaskManager,同时也需要定义一个标签,例如 flink-taskmanager。
对于 JobManager 和 TaskManager 运行过程中需要的一些配置文件,如:
flink-conf.yaml、hdfs-site.xml、core-site.xml,可以通过将它们定义为 Config-
Map 来实现配置的传递和读取。
Flink on Kubernetes- 交互
整个交互的流程比较简单,用户往 Kubernetes 集群提交定义好的资源描述
文件即可,例如 deployment、configmap、service 等描述。后续的事情就交给
Kubernetes 集群自动完成。Kubernetes 集群会按照定义好的描述来启动 pod,运Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 < 55
行用户程序。各个组件的具体工作如下:
● Service: 通过标签(label selector) 找到 job manager 的 pod 暴露服务。
● Deployment:保证 n 个副本的 container 运行 JMTM,应用升级策略。
● ConfigMap:在每个 pod 上通过挂载 etcflink 目录,包含 flink-conf.yaml
内容。
Flink on Kubernetes- 实践
接下来就讲一下 Flink on Kubernetes 的实践篇,即 K8s 上是怎么运行任务的。
● Session Cluster
Session Cluster
启动
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml
Submit job
kubectl port-forward serviceflink-jobmanager 8081:8081
binflink run -d -m localhost:8081 .examplesstreaming
TopSpeedWindowing.jar
停止
kubectl delete -f jobmanager-deployment.yaml
kubectl delete -f taskmanager-deployment.yaml
kubectl delete -f jobmanager-service.yaml
首先启动 Session Cluster,执行上述三条启动命令就可以将 Flink 的 Job-
Manager-service、jobmanager-deployment、taskmanager-deployment 启动
起来。启动完成之后用户可以通过接口进行访问,然后通过端口进行提交任务。若想
销毁集群,直接用 kubectl delete 即可,整个资源就可以销毁。56 > Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践
Flink 官方提供的例子如图所示,图中左侧为 jobmanager-deployment.yaml
配置,右侧为 taskmanager-deployment.yaml 配置。
在 jobmanager-deployment.yaml 配置中,代码的第一行为 apiVersion,apiVersion 是 API 的一个版本号,版本号用的是 extensionsvlbetal 版本。资源
类型为 Deployment,元数据 metadata 的名为 flink-jobmanager,spec 中含
有副本数为 1 的 replicas,labels 标签用于 pod 的选取。containers 的镜像名
为 jobmanager,containers 包含从公共 docker 仓库下载的 image,当然也可
以使用公司内部的私有仓库。args 启动参数用于决定启动的是 jobmanager 还是
taskmanager;ports 是服务端口,常见的服务端口为 8081 端口;env 是定义的环
境变量,会传递给具体的启动脚本。
右图为 taskmanager-deployment.yaml 配置,taskmanager-deployment.
yaml 配置与 jobmanager-deployment.yaml 相似,但 taskmanager-deploy-
ment.yaml 的副本数是 2 个。Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 < 57
接下来是 jobmanager-service.yaml 的配置,jobmanager-service.yaml 的
资源类型为 Service,在 Service 中的配置相对少一些,spec 中配置需要暴露的服
务端口的 port,在 selector 中,通过标签选取 jobmanager 的 pod。
● Job Cluster
除了 Session 模式,还有一种 Per Job 模式。在 Per Job 模式下,需要将用户
代码都打到镜像里面,这样如果业务逻辑的变动涉及到 Jar 包的修改,都需要重新生
成镜像,整个过程比较繁琐,因此在生产环境中使用的比较少。
以使用公用 docker 仓库为例,Job Cluster 的运行步骤如下:
● build 镜像:在 flinkflink-containerdocker 目录下执行 build.sh 脚本,指定从哪个版本开始去构建镜像,成功后会输出 “Successfully tagged 58 > Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践
topspeed:latest” 的提示。
sh build.sh --from-release --flink-version 1.7.0 --hadoop-version 2.8--scala-version 2.11 --job-jar ~
flinkflink-1.7.1examplesstreamingTopSpeedWindowing.jar --image-name
topspeed
● 上传镜像:在 hub.docker.com 上需要注册账号和创建仓库进行上传镜像。
docker tag topspeed zkb555topspeedwindowing
docker push zkb555topspeedwindowing
● 启动任务:在镜像上传之后,可以启动任务。
kubectl create -f job-cluster-service.yaml
FLINK_IMAGE_NAME=zkb555topspeedwindowing:latest FLINK_JOB=org.apache.flink.
streaming.
examples.windowing.TopSpeedWindowing FLINK_JOB_PARALLELISM=3 envsubst < job-
cluster-job.
yaml.template | kubectl create -f -
FLINK_IMAGE_NAME=zkb555topspeedwindowing:latest FLINK_JOB_PARALLELISM=4
envsubst <
task-manager-deployment.yaml.template | kubectl create -f -
Flink on YarnKubernetes 问题解答
Q: Flink 在 K8s 上可以通过 Operator 方式提交任务吗?
目前 Flink 官方还没有提供 Operator 的方式,Lyft 公司开源了自己的 Opera-
tor 实现:https:github.comlyftflinkk8soperator。
Q: 在 K8s 集群上如果不使用 Zookeeper 有没有其他高可用(HA)的方案?
Etcd 是一个类似于 Zookeeper 的高可用键值服务,目前 Flink 社区正在
考 虑 基 于 Etcd 实 现 高 可 用 的 方 案(https:issues.apache.orgjirabrowse
FLINK-11105)以及直接依赖 K8s API 的方案(https:issues.apache.orgjira
browseFLINK-12884)。Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 < 59
Q: Flink on K8s 在任务启动时需要指定 TaskManager 的个数,有和 Yarn
一样的动态资源申请方式吗?
Flink on K8s 目前的实现在任务启动前就需要确定好 TaskManager 的个数,这样容易造成 TM 指定太少,任务无法启动,或者指定的太多,造成资源浪费。社区
正在考虑实现和 Yarn 一样的任务启动时动态资源申请的方式。这是一种和 K8s 结
合的更为 Nativey 的方式,称为 Active 模式。Active 意味着 ResourceManager
可以直接向 K8s 集群申请资源。具体设计方案和进展请关注:
https:issues.apache.orgjirabrowseFLINK-9953。Apache Flink 进阶(五):数据类型和序列化
作者:马庆祥
奇虎 360 数据开发高级工程师
本文根据 Apache Flink 系列直播整理而成,由 Apache Flink Contributor、奇
虎 360 数据开发高级工程师马庆祥老师分享。文章主要从如何为 Flink 量身定制序列
化框架、Flink 序列化的最佳实践、Flink 通信层的序列化以及问答环节四部分分享。
为 Flink 量身定制的序列化框架
为什么要为 Flink 量身定制序列化框架?
大家都知道现在大数据生态非常火,大多数技术组件都是运行在 JVM 上的,Flink 也是运行在 JVM 上,基于 JVM 的数据分析引擎都需要将大量的数据存储在内
存中,这就不得不面临 JVM 的一些问题,比如 Java 对象存储密度较低等。针对这
些问题,最常用的方法就是实现一个显式的内存管理,也就是说用自定义的内存池来
进行内存的分配回收,接着将序列化后的对象存储到内存块中。
现在 Java 生态圈中已经有许多序列化框架,比如说 Java serialization, Kryo,Apache Avro 等等。但是 Flink 依然是选择了自己定制的序列化框架,那么到底有
什么意义呢?若 Flink 选择自己定制的序列化框架,对类型信息了解越多,可以在早
期完成类型检查,更好的选取序列化方式,进行数据布局,节省数据的存储空间,直
接操作二进制数据。 Apache Flink 进阶(五):数据类型和序列化 < 61
Flink 的数据类型
Flink 在其内部构建了一套自己的类型系统,Flink 现阶段支持的类型分类如图
所示,从图中可以看到 Flink 类型可以分为基础类型(Basic)、数组(Arrays)、复合
类型(Composite)、辅助类型(Auxiliary)、泛型和其它类型(Generic)。Flink 支
持任意的 Java 或是 Scala 类型。不需要像 Hadoop 一样去实现一个特定的接口
(org.apache.hadoop.io.Writable),Flink 能够自动识别数据类型。62 > Apache Flink 进阶(五):数据类型和序列化
那这么多的数据类型,在 Flink 内部又是如何表示的呢?图示中的 Person 类,复合类型的一个 Pojo 在 Flink 中是用 PojoTypeInfo 来表示,它继承至 TypeInfor-
mation,也即在 Flink 中用 TypeInformation 作为类型描述符来表示每一种要表示的
数据类型。
TypeInformation
TypeInformation 的思维导图如图所示,从图中可以看出,在 Flink 中每一个具
体的类型都对应了一个具体的 TypeInformation 实现类,例如 BasicTypeInforma-
tion 中的 IntegerTypeInformation 和 FractionalTypeInformation 都具体的对应了
一个 TypeInformation。然后还有 BasicArrayTypeInformation、CompositeType
以及一些其它类型,也都具体对应了一个 TypeInformation。
TypeInformation 是 Flink 类型系统的核心类。对于用户自定义的 Function
来说,Flink 需要一个类型信息来作为该函数的输入输出类型,即 TypeInfomation。
该类型信息类作为一个工具来生成对应类型的序列化器 TypeSerializer,并用于执行Apache Flink 进阶(五):数据类型和序列化 < 63
语义检查,比如当一些字段在作为 joing 或 grouping 的键时,检查这些字段是否在
该类型中存在。
如何使用 TypeInformation ?下面的实践中会为大家介绍。
Flink 的序列化过程
在 Flink 序列化过程中,进行序列化操作必须要有序列化器,那么序列化器从何
而来?
每一个具体的数据类型都对应一个 TypeInformation 的具体实现,每一个
TypeInformation 都会为对应的具体数据类型提供一个专属的序列化器。通过 Flink
的序列化过程图可以看到 TypeInformation 会提供一个 createSerialize 方法,通
过这个方法就可以得到该类型进行数据序列化操作与反序化操作的对象 TypeSerial-
izer。64 > Apache Flink 进阶(五):数据类型和序列化
对于大多数数据类型 Flink 可以自动生成对应的序列化器,能非常高效地对数
据集进行序列化和反序列化,比如,BasicTypeInfo、WritableTypeIno 等,但针对
GenericTypeInfo 类型,Flink 会使用 Kyro 进行序列化和反序列化。其中,Tuple、Pojo 和 CaseClass 类型是复合类型,它们可能嵌套一个或者多个数据类型。在这
种情况下,它们的序列化器同样是复合的。它们会将内嵌类型的序列化委托给对应类
型的序列化器。
简单的介绍下 Pojo 的类型规则,即在满足一些条件的情况下,才会选用 Pojo
的序列化进行相应的序列化与反序列化的一个操作。即类必须是 Public 的,且类有
一个 public 的无参数构造函数,该类(以及所有超类)中的所有非静态 no-static、非瞬态 no-transient 字段都是 public 的(和非最终的 final)或者具有公共 getter 和
setter 方法,该方法遵循 getter 和 setter 的 Java bean 命名约定。当用户定义的数
据类型无法识别为 POJO 类型时,必须将其作为 GenericType 处理并使用 Kryo 进
行序列化。
Flink 自带了很多 TypeSerializer 子类,大多数情况下各种自定义类型都是常
用类型的排列组合,因而可以直接复用,如果内建的数据类型和序列化方式不能满足
你的需求,Flink 的类型信息系统也支持用户拓展。若用户有一些特殊的需求,只需
要实现 TypeInformation、TypeSerializer 和 TypeComparator 即可定制自己类型
的序列化和比较大小方式,来提升数据类型在序列化和比较时的性能。Apache Flink 进阶(五):数据类型和序列化 < 65
序列化就是将数据结构或者对象转换成一个二进制串的过程,在 Java 里面可以
简单地理解成一个 byte 数组。而反序列化恰恰相反,就是将序列化过程中所生成的
二进制串转换成数据结构或者对象的过程。下面就以内嵌型的 Tuple 3 这个对象为
例,简述一下它的序列化过程。
Tuple 3 包含三个层面,一是 int 类型,一是 double 类型,还有一个是
Person。Person 包含两个字段,一是 int 型的 ID,另一个是 String 类型的 name,它在序列化操作时,会委托相应具体序列化的序列化器进行相应的序列化操作。从图
中可以看到 Tuple 3 会把 int 类型通过 IntSerializer 进行序列化操作,此时 int 只需
要占用四个字节就可以了。根据 int 占用四个字节,这个能够体现出 Flink 可序列化
过程中的一个优势,即在知道数据类型的前提下,可以更好的进行相应的序列化与反
序列化操作。相反,如果采用 Java 的序列化,虽然能够存储更多的属性信息,但一
次占据的存储空间会受到一定的损耗。
Person 类会被当成一个 Pojo 对象来进行处理,PojoSerializer 序列化器会把
一些属性信息使用一个字节存储起来。同样,其字段则采取相对应的序列化器进行相
应序列化,在序列化完的结果中,可以看到所有的数据都是由 MemorySegment 去
支持。MemorySegment 具有什么作用呢?
MemorySegment 在 Flink 中会将对象序列化到预分配的内存块上,它代表 1
个固定长度的内存,默认大小为 32 kb。MemorySegment 代表 Flink 中的一个最
小的内存分配单元,相当于是 Java 的一个 byte 数组。 每条记录都会以序列化的形
式存储在一个或多个 MemorySegment 中。
Flink 序列化的最佳实践
最常见的场景
Flink 常见的应用场景有四种,即注册子类型、注册自定义序列化器、添加类型
提示、手动创建 TypeInformation,具体介绍如下:66 > Apache Flink 进阶(五):数据类型和序列化
● 注册子类型:如果函数签名只描述了超类型,但是它们实际上在执行期间使
用了超类型的子类型,那么让 Flink 了解这些子类型会大大提高性能。可
以 在 StreamExecutionEnvironment 或 ExecutionEnvironment 中 调 用
.registertype (clazz) 注册子类型信息。
● 注册自定义序列化:对于不适用于自己的序列化框架的数据类型,Flink 会使
用 Kryo 来进行序列化,并不是所有的类型都与 Kryo 无缝连接,具体注册方
法在下文介绍。
● 添加类型提示:有时,当 Flink 用尽各种手段都无法推测出泛型信息时,用户
需要传入一个类型提示 TypeHint,这个通常只在 Java API 中需要。
● 手动创建一个 TypeInformation:在某些 API 调用中,这可能是必需的,因
为 Java 的泛型类型擦除导致 Flink 无法推断数据类型。
其实在大多数情况下,用户不必担心序列化框架和注册类型,因为 Flink 已经提
供了大量的序列化操作,不需要去定义自己的一些序列化器,但是在一些特殊场景
下,需要去做一些相应的处理。
实践 -类型声明
类型声明去创建一个类型信息的对象是通过哪种方式?通常是用 TypeInforma-
tion.of 方法来创建一个类型信息的对象,具体说明如下:
● 对于非泛型类,直接传入 class 对象即可。
PojoTypeInfo typeInfo = (PojoTypeInfo) TypeInformation.
of(Person.class);
● 对于泛型类,需要通过 TypeHint 来保存泛型类型信息。
final TypeInfomation> resultType = TypeInformation.
of(new TypeHint>{});
● 预定义常量。Apache Flink 进阶(五):数据类型和序列化 < 67
如 BasicTypeInfo,这个类定义了一系列常用类型的快捷方式,对于 String、Boolean、Byte、Short、Integer、Long、Float、Double、Char 等基本类型的
类型声明,可以直接使用。而且 Flink 还提供了完全等价的 Types 类(org.apache.
flink.api.common.typeinfo.Types)。特别需要注意的是,flink-table 模块也有一个
Types 类(org.apache.flink.table.api.Types),用于 table 模块内部的类型定义信
息,用法稍有不同。使用 IDE 的自动 import 时一定要小心。
● 自定义 TypeInfo 和 TypeInfoFactory。
通过自定义 TypeInfo 为任意类提供 Flink 原生内存管理(而非 Kryo),可令存
储更紧凑,运行时也更高效。需要注意在自定义类上使用 @TypeInfo 注解,随后创
建相应的 TypeInfoFactory 并覆盖 createTypeInfo 方法。
实践 -注册子类型
Flink 认识父类,但不一定认识子类的一些独特特性,因此需要单独注册子类型。
StreamExecutionEnvironment 和 ExecutionEnvironment 提供 registerType
方法用来向 Flink 注册子类信息。
final ExecutionEnvironment env = ExecutionEnvironment.
getExecutionEnvironment;Env. registerType(typeClass);
在 registerType 方法内部,会使用 TypeExtractor 来提取类型信息,如上图
所示,获取到的类型信息属于 PojoTypeInfo 及其子类,那么需要将其注册到一起,否则统一交给 Kryo 去处理,Flink 并不过问 ( 这种情况下性能会变差 )。68 > Apache Flink 进阶(五):数据类型和序列化
实践 -Kryo 序列化
对于 Flink 无法序列化的类型(例如用户自定义类型,没有 registerType,也没
有自定义 TypeInfo 和 TypeInfoFactory),默认会交给 Kryo 处理,如果 Kryo 仍然
无法处理(例如 Guava、Thrift、Protobuf 等第三方库的一些类),有两种解决方案:
● 强制使用 Avro 来代替 Kryo。
env.getConfig.enableForceAvro;
● 为 Kryo 增加自定义的 Serializer 以增强 Kryo 的功能。
env.getConfig.addDefaultKryoSerializer(clazz, serializer);
注:如果希望完全禁用 Kryo(100% 使用 Flink 的序列化机制),可以通过 Kryo-
env.getConfig.disableGenericTypes 的方式完成,但注意一切无法处理的类都
将导致异常,这种对于调试非常有效。
Flink 通信层的序列化
Flink 的 Task 之间如果需要跨网络传输数据记录, 那么就需要将数据序列化之
后写入 NetworkBufferPool,然后下层的 Task 读出之后再进行反序列化操作,最
后进行逻辑处理。
为了使得记录以及事件能够被写入 Buffer,随后在消费时再从 Buffer 中读出,Flink 提供了数据记录序列化器(RecordSerializer)与反序列化器(RecordDeseri-
alizer)以及事件序列化器(EventSerializer)。
Function 发送的数据被封装成 SerializationDelegate,它将任意元素公开为
IOReadableWritable 以进行序列化,通过 setInstance 来传入要序列化的数据。
在 Flink 通信层的序列化中,有几个问题值得关注,具体如下:Apache Flink 进阶(五):数据类型和序列化 < 69
● 何时确定 Function 的输入输出类型?
在构建 StreamTransformation 的时候通过 TypeExtractor 工具确定 Func-
tion 的输入输出类型。TypeExtractor 类可以根据方法签名、子类信息等蛛丝马迹自
动提取或恢复类型信息。
● 何时确定 Function 的序列化 反序列化器?
构造 StreamGraph 时,通过 TypeInfomation 的 createSerializer 方法
获取对应类型的序列化器 TypeSerializer,并在 addOperator 的过程中执行
setSerializers 操作,设置 StreamConfig 的 TYPESERIALIZERIN1 、 TYPE-
SERIALIZERIN2、 TYPESERIALIZEROUT_1 属性。
● 何时进行真正的序列化 反序列化操作?这个过程与 TypeSerializer 又是怎么
联系在一起的呢?
大家都应该清楚 Tsk 和 StreamTask 两个概念,Task 是直接受 TaskMan-
ager 管理和调度的,而 Task 又会调用 StreamTask,而 StreamTask 中真正封装
了算子的处理逻辑。在 run 方法中,首先将反序列化后的数据封装成 StreamRe-70 > Apache Flink 进阶(五):数据类型和序列化
cord 交给算子处理;然后将处理结果通过 Collector 发动给下游 ( 在构建 Collector
时已经确定了 SerializtionDelegate),并通过 RecordWriter 写入器将序列化后的结
果写入 DataOutput;最后序列化的操作交给 SerializerDelegate 处理,实际还是通
过 TypeSerializer 的 serialize 方法完成。Apache Flink 进阶(六):Flink 作业执行深度解析
作者:岳猛,网易云音乐实时计算平台研发工程师
整理:毛鹤
本文根据 Apache Flink 系列直播课程整理而成,由 Apache Flink Contribu-
tor、网易云音乐实时计算平台研发工程师岳猛分享。主要分享内容为 Flink Job 执行
作业的流程,文章将从两个方面进行分享:一是如何从 Program 到物理执行计划,二是生成物理执行计划后该如何调度和执行。
Flink 四层转化流程
Flink 有四层转换流程,第一层为 Program 到 StreamGraph;第二层为
StreamGraph 到 JobGraph;第三层为 JobGraph 到 ExecutionGraph;第四层为
ExecutionGraph 到物理执行计划。通过对 Program 的执行,能够生成一个 DAG
执行图,即逻辑执行图。如下:72 > Apache Flink 进阶(六):Flink 作业执行深度解析
第一部分将先讲解四层转化的流程,然后将以详细案例讲解四层的具体转化。
● 第一层 StreamGraph 从 Source 节点开始,每一次 transform 生成一个
StreamNode,两个 StreamNode 通过 StreamEdge 连接在一起 , 形成
StreamNode 和 StreamEdge 构成的 DAG。
● 第二层 JobGraph,依旧从 Source 节点开始,然后去遍历寻找能够嵌到一
起的 operator,如果能够嵌到一起则嵌到一起,不能嵌到一起的单独生成
jobVertex,通过 JobEdge 链接上下游 JobVertex,最终形成 JobVertex 层
面的 DAG。
● JobVertex DAG 提交到任务以后,从 Source 节点开始排序 , 根据 JobVer-
tex 生成 ExecutionJobVertex,根据 jobVertex 的 IntermediateDataSet
构建 IntermediateResult,然后 IntermediateResult 构建上下游的依赖关
系,形成 ExecutionJobVertex 层面的 DAG 即 ExecutionGraph。
● 最后通过 ExecutionGraph 层到物理执行层。
Program 到 StreamGraph 的转化
Program 转换成 StreamGraph 具体分为三步:
● 从 StreamExecutionEnvironment.execute 开始执行程序,将 transform 添
加到 StreamExecutionEnvironment 的 transformations。
● 调用 StreamGraphGenerator 的 generateInternal 方法,遍历 transfor-
mations 构建 StreamNode 及 StreamEage。
● 通过 StreamEdge 连接 StreamNode。Apache Flink 进阶(六):Flink 作业执行深度解析 < 73
通过 WindowWordCount 来看代码到 StreamGraph 的转化,在 flatMap trans-
form 设置 slot 共享组为 flatMapsg,并发设置为 4,在聚合的操作中设置 slot 共享
组为 sumsg, sum 和 counts 并发设置为 3,这样设置主要是为了演示后面如何
嵌到一起的,跟上下游节点的并发以及上游的共享组有关。
WindowWordCount 代码中可以看到,在 readTextFile 中会生成一个 transform,且 transform 的 ID 是 1;然后到 flatMap 会生成一个 transform, transform 的
ID 是 2;接着到 keyBy 生成一个 transform 的 ID 是 3;再到 sum 生成一个
transform 的 ID 是 4;最后到 counts 生成 transform 的 ID 是 5。
transform 的结构如图所示,第一个是 flatMap 的 transform,第二个是 window 74 > Apache Flink 进阶(六):Flink 作业执行深度解析
的 transform, 第 三 个 是 SinkTransform 的 transform。 除 此 之 外, 还 能 在
transform 的结构中看到每个 transform 的 input 是什么。
接下来介绍一下 StreamNode 和 StreamEdge。
● StreamNode 是用来描述 operator 的逻辑节点,其关键成员变量有
slotSharingGroup、jobVertexClass、inEdges、outEdges 以 及 trans-
formationUID;
● StreamEdge 是用来描述两个 operator 逻辑的链接边,其关键变量有
sourceVertex、targetVertex。
WindowWordCount transform 到 StreamGraph 转化如图所示,StreamEx-
ecutionEnvironment 的 transformations 存在 3 个 transform,分别是 Flat Map
(Id 2)、Window(Id 4)、Sink(Id 5)。
transform 的时候首先递归处理 transform 的 input,生成 StreamNode,然后
通过 StreamEdge 链接上下游 StreamNode。需要注意的是,有些 transform 操作
并不会生成 StreamNode 如 PartitionTransformtion,而是生成个虚拟节点。Apache Flink 进阶(六):Flink 作业执行深度解析 < 75
在转换完成后可以看到,streamNodes 有四种 transform 形式,分别为 Source、Flat Map、Window、Sink。
每个 streamNode 对象都携带并发个数、slotSharingGroup、执行类等运行
信息。76 > Apache Flink 进阶(六):Flink 作业执行深度解析
StreamGraph 到 JobGraph 的转化
StreamGraph 到 JobGraph 的转化步骤:
● 设置调度模式,Eager 所有节点立即启动。
● 广度优先遍历 StreamGraph,为每个 streamNode 生成 byte 数组类型的
hash 值。
● 从 source 节点开始递归寻找嵌到一起的 operator,不能嵌到一起的节点单独
生成 jobVertex,能够嵌到一起的开始节点生成 jobVertex,其他节点以序列
化的形式写入到 StreamConfig,然后 merge 到 CHAINEDTASKCONFIG,再通过 JobEdge 链接上下游 JobVertex。
● 将每个 JobVertex 的入边 (StreamEdge) 序列化到该 StreamConfig。
● 根据 group name 为每个 JobVertext 指定 SlotSharingGroup。
● 配置 checkpoint。Apache Flink 进阶(六):Flink 作业执行深度解析 < 77
● 将缓存文件存文件的配置添加到 configuration 中。
● 设置 ExecutionConfig。
从 source 节点递归寻找嵌到一起的 operator 中,嵌到一起需要满足一定的条
件,具体条件介绍如下:
● 下游节点只有一个输入。
● 下游节点的操作符不为 null。
● 上游节点的操作符不为 null。
● 上下游节点在一个槽位共享组内。
● 下游节点的连接策略是 ALWAYS。
● 上游节点的连接策略是 HEAD 或者 ALWAYS。
● edge 的分区函数是 ForwardPartitioner 的实例。
● 上下游节点的并行度相等。
● 可以进行节点连接操作。
JobGraph 对象结构如上图所示,taskVertices 中只存在 Window、Flat Map、Source 三个 TaskVertex,Sink operator 被嵌到 window operator 中去了。
为什么要为每个 operator 生成 hash 值?
Flink 任务失败的时候,各个 operator 是能够从 checkpoint 中恢复到失败之前
的状态的,恢复的时候是依据 JobVertexID(hash 值)进行状态恢复的。相同的任
务在恢复的时候要求 operator 的 hash 值不变,因此能够获取对应的状态。78 > Apache Flink 进阶(六):Flink 作业执行深度解析
每个 operator 是怎样生成 hash 值的?
如果用户对节点指定了一个散列值,则基于用户指定的值能够产生一个长度为
16 的字节数组。如果用户没有指定,则根据当前节点所处的位置,产生一个散列值。
考虑的因素主要有三点:
● 一是在当前 StreamNode 之前已经处理过的节点的个数,作为当前 Stream-
Node 的 id,添加到 hasher 中;
● 二 是 遍 历 当 前 StreamNode 输 出 的 每 个 StreamEdge, 并 判 断 当 前
StreamNode 与这个 StreamEdge 的目标 StreamNode 是否可以进行链
接,如果可以,则将目标 StreamNode 的 id 也放入 hasher 中,且这个目标
StreamNode 的 id 与当前 StreamNode 的 id 取相同的值;
● 三是将上述步骤后产生的字节数据,与当前 StreamNode 的所有输入
StreamNode 对应的字节数据,进行相应的位操作,最终得到的字节数据,就是当前 StreamNode 对应的长度为 16 的字节数组。
JobGraph 到 ExexcutionGraph 以及物理执行计划Apache Flink 进阶(六):Flink 作业执行深度解析 < 79
JobGraph 到 ExexcutionGraph 以及物理执行计划的流程:
● 将 JobGraph 里面的 jobVertex 从 Source 节点开始排序。
● 在 executionGraph.attachJobGraph(sortedTopology) 方 法 里 面, 根 据
JobVertex 生成 ExecutionJobVertex,在 ExecutionJobVertex 构造方法
里面,根据 jobVertex 的 IntermediateDataSet 构建 IntermediateResult,根据 jobVertex 并发构建 ExecutionVertex,ExecutionVertex 构建的时
候,构建 IntermediateResultPartition(每一个 Execution 构建 Intermedi-
ateResult 数个 IntermediateResultPartition );将创建的 ExecutionJob-
Vertex 与前置的 IntermediateResult 连接起来。
● 构建 ExecutionEdge ,连接到前面的 IntermediateResultPartition,最终从
ExecutionGraph 到物理执行计划。
Flink Job 执行流程
Flink On Yarn 模式
基于 Yarn 层面的架构类似 Spark on Yarn 模式,都是由 Client 提交 App 到
RM 上面去运行,然后 RM 分配第一个 container 去运行 AM,然后由 AM 去负责
资源的监督和管理。需要说明的是,Flink 的 Yarn 模式更加类似 Spark on Yarn 的 80 > Apache Flink 进阶(六):Flink 作业执行深度解析
cluster 模式,在 cluster 模式中,dirver 将作为 AM 中的一个线程去运行。
Flink on Yarn 模式也是会将 JobManager 启动在 container 里面,去做
个 driver 类似的任务调度和分配,Yarn AM 与 Flink JobManager 在同一个
Container 中,这样 AM 可以知道 Flink JobManager 的地址,从而 AM 可以申请
Container 去启动 Flink TaskManager。待 Flink 成功运行在 Yarn 集群上,Flink
Yarn Client 就可以提交 Flink Job 到 Flink JobManager,并进行后续的映射、调
度和计算处理。
Fink on Yarn 的缺陷
● 资源分配是静态的,一个作业需要在启动时获取所需的资源并且在它的生命周
期里一直持有这些资源。这导致了作业不能随负载变化而动态调整,在负载下
降时无法归还空闲的资源,在负载上升时也无法动态扩展。
● On-Yarn 模式下,所有的 container 都是固定大小的,导致无法根据作业需
求来调整 container 的结构。譬如 CPU 密集的作业或许需要更多的核,但不
需要太多内存,固定结构的 container 会导致内存被浪费。
● 与容器管理基础设施的交互比较笨拙,需要两个步骤来启动 Flink 作业: 1.启
动 Flink 守护进程;2.提交作业。如果作业被容器化并且将作业部署作为容器
部署的一部分,那么将不再需要步骤 2。
● On-Yarn 模式下,作业管理页面会在作业完成后消失不可访问。
● Flink 推荐 per job clusters 的部署方式,但是又支持可以在一个集群上运行
多个作业的 session 模式,令人疑惑。
在 Flink 版本 1.5 中引入了 Dispatcher,Dispatcher 是在新设计里引入的一个
新概念。Dispatcher 会从 Client 端接受作业提交请求并代表它在集群管理器上启动
作业。Apache Flink 进阶(六):Flink 作业执行深度解析 < 81
引入 Dispatcher 的原因主要有两点:
● 第一,一些集群管理器需要一个中心化的作业生成和监控实例;
● 第二,能够实现 Standalone 模式下 JobManager 的角色,且等待作业提交。
在一些案例中,Dispatcher 是可选的(Yarn) 或者不兼容的 (kubernetes)。
资源调度模型重构下的 Flink On Yarn 模式
● 没有 Dispatcher job 运行过程
客 户 端 提 交 JobGraph 以 及 依 赖 jar 包 到 YarnResourceManager, 接
着 Yarn ResourceManager 分配第一个 container 以此来启动 AppMaster,Application Master 中会启动一个 FlinkResourceManager 以及 JobManager,JobManager 会根据 JobGraph 生成的 ExecutionGraph 以及物理执行计划向
FlinkResourceManager 申请 slot,FlinkResoourceManager 会管理这些 slot
以及请求,如果没有可用 slot 就向 Yarn 的 ResourceManager 申请 container,container 启动以后会注册到 FlinkResourceManager,最后 JobManager 会将
subTask deploy 到对应 container 的 slot 中去。82 > Apache Flink 进阶(六):Flink 作业执行深度解析
● 在有 Dispatcher 的模式下
会增加一个过程,就是 Client 会直接通过 HTTP Server 的方式,然后用
Dispatcher 将这个任务提交到 Yarn ResourceManager 中。
新框架具有四大优势,详情如下:
● client 直接在 Yarn 上启动作业,而不需要先启动一个集群然后再提交作业到
集群。因此 client 再提交作业后可以马上返回。
● 所有的用户依赖库和配置文件都被直接放在应用的 classpath,而不是用动态
的用户代码 classloader 去加载。
● container 在需要时才请求,不再使用时会被释放。
●“需要时申请”的 container 分配方式允许不同算子使用不同 profile (CPU 和
内存结构 ) 的 container。Apache Flink 进阶(六):Flink 作业执行深度解析 < 83
新的资源调度框架下 single cluster job on Yarn 流程介绍
single cluster job on Yarn 模式涉及三个实例对象:
● clifrontend
○ Invoke App code;
○ 生成 StreamGraph,然后转化为 JobGraph;
● YarnJobClusterEntrypoint(Master)
○ 依次启动 YarnResourceManager、MinDispatcher、JobManagerRun-
ner 三者都服从分布式协同一致的策略;
○ JobManagerRunner 将 JobGraph 转 化 为 ExecutionGraph , 然 后
转化为物理执行任务 Execution,然后进行 deploy,deploy 过程会向
YarnResourceManager 请求 slot,如果有直接 deploy 到对应的 Yarn-
TaskExecutiontor 的 slot 里面,没有则向 Yarn 的 ResourceManager
申请,带 container 启动以后 deploy。
● YarnTaskExecutorRunner (slave)
○ 负责接收 subTask,并运行。
整个任务运行代码调用流程如下图:84 > Apache Flink 进阶(六):Flink 作业执行深度解析
subTask 在执行时是怎么运行的?
调用 StreamTask 的 invoke 方法,执行步骤如下:
initializeState即operator的 initializeState
openAllOperators 即operator 的open 方法
最后调用 run 方法来进行真正的任务处理
我们来看下 flatMap 对应的 OneInputStreamTask 的 run 方法具体是怎么处
理的。
@Override protected void run throws Exception {
cache processor reference on the stack, to make the code more JIT
friendly
final StreamInputProcessor inputProcessor = this.inputProcessor;
while (running inputProcessor.processInput{
all the work happens in the “processInput” method
}
}
最终是调用 StreamInputProcessor 的 processInput 做数据的处理,这里面
包含用户的处理逻辑。
public boolean processInput throws Exception {Apache Flink 进阶(六):Flink 作业执行深度解析 < 85
if (isFinished) {
return false;
}
if (numRecordsIn == null) {
try {
numRecordsIn = ((OperatorMetricGroup) streamOperator.
getMetricGroup).getIOMetricGroup.getNumRecordsInCounter;
} catch (Exception e) {
LOG.warn( “An exception occurred during the metrics setup.” ,e);
numRecordsIn = new SimpleCounter;
}
}
while (true) {
if (currentRecordDeserializer != null) {
DeserializationResult result = currentRecordDeserializer.
getNextRecord(deserializationDelegate);
if (result.isBufferConsumed{
currentRecordDeserializer.getCurrentBuffer.
recycleBuffer;
currentRecordDeserializer = null;
}
if (result.isFullRecord{
StreamElement recordOrMark = deserializationDelegate.
getInstance;
处理 watermark
if (recordOrMark.isWatermark{
handle watermark
watermark处理逻辑,这里可能引起 timer的trigger
statusWatermarkValve.inputWatermark(recordOrMark.
asWatermark, currentChannel);
continue;
} else if (recordOrMark.isStreamStatus{
handle stream status
statusWatermarkValve.inputStreamStatus(recordOrMark.
asStreamStatus, currentChannel);
continue;
处理latency watermark
} else if (recordOrMark.isLatencyMarker{
handle latency marker
synchronized (lock) {
streamOperator.processLatencyMarker(recordOrMark.
asLatencyMarker);
}
continue;
} else {86 > Apache Flink 进阶(六):Flink 作业执行深度解析
用户的真正的代码逻辑
now we can do the actual processing
StreamRecord record = recordOrMark.asRecord;
synchronized (lock) {
numRecordsIn.inc;
streamOperator.setKeyContextElement1(record);
处理数据
streamOperator.processElement(record);
}
return true;
}
}
}
这里会进行 checkpoint barrier 的判断和对齐,以及不同 partition 里面
checkpoint barrier 不一致时候的,数据buffer,final BufferOrEvent bufferOrEvent = barrierHandler.
getNextNonBlocked;
if (bufferOrEvent != null) {
if (bufferOrEvent.isBuffer{
currentChannel = bufferOrEvent.getChannelIndex;
currentRecordDeserializer =
recordDeserializers[currentChannel];
currentRecordDeserializer.setNextBuffer(bufferOrEvent.
getBuffer);
}
else {
Event received
final AbstractEvent event = bufferOrEvent.getEvent;
if (event.getClass != EndOfPartitionEvent.class) {
throw new IOException( “Unexpected event: “ + event);
}
}
}
else {
isFinished = true;
if (!barrierHandler.isEmpty{
throw new IllegalStateException( “Trailing data in
checkpoint barrier handler.” );
}
return false;
}
}
}
streamOperator.processElement(record) 最终会调用用户的代码处理逻辑,假如 operator 是 StreamFlatMap 的话。Apache Flink 进阶(六):Flink 作业执行深度解析 < 87
@Override
public void processElement(StreamRecord element) throws Exception {
collector.setTimestamp(element);
userFunction.flatMap(element.getValue, collector);用户代码
}
如有不正确的地方,欢迎指正,关于 Flink 资源调度架构调整,网上有一篇非常
不错的针对 FLIP-6 的翻译,推荐给大家。资源调度模型重构。链接如下:
http:www.whitewood.me20180617FLIP6- 资源调度模型重构 Apache Flink 进阶(七):网络流控及反压剖析
作者:张俊,OPPO 大数据平台研发负责人
整理:张友亮
本文根据 Apache Flink 系列直播整理而成,由 Apache Flink Contribu-
tor、OPPO 大数据平台研发负责人张俊老师分享,社区志愿者张友亮整理。主要
内容如下:
● 网络流控的概念与背景
● TCP 的流控机制
● Flink TCP-based 反压机制(before V1.5)
● Flink Credit-based 反压机制 (since V1.5)
● 总结与思考
网络流控的概念与背景
1. 为什么需要网络流控
首先我们可以看下这张最精简的网络流控的图,Producer 的吞吐率是 2MBs,Apache Flink 进阶(七):网络流控及反压剖析 < 89
Consumer 是 1MBs,这个时候我们就会发现在网络通信的时候我们的 Producer
的速度是比 Consumer 要快的,有 1MBs 的这样的速度差,假定我们两端都有一个
Buffer,Producer 端有一个发送用的 Send Buffer,Consumer 端有一个接收用的
Receive Buffer,在网络端的吞吐率是 2MBs,过了 5s 后我们的 Receive Buffer
可能就撑不住了,这时候会面临两种情况:
● 如果 Receive Buffer 是有界的,这时候新到达的数据就只能被丢弃掉了。
● 如果 Receive Buffer 是无界的,Receive Buffer 会持续的扩张,最终会导
致 Consumer 的内存耗尽。
2. 网络流控的实现:静态限速
为了解决这个问题,我们就需要网络流控来解决上下游速度差的问题,传统的做
法可以在 Producer 端实现一个类似 Rate Limiter 这样的静态限流,Producer 的发
送速率是 2MBs,但是经过限流这一层后,往 Send Buffer 去传数据的时候就会降
到 1MBs 了,这样的话 Producer 端的发送速率跟 Consumer 端的处理速率就可以
匹配起来了,就不会导致上述问题。但是这个解决方案有两点限制:
● 事先无法预估 Consumer 到底能承受多大的速率;
● Consumer 的承受能力通常会动态地波动。90 > Apache Flink 进阶(七):网络流控及反压剖析
3. 网络流控的实现:动态反馈 自动反压
针对静态限速的问题我们就演进到了动态反馈(自动反压)的机制,我们需要
Consumer 能够及时的给 Producer 做一个 feedback,即告知 Producer 能够承受
的速率是多少。动态反馈分为两种:
● 负反馈:接受速率小于发送速率时发生,告知 Producer 降低发送速率;
● 正反馈:发送速率小于接收速率时发生,告知 Producer 可以把发送速率提
上来。
让我们来看几个经典案例:
案例一:Storm 反压实现Apache Flink 进阶(七):网络流控及反压剖析 < 91
上图就是 Storm 里实现的反压机制,可以看到 Storm 在每一个 Bolt 都会有一
个监测反压的线程(Backpressure Thread),这个线程一但检测到 Bolt 里的接收队
列(recv queue)出现了严重阻塞就会把这个情况写到 ZooKeeper 里,ZooKeeper
会一直被 Spout 监听,监听到有反压的情况就会停止发送,通过这样的方式匹配上
下游的发送接收速率。
案例二:Spark Streaming 反压实现
Spark Streaming 里也有做类似这样的 feedback 机制,上图 Fecher 会实时
的从 Buffer、Processing 这样的节点收集一些指标然后通过 Controller 把速度接收
的情况再反馈到 Receiver,实现速率的匹配。
疑问:为什么 Flink(before V1.5)里没有用类似的方式实现 feedback 机制?
首先在解决这个疑问之前我们需要先了解一下 Flink 的网络传输是一个什么样的
架构。92 > Apache Flink 进阶(七):网络流控及反压剖析
这张图就体现了 Flink 在做网络传输的时候基本的数据的流向,发送端在发送网
络数据前要经历自己内部的一个流程,会有一个自己的 Network Buffer,在底层用
Netty 去做通信,Netty 这一层又有属于自己的 ChannelOutbound Buffer,因为最
终是要通过 Socket 做网络请求的发送,所以在 Socket 也有自己的 Send Buffer,同样在接收端也有对应的三级 Buffer。学过计算机网络的时候我们应该了解到,TCP 是自带流量控制的。实际上 Flink (before V1.5)就是通过 TCP 的流控机制来
实现 feedback 的。
TCP 流控机制
根据下图我们来简单的回顾一下 TCP 包的格式结构。首先,他有 Sequence
number 这样一个机制给每个数据包做一个编号,还有 ACK number 这样一
个机制来确保 TCP 的数据传输是可靠的,除此之外还有一个很重要的部分就是
Window Size,接收端在回复消息的时候会通过 Window Size 告诉发送端还可以
发送多少数据。Apache Flink 进阶(七):网络流控及反压剖析 < 93
接下来我们来简单看一下这个过程。
TCP 流控:滑动窗口
TCP 的流控就是基于滑动窗口的机制,现在我们有一个 Socket 的发送端和一
个 Socket 的接收端,目前我们的发送端的速率是我们接收端的 3 倍,这样会发生什94 > Apache Flink 进阶(七):网络流控及反压剖析
么样的一个情况呢?假定初始的时候我们发送的 window 大小是 3,然后我们接收端
的 window 大小是固定的,就是接收端的 Buffer 大小为 5。
首先,发送端会一次性发 3 个 packets,将 1,2,3 发送给接收端,接收端接
收到后会将这 3 个 packets 放到 Buffer 里去。
接收端一次消费 1 个 packet,这时候 1 就已经被消费了,然后我们看到接收Apache Flink 进阶(七):网络流控及反压剖析 < 95
端的滑动窗口会往前滑动一格,这时候 2,3 还在 Buffer 当中 而 4,5,6 是空出来
的,所以接收端会给发送端发送 ACK = 4 ,代表发送端可以从 4 开始发送,同时会
将 window 设置为 3 (Buffer 的大小 5 减去已经存下的 2 和 3),发送端接收到回应
后也会将他的滑动窗口向前移动到 4,5,6。
这时候发送端将 4,5,6 发送,接收端也能成功的接收到 Buffer 中去。96 > Apache Flink 进阶(七):网络流控及反压剖析
到这一阶段后,接收端就消费到 2 了,同样他的窗口也会向前滑动一个,这时候
他的 Buffer 就只剩一个了,于是向发送端发送 ACK = 7、window = 1。发送端收到
之后滑动窗口也向前移,但是这个时候就不能移动 3 格了,虽然发送端的速度允许发
3 个 packets 但是 window 传值已经告知只能接收一个,所以他的滑动窗口就只能
往前移一格到 7 ,这样就达到了限流的效果,发送端的发送速度从 3 降到 1。
我们再看一下这种情况,这时候发送端将 7 发送后,接收端接收到,但是由于接
收端的消费出现问题,一直没有从 Buffer 中去取,这时候接收端向发送端发送 ACK Apache Flink 进阶(七):网络流控及反压剖析 < 97
= 8、window = 0 ,由于这个时候 window = 0,发送端是不能发送任何数据,也就
会使发送端的发送速度降为 0。这个时候发送端不发送任何数据了,接收端也不进行
任何的反馈了,那么如何知道消费端又开始消费了呢?98 > Apache Flink 进阶(七):网络流控及反压剖析
TCP 当中有一个 ZeroWindowProbe 的机制,发送端会定期的发送 1 个字节的
探测消息,这时候接收端就会把 window 的大小进行反馈。当接收端的消费恢复了之
后,接收到探测消息就可以将 window 反馈给发送端端了从而恢复整个流程。TCP
就是通过这样一个滑动窗口的机制实现 feedback。
Flink TCP-based反压机制(before V1.5)
1. 示例:WindowWordCount
大体的逻辑就是从 Socket 里去接收数据,每 5s 去进行一次 WordCount,将
这个代码提交后就进入到了编译阶段。
2. 编译阶段:生成 JobGraphApache Flink 进阶(七):网络流控及反压剖析 < 99
这时候还没有向集群去提交任务,在 Client 端会将 StreamGraph 生成 Job-
Graph,JobGraph 就是做为向集群提交的最基本的单元。在生成 JobGrap 的时候
会做一些优化,将一些没有 Shuffle 机制的节点进行合并。有了 JobGraph 后就会向
集群进行提交,进入运行阶段。
3. 运行阶段:调度 ExecutionGraph
JobGraph 提交到集群后会生成 ExecutionGraph ,这时候就已经具备基本的
执行任务的雏形了,把每个任务拆解成了不同的 SubTask,上图 ExecutionGraph
中的 Intermediate Result Partition 就是用于发送数据的模块,最终会将 Execu-
tionGraph 交给 JobManager 的调度器,将整个 ExecutionGraph 调度起来。
然后我们概念化这样一张物理执行图,可以看到每个 Task 在接收数据时都会通
过这样一个 InputGate 可以认为是负责接收数据的,再往前有这样一个 ResultPar-
tition 负责发送数据,在 ResultPartition 又会去做分区跟下游的 Task 保持一致,就
形成了 ResultSubPartition 和 InputChannel 的对应关系。这就是从逻辑层上来看100 > Apache Flink 进阶(七):网络流控及反压剖析
的网络传输的通道,基于这么一个概念我们可以将反压的问题进行拆解。
4. 问题拆解:反压传播两个阶段
反压的传播实际上是分为两个阶段的,对应着上面的执行图,我们一共涉及 3 个
TaskManager,在每个 TaskManager 里面都有相应的 Task 在执行,还有负责接
收数据的 InputGate,发送数据的 ResultPartition,这就是一个最基本的数据传输
的通道。在这时候假设最下游的 Task (Sink)出现了问题,处理速度降了下来这时候
是如何将这个压力反向传播回去呢?这时候就分为两种情况:
● 跨 TaskManager ,反压如何从 InputGate 传播到 ResultPartition。
● TaskManager 内,反压如何从 ResultPartition 传播到 InputGate。Apache Flink 进阶(七):网络流控及反压剖析 < 101
5. 跨 TaskManager 数据传输
前面提到,发送数据需要 ResultPartition,在每个 ResultPartition 里面会
有分区 ResultSubPartition,中间还会有一些关于内存管理的 Buffer。对于一个
TaskManager 来说会有一个统一的 Network BufferPool 被所有的 Task 共享,在
初始化时会从 Off-heap Memory 中申请内存,申请到内存的后续内存管理就是同
步 Network BufferPool 来进行的,不需要依赖 JVM GC 的机制去释放。有了 Net-
work BufferPool 之后可以为每一个 ResultSubPartition 创建 Local BufferPool 。
如上图左边的 TaskManager 的 Record Writer 写了 <1,2> 这个两个数据进
来,因为 ResultSubPartition 初始化的时候为空,没有 Buffer 用来接收,就会向
Local BufferPool 申请内存,这时 Local BufferPool 也没有足够的内存于是将请求
转到 Network BufferPool,最终将申请到的 Buffer 按原链路返还给 ResultSub-
Partition,<1,2> 这个两个数据就可以被写入了。
之后会将 ResultSubPartition 的 Buffer 拷贝到 Netty 的 Buffer 当中最终拷贝
到 Socket 的 Buffer 将消息发送出去。然后接收端按照类似的机制去处理将消息消
费掉。接下来我们来模拟上下游处理速度不匹配的场景,发送端的速率为 2,接收端
的速率为 1,看一下反压的过程是怎样的。102 > Apache Flink 进阶(七):网络流控及反压剖析
6. 跨 TaskManager 反压过程
因为速度不匹配就会导致一段时间后 InputChannel 的 Buffer 被用尽,于是他
会向 Local BufferPool 申请新的 Buffer ,这时候可以看到 Local BufferPool 中的
一个 Buffer 就会被标记为 Used。
发送端还在持续以不匹配的速度发送数据,然后就会导致 InputChannel 向
Local BufferPool 申请 Buffer 的时候发现没有可用的 Buffer 了,这时候就只能
向 Network BufferPool 去申请,当然每个 Local BufferPool 都有最大的可用的
Buffer,防止一个 Local BufferPool 把 Network BufferPool 耗尽。这时候看到 Apache Flink 进阶(七):网络流控及反压剖析 < 103
Network BufferPool 还是有可用的 Buffer 可以向其申请。
一段时间后,发现 Network BufferPool 没有可用的 Buffer,或是 Local Buffer-
Pool 的最大可用 Buffer 到了上限无法向 Network BufferPool 申请,没有办法去读
取新的数据,这时 Netty AutoRead 就会被禁掉,Netty 就不会从 Socket 的 Buffer
中读取数据了。
显然,再过不久 Socket 的 Buffer 也被用尽,这时就会将 Window = 0 发送给
发送端(前文提到的 TCP 滑动窗口的机制)。这时发送端的 Socket 就会停止发送。104 > Apache Flink 进阶(七):网络流控及反压剖析
很快发送端的 Socket 的 Buffer 也被用尽,Netty 检测到 Socket 无法写了之后
就会停止向 Socket 写数据。
Netty 停止写了之后,所有的数据就会阻塞在 Netty 的 Buffer 当中了,但是
Netty 的 Buffer 是无界的,可以通过 Netty 的水位机制中的 high watermark 控
制他的上界。当超过了 high watermark,Netty 就会将其 channel 置为不可写,ResultSubPartition 在写之前都会检测 Netty 是否可写,发现不可写就会停止向
Netty 写数据。Apache Flink 进阶(七):网络流控及反压剖析 < 105
这时候所有的压力都来到了 ResultSubPartition,和接收端一样他会不断的向
Local BufferPool 和 Network Buffer ......
实时计算交流钉钉群 Flink社区微信公众号
扫一扫二维码图案,关注我吧Apache Flink 进阶(一):Runtime 核心机制剖析 4
Apache Flink 进阶(二):时间属性深度解析 18
Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践 30
Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 41
Apache Flink 进阶(五):数据类型和序列化 60
Apache Flink 进阶(六):Flink 作业执行深度解析 71
Apache Flink 进阶(七):网络流控及反压剖析 88
Apache Flink 进阶(八):详解 Metrics 原理与实战 112
Apache Flink 进阶(九):Flink Connector 开发 125
Apache Flink 进阶(十):Flink State 最佳实践 141
Apache Flink 进阶(十一):TensorFlow On Flink 149
Apache Flink 进阶(十二):深度探索 Flink SQL 159
Apache Flink 进阶(十三):Python API 应用实践 181
目录Apache Flink 进阶(一):Runtime 核心机制剖析
作者:高赟(云骞)
阿里巴巴技术专家
简介:Flink 的整体架构如图 1 所示。Flink 是可以运行在多种不同的环境中的,例如,它可以通过单进程多线程的方式直接运行,从而提供调试的能力。它也可以运
行在 Yarn 或者 K8S 这种资源管理系统上面,也可以在各种云环境中执行。
1. 综述
本文主要介绍 Flink Runtime 的作业执行的核心机制。首先介绍 Flink Runtime
的整体架构以及 Job 的基本执行流程,然后介绍在这个过程,Flink 是怎么进行资源
管理、作业调度以及错误恢复的。最后,本文还将简要介绍 Flink Runtime 层当前正
在进行的一些工作。
2. Flink Runtime 整体架构
Flink 的整体架构如图 1 所示。Flink 是可以运行在多种不同的环境中的,例如,它可以通过单进程多线程的方式直接运行,从而提供调试的能力。它也可以运行在
Yarn 或者 K8S 这种资源管理系统上面,也可以在各种云环境中执行。Apache Flink 进阶(一):Runtime 核心机制剖析 < 5
图1 Flink 的整体架构,其中 Runtime 层对不同的执行环境提供了一套统一的分布式执行引擎
针对不同的执行环境,Flink 提供了一套统一的分布式作业执行引擎,也就是
Flink Runtime 这层。Flink 在 Runtime 层之上提供了 DataStream 和 DataSet 两
套 API,分别用来编写流作业与批作业,以及一组更高级的 API 来简化特定作业的
编写。本文主要介绍 Flink Runtime 层的整体架构。
Flink Runtime 层的主要架构如图 2 所示,它展示了一个 Flink 集群的基本结
构。Flink Runtime 层的整个架构主要是在 FLIP-6 中实现的,整体来说,它采用了
标准 master-slave 的结构,其中左侧白色圈中的部分即是 master,它负责管理整
个集群中的资源和作业;而右侧的两个 TaskExecutor 则是 Slave,负责提供具体的
资源并实际执行作业。6 > Apache Flink 进阶(一):Runtime 核心机制剖析
图2 Flink 集群的基本结构。Flink Runtime 层采用了标准的 master-slave 架构
其中,Master 部分又包含了三个组件,即 Dispatcher、ResourceManager
和 JobManager。其中,Dispatcher 负责接收用户提供的作业,并且负责为这个
新提交的作业拉起一个新的 JobManager 组件。ResourceManager 负责资源的
管理,在整个 Flink 集群中只有一个 ResourceManager。JobManager 负责管理
作业的执行,在一个 Flink 集群中可能有多个作业同时执行,每个作业都有自己的
JobManager 组件。这三个组件都包含在 AppMaster 进程中。
基于上述结构,当用户提交作业的时候,提交脚本会首先启动一个 Client进程
负责作业的编译与提交。它首先将用户编写的代码编译为一个 JobGraph,在这个过
程,它还会进行一些检查或优化等工作,例如判断哪些 Operator 可以 Chain 到同一
个 Task 中。然后,Client 将产生的 JobGraph 提交到集群中执行。此时有两种情
况,一种是类似于 Standalone 这种 Session 模式,AM 会预先启动,此时 Client
直接与 Dispatcher 建立连接并提交作业即可。另一种是 Per-Job 模式,AM 不会
预先启动,此时 Client 将首先向资源管理系统 (如Yarn、K8S)申请资源来启动
AM,然后再向 AM 中的 Dispatcher 提交作业。
当作业到 Dispatcher 后,Dispatcher 会首先启动一个 JobManager 组件,然
后 JobManager 会向 ResourceManager 申请资源来启动作业中具体的任务。这Apache Flink 进阶(一):Runtime 核心机制剖析 < 7
时根据 Session 和 Per-Job 模式的区别, TaskExecutor 可能已经启动或者尚未启
动。如果是前者,此时 ResourceManager 中已有记录了 TaskExecutor 注册的资
源,可以直接选取空闲资源进行分配。否则,ResourceManager 也需要首先向外
部资源管理系统申请资源来启动 TaskExecutor,然后等待 TaskExecutor 注册相
应资源后再继续选择空闲资源进程分配。目前 Flink 中 TaskExecutor 的资源是通过
Slot 来描述的,一个 Slot 一般可以执行一个具体的 Task,但在一些情况下也可以执
行多个相关联的 Task,这部分内容将在下文进行详述。ResourceManager 选择到
空闲的 Slot 之后,就会通知相应的 TM “将该 Slot 分配分 JobManager XX ”,然
后 TaskExecutor 进行相应的记录后,会向 JobManager 进行注册。JobManager
收到 TaskExecutor 注册上来的 Slot 后,就可以实际提交 Task 了。
TaskExecutor 收到 JobManager 提交的 Task 之后,会启动一个新的线程来
执行该 Task。Task 启动后就会开始进行预先指定的计算,并通过数据 Shuffle 模块
互相交换数据。
以上就是 Flink Runtime 层执行作业的基本流程。可以看出,Flink 支持两种不
同的模式,即 Per-job 模式与 Session 模式。如图 3 所示,Per-job 模式下整个
Flink 集群只执行单个作业,即每个作业会独享 Dispatcher 和 ResourceManager
组件。此外,Per-job 模式下 AppMaster 和 TaskExecutor 都是按需申请的。因
此,Per-job 模式更适合运行执行时间较长的大作业,这些作业对稳定性要求较
高,并且对申请资源的时间不敏感。与之对应,在 Session 模式下,Flink 预先启动
AppMaster 以及一组 TaskExecutor,然后在整个集群的生命周期中会执行多个作
业。可以看出,Session 模式更适合规模小,执行时间短的作业。8 > Apache Flink 进阶(一):Runtime 核心机制剖析
图3 Flink Runtime 支持两种作业执行的模式
3. 资源管理与作业调度
本节对 Flink 中资源管理与作业调度的功能进行更深入的说明。实际上,作业调
度可以看做是对资源和任务进行匹配的过程。如上节所述,在 Flink 中,资源是通过
Slot 来表示的,每个 Slot 可以用来执行不同的 Task。而在另一端,任务即 Job 中
实际的 Task,它包含了待执行的用户逻辑。调度的主要目的就是为了给 Task 找到
匹配的 Slot。逻辑上来说,每个 Slot 都应该有一个向量来描述它所能提供的各种资
源的量,每个 Task 也需要相应的说明它所需要的各种资源的量。但是实际上在 1.9
之前,Flink 是不支持细粒度的资源描述的,而是统一的认为每个 Slot 提供的资源和
Task 需要的资源都是相同的。从 1.9 开始,Flink 开始增加对细粒度的资源匹配的支
持的实现,但这部分功能目前仍在完善中。
作业调度的基础是首先提供对资源的管理,因此我们首先来看下 Flink 中资源
管理的实现。如上文所述,Flink 中的资源是由 TaskExecutor 上的 Slot 来表示
的。如图 4 所示,在 ResourceManager 中,有一个子组件叫做 SlotManager,它
维护了当前集群中所有 TaskExecutor 上的 Slot 的信息与状态,如该 Slot 在哪个
TaskExecutor 中,该 Slot 当前是否空闲等。当 JobManger 来为特定 Task 申请
资源的时候,根据当前是 Per-job 还是 Session 模式,ResourceManager 可能
会去申请资源来启动新的 TaskExecutor。当 TaskExecutor 启动之后,它会通过
服务发现找到当前活跃的 ResourceManager 并进行注册。在注册信息中,会包含Apache Flink 进阶(一):Runtime 核心机制剖析 < 9
该 TaskExecutor中所有 Slot 的信息。 ResourceManager 收到注册信息后,其中
的 SlotManager 就会记录下相应的 Slot 信息。当 JobManager 为某个 Task 来申
请资源时, SlotManager 就会从当前空闲的 Slot 中按一定规则选择一个空闲的 Slot
进行分配。当分配完成后,如第 2 节所述,RM 会首先向 TaskManager 发送 RPC
要求将选定的 Slot 分配给特定的 JobManager。TaskManager 如果还没有执行过
该 JobManager 的 Task 的话,它需要首先向相应的 JobManager 建立连接,然
后发送提供 Slot 的 RPC 请求。在 JobManager 中,所有 Task 的请求会缓存到
SlotPool 中。当有 Slot 被提供之后,SlotPool 会从缓存的请求中选择相应的请求并
结束相应的请求过程。
图4 Flink 中资源管理功能各模块交互关系
当 Task 结束之后,无论是正常结束还是异常结束,都会通知 JobManager 相
应的结束状态,然后在 TaskManager 端将 Slot 标记为已占用但未执行任务的状
态。JobManager 会首先将相应的 Slot 缓存到 SlotPool 中,但不会立即释放。这
种方式避免了如果将 Slot 直接还给 ResourceManager,在任务异常结束之后需10 > Apache Flink 进阶(一):Runtime 核心机制剖析
要重启时,需要立刻重新申请 Slot 的问题。通过延时释放,Failover 的 Task 可以
尽快调度回原来的 TaskManager,从而加快 Failover 的速度。当 SlotPool 中缓
存的 Slot 超过指定的时间仍未使用时,SlotPool 就会发起释放该 Slot 的过程。与
申请 Slot 的过程对应,SlotPool 会首先通知 TaskManager 来释放该 Slot,然后
TaskExecutor 通知 ResourceManager 该 Slot 已经被释放,从而最终完成释放的
逻辑。
除了正常的通信逻辑外,在 ResourceManager 和 TaskExecutor 之间还存在
定时的心跳消息来同步 Slot 的状态。在分布式系统中,消息的丢失、错乱不可避免,这些问题会在分布式系统的组件中引入不一致状态,如果没有定时消息,那么组件无
法从这些不一致状态中恢复。此外,当组件之间长时间未收到对方的心跳时,就会认
为对应的组件已经失效,并进入到 Failover 的流程。
在 Slot 管理基础上,Flink 可以将 Task 调度到相应的 Slot 当中。如上文所
述,Flink 尚未完全引入细粒度的资源匹配,默认情况下,每个 Slot 可以分配给一个
Task。但是,这种方式在某些情况下会导致资源利用率不高。如图 5 所示,假如 A、B、C 依次执行计算逻辑,那么给 A、B、C 分配分配单独的 Slot 就会导致资源利用
率不高。为了解决这一问题,Flink 提供了 Share Slot 的机制。如图 5 所示,基于
Share Slot,每个 Slot 中可以部署来自不同 JobVertex 的多个任务,但是不能部署
来自同一个 JobVertex 的 Task。如图 5所示,每个 Slot 中最多可以部署同一个 A、B 或 C 的 Task,但是可以同时部署 A、B 和 C 的各一个 Task。当单个 Task 占用
资源较少时,Share Slot 可以提高资源利用率。 此外,Share Slot 也提供了一种简
单的保持负载均衡的方式。Apache Flink 进阶(一):Runtime 核心机制剖析 < 11
图5 Flink Share Slot 示例。使用 Share Slot 可以在每个 Slot 中部署来自不同 JobVertex 的多个 Task
基于上述 Slot 管理和分配的逻辑,JobManager 负责维护作业中 Task 执
行的状态。如上文所述,Client 端会向 JobManager 提交一个 JobGraph,它
代表了作业的逻辑结构。JobManager 会根据 JobGraph 按并发展开,从而得到
JobManager 中关键的 ExecutionGraph。ExecutionGraph 的结构如图 5 所示,与 JobGraph 相比,ExecutionGraph 中对于每个 Task 与中间结果等均创建了对
应的对象,从而可以维护这些实体的信息与状态。12 > Apache Flink 进阶(一):Runtime 核心机制剖析
图6 Flink 中的 JobGraph 与 ExecutionGraph。ExecutionGraph 是 JobGraph 按并发展开所形
成的,它是 JobMaster 中的核心数据结构
在一个 Flink Job 中是包含多个 Task 的,因此另一个关键的问题是在 Flink 中
按什么顺序来调度 Task。如图 7 所示,目前 Flink 提供了两种基本的调度逻辑,即
Eager 调度与 Lazy From Source。Eager 调度如其名子所示,它会在作业启动时
申请资源将所有的 Task 调度起来。这种调度算法主要用来调度可能没有终止的流作
业。与之对应,Lazy From Source 则是从 Source 开始,按拓扑顺序来进行调度。
简单来说,Lazy From Source 会先调度没有上游任务的 Source 任务,当这些任
务执行完成时,它会将输出数据缓存到内存或者写入到磁盘中。然后,对于后续的任
务,当它的前驱任务全部执行完成后,Flink 就会将这些任务调度起来。这些任务会
从读取上游缓存的输出数据进行自己的计算。这一过程继续进行直到所有的任务完成
计算。Apache Flink 进阶(一):Runtime 核心机制剖析 < 13
图7 Flink 中两种基本的调度策略。其中 Eager 调度适用于流作业,而 Lazy From Source 适用
于批作业
4. 错误恢复
在 Flink 作业的执行过程中,除正常执行的流程外,还有可能由于环境等原因导
致各种类型的错误。整体上来说,错误可能分为两大类:Task 执行出现错误或 Flink
集群的 Master 出现错误。由于错误不可避免,为了提高可用性,Flink 需要提供自
动错误恢复机制来进行重试。
对于第一类 Task 执行错误,Flink 提供了多种不同的错误恢复策略。如图 8
所示,第一种策略是 Restart-all,即直接重启所有的 Task。对于 Flink 的流任
务,由于 Flink 提供了 Checkpoint 机制,因此当任务重启后可以直接从上次的
Checkpoint 开始继续执行。因此这种方式更适合于流作业。第二类错误恢复策略是
Restart-individual,它只适用于 Task 之间没有数据传输的情况。这种情况下,我
们可以直接重启出错的任务。14 > Apache Flink 进阶(一):Runtime 核心机制剖析
图8 Restart-all 错误恢复策略示例。该策略会直接重启所有的 Task
图9 Restart-individual 错误恢复策略示例。该策略只适用于 Task之间不需要数据传输的作业,对于这种作业可以只重启出现错误的 TaskApache Flink 进阶(一):Runtime 核心机制剖析 < 15
由于 Flink 的批作业没有 Checkpoint 机制,因此对于需要数据传输的作业,直接重启所有 Task 会导致作业从头计算,从而导致一定的性能问题。为了增强对
Batch 作业,Flink 在 1.9 中引入了一种新的 Region-Based 的 Failover 策略。在
一个 Flink 的 Batch 作业中 Task 之间存在两种数据传输方式,一种是 Pipeline 类
型的方式,这种方式上下游 Task 之间直接通过网络传输数据,因此需要上下游同
时运行;另外一种是 Blocking 类型的试,如上节所述,这种方式下,上游的 Task
会首先将数据进行缓存,因此上下游的 Task 可以单独执行。基于这两种类型的传
输,Flink 将 ExecutionGraph 中使用 Pipeline 方式传输数据的 Task 的子图叫做
Region,从而将整个 ExecutionGraph 划分为多个子图。可以看出,Region 内的
Task 必须同时重启,而不同 Region 的 Task 由于在 Region 边界存在 Blocking 的
边,因此,可以单独重启下游 Region 中的 Task。
基于这一思路,如果某个 Region 中的某个 Task 执行出现错误,可以分两种
情况进行考虑。如图 8 所示,如果是由于 Task 本身的问题发生错误,那么可以只
重启该 Task 所属的 Region 中的 Task,这些 Task 重启之后,可以直接拉取上游
Region 缓存的输出结果继续进行计算。
另一方面,如图如果错误是由于读取上游结果出现问题,如网络连接中断、缓存
上游输出数据的 TaskExecutor 异常退出等,那么还需要重启上游 Region 来重新
产生相应的数据。在这种情况下,如果上游 Region 输出的数据分发方式不是确定性
的(如 KeyBy、Broadcast 是确定性的分发方式,而 Rebalance、Random 则不
是,因为每次执行会产生不同的分发结果),为保证结果正确性,还需要同时重启上
游 Region 所有的下游 Region。16 > Apache Flink 进阶(一):Runtime 核心机制剖析
图 10 Region-based 错误恢复策略示例一。如果是由于下游任务本身导致的错误,可以只重启下
游对应的 Region
图 11 Region-based 错误恢复策略示例二。如果是由于上游失败导致的错误,那么需要同时重启
上游的 Region 和下游的 Region。实际上,如果下游的输出使用了非确定的数据分割方式,为了保持数据一致性,还需要同时重启所有上游 Region 的下游 Region
除了 Task 本身执行的异常外,另一类异常是 Flink 集群的 Master 进行发生异
常。目前 Flink 支持启动多个 Master 作为备份,这些 Master 可以通过 ZK 来进行
选主,从而保证某一时刻只有一个 Master 在运行。当前活路的 Master 发生异常
时,某个备份的 Master 可以接管协调的工作。为了保证 Master 可以准确维护作业
的状态,Flink 目前采用了一种最简单的实现方式,即直接重启整个作业。实际上,Apache Flink 进阶(一):Runtime 核心机制剖析 < 17
由于作业本身可能仍在正常运行,因此这种方式存在一定的改进空间。
5. 未来展望
Flink目前仍然在Runtime部分进行不断的迭代和更新。目前来看,Flink未来
可能会在以下几个方式继续进行优化和扩展:
● 更完善的资源管理:从 1.9 开始 Flink 开始了对细粒度资源匹配的支持。基于
细粒度的资源匹配,用户可以为 TaskExecutor 和 Task 设置实际提供和使用
的 CPU、内存等资源的数量,Flink 可以按照资源的使用情况进行调度。这一
机制允许用户更大范围的控制作业的调度,从而为进一步提高资源利用率提供
了基础。
● 统一的 Stream 与 Batch:Flink 目前为流和批分别提供了 DataStream 和
DataSet 两套接口,在一些场景下会导致重复实现逻辑的问题。未来 Flink 会
将流和批的接口都统一到 DataStream 之上。
● 更灵活的调度策略:Flink 从 1.9 开始引入调度插件的支持,从而允许用户来
扩展实现自己的调度逻辑。未来 Flink 也会提供更高性能的调度策略的实现。
● Master Failover 的优化:如上节所述,目前 Flink 在 Master Failover 时
需要重启整个作业,而实际上重启作业并不是必须的逻辑。Flink 未来会对
Master failover 进行进一步的优化来避免不必要的作业重启。Apache Flink 进阶(二):时间属性深度解析
作者:崔星灿,Apache Flink Committer
整理:沙晟阳(成阳),阿里巴巴技术专家
本文根据 Apache Flink 进阶篇系列直播课程整理而成,由 Apache Flink
Committer 崔星灿分享,阿里巴巴技术专家沙晟阳(成阳)整理。文章将对 Flink 的
时间属性及原理进行深度解析。
Tips:文末可回顾全部基础篇及进阶篇系列教程。
1. 前言
Flink 的 API 大体上可以划分为三个层次:处于最底层的 ProcessFunction、中
间一层的 DataStream API 和最上层的 SQLTable API,这三层中的每一层都非常
依赖于时间属性。时间属性是流处理中最重要的一个方面,是流处理系统的基石之
一,贯穿这三层 API。在 DataStream API 这一层中因为封装方面的原因,我们能
够接触到时间的地方不是很多,所以我们将重点放在底层的 ProcessFunction 和最
上层的 SQLTable API。Apache Flink 进阶(二):时间属性深度解析 < 19
2. Flink 时间语义
在不同的应用场景中时间语义是各不相同的,Flink 作为一个先进的分布式流处
理引擎,它本身支持不同的时间语义。其核心是 Processing Time 和 Event Time
(Row Time),这两类时间主要的不同点如下表所示:
Processing Time 是来模拟我们真实世界的时间,其实就算是处理数据的节点
本地时间也不一定是完完全全的真实世界的时间,所以说它是用来模拟真实世界的时
间。而 Event Time 是数据世界的时间,即我们要处理的数据流世界里的时间。关
于他们的获取方式,Process Time 是通过直接去调用本地机器的时间,而 Event
Time 则是根据每一条处理记录所携带的时间戳来判定。
这两种时间在 Flink 内部的处理以及用户的实际使用方面,难易程度都是不同
的。相对而言的 Processing Time 处理起来更加的简单,而 Event Time 要更麻
烦一些。而在使用 Processing Time 的时候,我们得到的处理结果(或者说流处理
应用的内部状态)是不确定的。而因为在 Flink 内部对 Event Time 做了各种保障,使用 Event Time 的情况下,无论重放数据多少次,都能得到一个相对确定可重现
的结果。
因此在判断应该使用 Processing Time 还是 Event Time 的时候,可以遵循
一个原则:当你的应用遇到某些问题要从上一个 checkpoint 或者 savepoint 进行20 > Apache Flink 进阶(二):时间属性深度解析
重放,是不是希望结果完全相同。如果希望结果完全相同,就只能用 Event Time;
如果接受结果不同,则可以用 Processing Time。Processing Time 的一个常见的
用途是,根据现实时间来统计整个系统的吞吐,比如要计算现实时间一个小时处理了
多少条数据,这种情况只能使用 Processing Time。
2.1 时间的特性
时间的一个重要特性是:时间只能递增,不会来回穿越。 在使用时间的时
候我们要充分利用这个特性。假设我们有这么一些记录,然后我们来分别看一下
Processing Time 还有 Event Time 对于时间的处理。
● 对于 Processing Time,因为我们是使用的是本地节点的时间(假设这个节
点的时钟同步没有问题),我们每一次取到的 Processing Time 肯定都是递增
的,递增就代表着有序,所以说我们相当于拿到的是一个有序的数据流。
● 而在用 Event Time 的时候因为时间是绑定在每一条的记录上的,由于网络延
迟、程序内部逻辑、或者其他一些分布式系统的原因,数据的时间可能会存在
一定程度的乱序,比如上图的例子。在 Event Time 场景下,我们把每一个记
录所包含的时间称作 Record Timestamp。如果 Record Timestamp 所得到Apache Flink 进阶(二):时间属性深度解析 < 21
的时间序列存在乱序,我们就需要去处理这种情况。
如果单条数据之间是乱序,我们就考虑对于整个序列进行更大程度的离散化。简
单地讲,就是把数据按照一定的条数组成一些小批次,但这里的小批次并不是攒够多
少条就要去处理,而是为了对他们进行时间上的划分。经过这种更高层次的离散化之
后,我们会发现最右边方框里的时间就是一定会小于中间方框里的时间,中间框里的
时间也一定会小于最左边方框里的时间。
这个时候我们在整个时间序列里插入一些类似于标志位的特殊的处理数据,这些特殊的处理数据叫做 watermark。一个 watermark 本质上就代表了这个
watermark 所包含的 timestamp 数值,表示以后到来的数据已经再也没有小于或
等于这个时间的了。22 > Apache Flink 进阶(二):时间属性深度解析
3. Timestamp 和 Watermark 行为概览
接下来我们重点看一下 Event Time 里的 Record Timestamp(简写成 time-
stamp)和 watermark 的一些基本信息。绝大多数的分布式流计算引擎对于数据都是
进行了 DAG 图的抽象,它有自己的数据源,有处理算子,还有一些数据汇。数据在
不同的逻辑算子之间进行流动。watermark 和 timestamp 有自己的生命周期,接下
来我会从 watermark 和 timestamp 的产生、他们在不同的节点之间的传播、以及在
每一个节点上的处理,这三个方面来展开介绍。
3.1 Timestamp 分配和 Watermark 生成
Flink 支持两种 watermark 生成方式。第一种是在 SourceFunction 中产生,相当于把整个的 timestamp 分配和 watermark 生成的逻辑放在流处理应用的源头。
我们可以在 SourceFunction 里面通过这两个方法产生 watermark:
● 通过 collectWithTimestamp 方法发送一条数据,其中第一个参数就是我
们要发送的数据,第二个参数就是这个数据所对应的时间戳;也可以调用
emitWatermark 去产生一条 watermark,表示接下来不会再有时间戳小于等
于这个数值记录。
● 另 外, 有 时 候 我 们 不 想 在 SourceFunction 里 生 成 timestamp 或 者 Apache Flink 进阶(二):时间属性深度解析 < 23
watermark,或者说使用的 SourceFunction 本身不支持,我们还可以
在 使 用 DataStream API 的 时 候 指 定, 调 用 的 DataStream.assign-
TimestampsAndWatermarks 这个方法,能够接收不同的 timestamp 和
watermark 的生成器。
总体上而言生成器可以分为两类:第一类是定期生成器;第二类是根据一些在流
处理数据流中遇到的一些特殊记录生成的。
两者的区别主要有三个方面,首先定期生成是现实时间驱动的,这里的“定期
生成”主要是指 watermark(因为 timestamp 是每一条数据都需要有的),即定期
会调用生成逻辑去产生一个 watermark。而根据特殊记录生成是数据驱动的,即是
否生成 watermark 不是由现实时间来决定,而是当看到一些特殊的记录就表示接下
来可能不会有符合条件的数据再发过来了,这个时候相当于每一次分配 Timestamp
之后都会调用用户实现的 watermark 生成方法,用户需要在生成方法中去实现
watermark 的生成逻辑。
大家要注意的是就是我们在分配 timestamp 和生成 watermark 的过程,虽然
在 SourceFunction 和 DataStream 中都可以指定,但是还是建议生成的工作越靠
近 DataSource 越好。这样会方便让程序逻辑里面更多的 operator 去判断某些数据
是否乱序。Flink 内部提供了很好的机制去保证这些 timestamp 和 watermark 被
正确地传递到下游的节点。24 > Apache Flink 进阶(二):时间属性深度解析
3.2 Watermark 传播
具体的传播策略基本上遵循这三点。
● 首先,watermark 会以广播的形式在算子之间进行传播。比如说上游的算子
连接了三个下游的任务,它会把自己当前的收到的 watermark 以广播的形式
传到下游。
● 第二,如果在程序里面收到了一个 Long.MAX_VALUE 这个数值的 water-
mark,就表示对应的那一条流的一个部分不会再有数据发过来了,它相当于就
是一个终止的标志。
● 第三,对于单流而言,这个策略比较好理解,而对于有多个输入的算子,watermark 的计算就有讲究了,一个原则是:单输入取其大,多输入取小。
举个例子,假设这边蓝色的块代表一个算子的一个任务,然后它有三个输入,分
别是 W1、W2、W3,这三个输入可以理解成任何输入,这三个输入可能是属于同一
个流,也可能是属于不同的流。然后在计算 watermark 的时候,对于单个输入而言
是取他们的最大值,因为我们都知道 watermark 应该遵循一个单调递增的一个原则。
对于多输入,它要统计整个算子任务的 watermark 时,就会取这三个计算出来的
watermark 的最小值。即一个多个输入的任务,它的 watermark 受制于最慢的那条
输入流。Apache Flink 进阶(二):时间属性深度解析 < 25
这一点类似于木桶效应,整个木桶中装的水会受制于最矮的那块板。
watermark 在传播的时候有一个特点是,它的传播是幂等的。多次收到相同的
watermark,甚至收到之前的 watermark 都不会对最后的数值产生影响,因为对于
单个输入永远是取最大的,而对于整个任务永远是取一个最小的。
同时我们可以注意到这种设计其实有一个局限,具体体现在它没有区分你这个输
入是一条流多个 partition 还是来自于不同的逻辑上的流的 JOIN。对于同一个流的不
同 partition,我们对他做这种强制的时钟同步是没有问题的,因为一开始就把一条流
拆散成不同的部分,但每一个部分之间共享相同的时钟。
但是如果算子的任务是在做类似于 JOIN 操作,那么要求两个输入的时钟强制同
步其实没有什么道理的,因为完全有可能是把一条离现在时间很近的数据流和一个离
当前时间很远的数据流进行 JOIN,这个时候对于快的那条流,因为它要等慢的那条
流,所以说它可能就要在状态中去缓存非常多的数据,这对于整个集群来说是一个很
大的性能开销。
3.3 ProcessFunction
在正式介绍 watermark 的处理之前,先简单介绍 ProcessFunction,因为
watermark 在任务里的处理逻辑分为内部逻辑和外部逻辑。外部逻辑其实就是通过
ProcessFunction 来体现的,如果你需要使用 Flink 提供的时间相关的 API 的话就
只能写在 ProcessFunction 里。
ProcessFunction 和时间相关的功能主要有三点:
● 第一点,根据你当前系统使用的时间语义不同,你可以去获取当前你正在处理
这条记录的 Record Timestamp,或者当前的 Processing Time。
● 第二点,它可以获取当前算子的时间,可以把它理解成当前的 watermark。
● 第三点,为了在 ProcessFunction 中去实现一些相对复杂的功能,允许注
册一些 timer(定时器)。比如说在 watermark 达到某一个时间点的时候就触26 > Apache Flink 进阶(二):时间属性深度解析
发定时器,所有的这些回调逻辑也都是由用户来提供,涉及到如下三个方法,registerEventTimeTimer、registerProcessingTimeTimer 和 onTimer。
在 onTimer 方法中就需要去实现自己的回调逻辑,当条件满足时回调逻辑就
会被触发。
一个简单的应用是,我们在做一些时间相关的处理的时候,可能需要缓存一部
分数据,但这些数据不能一直去缓存下去,所以需要有一些过期的机制,我们可以通
过 timer 去设定这么一个时间,指定某一些数据可能在将来的某一个时间点过期,从
而把它从状态里删除掉。所有的这些和时间相关的逻辑在 Flink 内部都是由自己的
Time Service(时间服务)完成的。
3.4 Watermark 处理
一个算子的实例在收到 watermark 的时候,首先要更新当前的算子时间,这样
的话在 ProcessFunction 里方法查询这个算子时间的时候,就能获取到最新的时间。
第二步它会遍历计时器队列,这个计时器队列就是我们刚刚说到的 timer,你可以同
时注册很多 timer,Flink 会把这些 Timer 按照触发时间放到一个优先队列中。第三
步 Flink 得到一个时间之后就会遍历计时器的队列,然后逐一触发用户的回调逻辑。
通过这种方式,Flink 的某一个任务就会将当前的 watermark 发送到下游的其他任务
实例上,从而完成整个 watermark 的传播,从而形成一个闭环。Apache Flink 进阶(二):时间属性深度解析 < 27
4. Table API 中的时间
下面我们来看一看 TableSQL API 中的时间。为了让时间参与到 TableSQL
这一层的运算中,我们需要提前把时间属性放到表的 schema 中,这样的话我们才
能够在 SQL 语句或者 Table 的逻辑表达式里面使用时间去完成需求。
4.1 Table 中指定时间列
其实之前社区就怎么在 TableSQL 中去使用时间这个问题做过一定的讨论,是
把获取当前 Processing Time 的方法是作为一个特殊的 UDF,还是把这一个列物
化到整个的 schema 里面,最终采用了后者。我们这里就分开来讲一讲 Processing
Time 和 Event Time 在使用的时候怎么在 Table 中指定。
对于 Processing Time,我们知道要得到一个 Table 对象(或者注册一个
Table)有两种手段:
● 可以从一个 DataStream 转化成一个 Table;
● 直接通过 TableSource 去生成这么一个 Table;
对于第一种方法而言,我们只需要在你已有的这些列中(例子中 f1 和 f2 就是两
个已有的列),在最后用“列名.proctime”这种写法就可以把最后的这一列注册为一
个 Processing Time,以后在写查询的时候就可以去直接使用这一列。如果 Table
是通过 TableSource 生成的,就可以通过实现这一个 DefinedRowtimeAttributes 28 > Apache Flink 进阶(二):时间属性深度解析
接口,然后就会自动根据你提供的逻辑去生成对应的 Processing Time。
相对而言,在使用 Event Time 时则有一个限制,因为 Event Time 不像
Processing Time 那样是随拿随用。如果要从 DataStream 去转化得到一个 Table,必须要提前保证原始的 DataStream 里面已经存在了 Record Timestamp 和
watermark。如果想通过 TableSource 生成的,也一定要保证要接入的数据里面存
在一个类型为 long 或者 timestamp 的时间字段。
具体来说,如果要从 DataStream 去注册一个表,和 proctime 类似,只需要加
上“列名 .rowtime”就可以。需要注意的是,如果要用 Processing Time,必须保
证要新加的字段是整个 schema 中的最后一个字段,而 Event Time 的时候其实可
以去替换某一个已有的列,然后 Flink 会自动的把这一列转化成需要的 rowtime 这个
类型。
如果是通过 TableSource 生成的,只需要实现 DefinedRowtimeAttributes
接口就可以了。需要说明的一点是,在 DataStream API 这一侧其实不支持同时
存在多个 Event Time(rowtime),但是在 Table 这一层理论上可以同时存在多个
rowtime。因为 DefinedRowtimeAttributes 接口的返回值是一个对于 rowtime 描述
的 List,即其实可以同时存在多个 rowtime 列,在将来可能会进行一些其他的改进,或者基于去做一些相应的优化。
4.2 时间列和 Table 操作Apache Flink 进阶(二):时间属性深度解析 < 29
指定完了时间列之后,当我们要真正去查询时就会涉及到一些具体的操作。这里
我列举的这些操作都是和时间列紧密相关,或者说必须在这个时间列上才能进行的。
比如说“Over 窗口聚合”和“Group by 窗口聚合”这两种窗口聚合,在写 SQL 提
供参数的时候只能允许你在这个时间列上进行这种聚合。第三个就是时间窗口聚合,你在写条件的时候只支持对应的时间列。最后就是排序,我们知道在一个无尽的数据
流上对数据做排序几乎是不可能的事情,但因为这个数据本身到来的顺序已经是按照
时间属性来进行排序,所以说如果要对一个 DataStream 转化成 Table 进行排序
的话,只能是按照时间列进行排序,当然同时也可以指定一些其他的列,但是时间列
这个是必须的,并且必须放在第一位。
为什么说这些操作只能在时间列上进行?
因为我们有的时候可以把到来的数据流就看成是一张按照时间排列好的一张表,而我们任何对于表的操作,其实都是必须在对它进行一次顺序扫描的前提下完成的。
大家都知道数据流的特性之一就是一过性,某一条数据处理过去之后,将来其实不太
好去访问它。当然因为 Flink 中内部提供了一些状态机制,我们可以在一定程度上去
弱化这个特性,但是最终还是不能超越的,限制状态不能太大。所有这些操作为什么
只能在时间列上进行,因为这个时间列能够保证我们内部产生的状态不会无限的增长
下去,这是一个最终的前提。Apache Flink 进阶(三):
Checkpoint 原理剖析与应用实践
作者:唐云 ( 茶干 )
阿里巴巴高级研发工程师
大家好,今天我将跟大家分享一下 Flink 里面的 Checkpoint,共分为四个部分。
首先讲一下 Checkpoint 与 state 的关系,然后介绍什么是 state,第三部分介绍如
何在 Flink 中使用 state,第四部分则介绍 Checkpoint 的执行机制。
Checkpoint 与 state 的关系
Checkpoint 是从 source 触发到下游所有节点完成的一次全局操作。下图
可以有一个对 Checkpoint 的直观感受,红框里面可以看到一共触发了 569K 次
Checkpoint,然后全部都成功完成,没有 fail 的。
state 其实就是 Checkpoint 所做的主要持久化备份的主要数据,看下图的具
体数据统计,其 state 也就 9kb 大小 。Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践 < 31
什么是 state
我们接下来看什么是 state。先看一个非常经典的 word count 代码,这段代码
会去监控本地的 9000 端口的数据并对网络端口输入进行词频统计,我们本地行动
netcat,然后在终端输入 hello world,执行程序会输出什么?
答案很明显,(hello, 1) 和 (word,1)
那么问题来了,如果再次在终端输入 hello world,程序会输入什么?
答案其实也很明显,(hello, 2) 和 (world, 2)。为什么 Flink 知道之前已
经处理过一次 hello world,这就是 state 发挥作用了,这里是被称为 keyed state
存储了之前需要统计的数据,所以帮助 Flink 知道 hello 和 world 分别出现过一次。
回顾一下刚才这段 word count 代码。keyby 接口的调用会创建 keyed stream
对 key 进行划分,这是使用 keyed state 的前提。在此之后,sum 方法会调用内置32 > Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践
的 StreamGroupedReduce 实现。
什么是 keyed state
对于 keyed state,有两个特点:
● 只能应用于 KeyedStream 的函数与操作中,例如 Keyed UDF, window
state
● keyed state 是已经分区 划分好的,每一个 key 只能属于某一个 keyed
state
对于如何理解已经分区的概念,我们需要看一下 keyby 的语义,大家可以看到
下图左边有三个并发,右边也是三个并发,左边的词进来之后,通过 keyby 会进行
相应的分发。例如对于 hello word,hello 这个词通过 hash 运算永远只会到右下方
并发的 task 上面去。Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践 < 33
什么是 operator state
● 又称为 non-keyed state,每一个 operator state 都仅与一个 operator 的实
例绑定。
● 常见的 operator state 是 source state,例如记录当前 source 的 offset
再看一段使用 operator state 的 word count 代码:
这里的fromElements 会调用FromElementsFunction的类,其中就使用了34 > Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践
类型为 list state 的 operator state。根据 state 类型做一个分类如下图:
除了从这种分类的角度,还有一种分类的角度是从 Flink 是否直接接管:
● Managed State:由 Flink 管理的 state,刚才举例的所有 state 均是
managed state
● Raw State:Flink 仅提供 stream 可以进行存储数据,对 Flink 而言 raw
state 只是一些 bytes
在实际生产中,都只推荐使用 managed state,本文将围绕该话题进行讨论。
如何在 Flink 中使用 state
下图就前文 word count 的 sum 所使用的 StreamGroupedReduce 类为例讲
解了如何在代码中使用 keyed state:Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践 < 35
下图则对 word count 示例中的 FromElementsFunction 类进行详解并分享
如何在代码中使用 operator state:36 > Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践
Checkpoint 的执行机制
在介绍 Checkpoint 的执行机制前,我们需要了解一下 state 的存储,因为
state 是 Checkpoint 进行持久化备份的主要角色。
Statebackend 的分类
下图阐释了目前 Flink 内置的三类 state backend,其中MemoryStateBackend
和 FsStateBackend 在运行时都是存储在 java heap 中的,只有在执行 Check-
point 时,FsStateBackend 才会将数据以文件格式持久化到远程存储上。而
RocksDBStateBackend则借用了 RocksDB(内存磁盘混合的 LSM DB)对 state
进行存储。
对于 HeapKeyedStateBackend,有两种实现:
● 支持异步 Checkpoint(默认):存储格式 CopyOnWriteStateMap
● 仅支持同步 Checkpoint:存储格式 NestedStateMap
特别在 MemoryStateBackend 内使用HeapKeyedStateBackend时,Check-
point 序列化数据阶段默认有最大 5 MB 数据的限制Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践 < 37
对于 RocksDBKeyedStateBackend,每个 state 都存储在一个单独的 column
family 内,其中 keyGroup,Key 和 Namespace 进行序列化存储在 DB 作为 key。
Checkpoint 执行机制详解
本小节将对 Checkpoint 的执行流程逐步拆解进行讲解,下图左侧是 Check-
point Coordinator,是整个 Checkpoint 的发起者,中间是由两个 source,一个
sink 组成的 Flink 作业,最右侧的是持久化存储,在大部分用户场景中对应 HDFS。
1. 第一步,Checkpoint Coordinator 向所有 source 节点 trigger Check-
point;。
2. 第二步,source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamp-
ort 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才
会执行相应的 Checkpoint。38 > Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践
3. 第三步,当 task 完成 state 备份后,会将备份数据的地址(state handle)
通知给 Checkpoint coordinator。
4. 第四步,下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行本
地快照,这里特地展示了 RocksDB incremental Checkpoint 的流程,首
先 RocksDB 会全量刷数据到磁盘上(红色大三角表示),然后 Flink 框架会
从中选择没有上传的文件进行持久化备份(紫色小三角)。Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践 < 39
5. 同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返
回通知 Coordinator。
6. 最后,当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认
为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Check-
point meta 文件。
Checkpoint 的 EXACTLY_ONCE 语义
为了实现 EXACTLY ONCE 语义,Flink 通过一个 input buffer 将在对齐阶段
收到的数据缓存起来,等对齐完成之后再进行处理。而对于 AT LEAST ONCE 语40 > Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践
义,无需缓存收集到的数据,会对后续直接处理,所以导致 restore 时,数据可能会
被多次处理。下图是官网文档里面就 Checkpoint align 的示意图:
需要特别注意的是,Flink 的 Checkpoint 机制只能保证 Flink 的计算过程可以
做到 EXACTLY ONCE,端到端的 EXACTLY ONCE 需要 source 和 sink 支持。
Savepoint 与 Checkpoint 的区别
作业恢复时,二者均可以使用,主要区别如下:
Savepoint Externalized Checkpoint
用户通过命令触发,由用户管理其创建与删除 Checkpoint 完成时,在用户给定的外部持久化
存储保存
标准化格式存储,允许作业升级或者配置变更 当作业 FAILED(或者 CANCELED)时,外部
存储的 Checkpoint 会保留下来
用户在恢复时需要提供用于恢复作业状态的
savepoint 路径
用户在恢复时需要提供用于恢复的作业状态的
Checkpoint 路径Apache Flink 进阶(四):
Flink on YarnK8s 原理剖析及实践
作者:周凯波(宝牛)
阿里巴巴技术专家
本文根据 Apache Flink 进阶篇系列直播课程整理而成,由阿里巴巴技术专家
周凯波(宝牛)分享,主要介绍 Flink on Yarn K8s 的原理及应用实践,文章将从
Flink 架构、Flink on Yarn 原理及实践、Flink on Kubernetes 原理剖析三部分内容
进行分享并对 Flink on YarnKubernetes 中存在的部分问题进行了解答。
Flink 架构概览
Flink 架构概览 -Job
用户通过 DataStream API、DataSet API、SQL 和 Table API 编写 Flink 任
务,它会生成一个 JobGraph。JobGraph 是由 source、map、keyBywindow
apply 和 Sink 等算子组成的。当 JobGraph 提交给 Flink 集群后,能够以 Local、Standalone、Yarn 和 Kubernetes 四种模式运行。42 > Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践
Flink 架构概览 -JobManager
JobManager 的功能主要有:
● 将 JobGraph 转换成 Execution Graph,最终将 Execution Graph 拿来运
行;
● Scheduler 组件负责 Task 的调度;
● Checkpoint Coordinator 组件负责协调整个任务的 Checkpoint,包括
Checkpoint 的开始和完成;
● 通过 Actor System 与 TaskManager 进行通信;
● 其它的一些功能,例如 Recovery Metadata,用于进行故障恢复时,可以从
Metadata 里面读取数据。Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 < 43
Flink 架构概览 -TaskManager
TaskManager 是负责具体任务的执行过程,在 JobManager 申请到资源之后
开始启动。TaskManager 里面的主要组件有:
● Memory IO Manager,即内存 IO 的管理;
● Network Manager,用来对网络方面进行管理;
● Actor system,用来负责网络的通信;
TaskManager 被分成很多个 TaskSlot,每个任务都要运行在一个 TaskSlot
里面,TaskSlot 是调度资源里的最小单位。44 > Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践
在介绍 Yarn 之前先简单的介绍一下 Flink Standalone 模式,这样有助于更好
地了解 Yarn 和 Kubernetes 架构。
● 在 Standalone 模式下,Master 和 TaskManager 可以运行在同一台机器
上,也可以运行在不同的机器上。
● 在 Master 进程中,Standalone ResourceManager 的作用是对资源进行
管理。当用户通过 Flink Cluster Client 将 JobGraph 提交给 Master 时,JobGraph 先经过 Dispatcher。
● 当 Dispatcher 收到客户端的请求之后,生成一个 JobManager。接着
JobManager 进程向 Standalone ResourceManager 申请资源,最终再启
动 TaskManager。
● TaskManager 启动之后,会有一个注册的过程,注册之后 JobManager 再
将具体的 Task 任务分发给这个 TaskManager 去执行。
以上就是一个 Standalone 任务的运行过程。Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 < 45
Flink 运行时相关组件
接下来总结一下 Flink 的基本架构和它在运行时的一些组件,具体如下:
● Client:用户通过 SQL 或者 API 的方式进行任务的提交,提交后会生成一个
JobGraph。
● JobManager:JobManager 接受到用户的请求之后,会对任务进行调度,并且申请资源启动 TaskManager。
● TaskManager: 它 负 责 一 个 具 体 Task 的 执 行。TaskManager 向
JobManager 进行注册,当 TaskManager 接收到 JobManager 分配的任务
之后,开始执行具体的任务。
Flink on Yarn 原理及实践
Yarn 架构原理 -总览
Yarn 模式在国内使用比较广泛,基本上大多数公司在生产环境中都使用过 Yarn
模式。首先介绍一下 Yarn 的架构原理,因为只有足够了解 Yarn 的架构原理,才能
更好的知道 Flink 是如何在 Yarn 上运行的。46 > Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践
Yarn 的架构原理如上图所示,最重要的角色是 ResourceManager,主要用来
负责整个资源的管理,Client 端是负责向 ResourceManager 提交任务。
用户在 Client 端提交任务后会先给到 Resource Manager。Resource Man-
ager 会启动 Container,接着进一步启动 Application Master,即对 Master 节点
的启动。当 Master 节点启动之后,会向 Resource Manager 再重新申请资源,当
Resource Manager 将资源分配给 Application Master 之后,Application Master
再将具体的 Task 调度起来去执行。
Yarn 架构原理 -组件
Yarn 集群中的组件包括:
● ResourceManager (RM):ResourceManager (RM) 负责处理客户端请
求、启动 监控 ApplicationMaster、监控 NodeManager、资源的分配与调
度,包含 Scheduler 和 Applications Manager。
● ApplicationMaster (AM):ApplicationMaster (AM) 运行在 Slave 上,负
责数据切分、申请资源和分配、任务监控和容错。
● NodeManager (NM):NodeManager (NM) 运行在 Slave 上,用于单节
点资源管理、AMRM 通信以及汇报状态。
● Container:Container 负责对资源进行抽象,包括内存、CPU、磁盘,网络
等资源。Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 < 47
Yarn 架构原理 -交互
以在 Yarn 上运行 MapReduce 任务为例来讲解下 Yarn 架构的交互原理:
● 首先,用户编写 MapReduce 代码后,通过 Client 端进行任务提交。
● ResourceManager 在接收到客户端的请求后,会分配一个 Container 用来
启动 ApplicationMaster,并通知 NodeManager 在这个 Container 下启动
ApplicationMaster。
● ApplicationMaster 启动后,向 ResourceManager 发起注册请求。接着
ApplicationMaster 向 ResourceManager 申请资源。根据获取到的资源,和相关的 NodeManager 通信,要求其启动程序。
● 一个或者多个 NodeManager 启动 MapReduce Task。
● NodeManager 不断汇报 MapReduce Task 状态和进展给 Application-
Master。
● 当所有 MapReduce Task 都完成时,ApplicationMaster 向 ResourceM-
anager 汇报任务完成,并注销自己。48 > Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践
Flink on Yarn-Per Job
Flink on Yarn 中的 Per Job 模式是指每次提交一个任务,然后任务运行完成之
后资源就会被释放。在了解了 Yarn 的原理之后,Per Job 的流程也就比较容易理解
了,具体如下:
● 首先 Client 提交 Yarn App,比如 JobGraph 或者 JARs。
● 接 下 来 Yarn 的 ResourceManager 会 申 请 第 一 个 Container。 这 个
Container 通过 Application Master 启动进程,Application Master 里面运
行的是 Flink 程序,即 Flink-Yarn ResourceManager 和 JobManager。
● 最后 Flink-Yarn ResourceManager 向 Yarn ResourceManager 申请资
源。当分配到资源后,启动 TaskManager。TaskManager 启动后向 Flink-
Yarn ResourceManager 进行注册,注册成功后 JobManager 就会分配具
体的任务给 TaskManager 开始执行。Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 < 49
Flink on Yarn-Session
在 Per Job 模式中,执行完任务后整个资源就会释放,包括 JobManager、TaskManager 都全部退出。而 Session 模式则不一样,它的 Dispatcher 和 Resource-
Manager 是可以复用的。Session 模式下,当 Dispatcher 在收到请求之后,会
启动 JobManager(A),让 JobManager(A) 来完成启动 TaskManager,接着会
启动 JobManager(B) 和对应的 TaskManager 的运行。当 A、B 任务运行完成
后,资源并不会释放。Session 模式也称为多线程模式,其特点是资源会一直存在
不会释放,多个 JobManager 共享一个 Dispatcher,而且还共享 Flink-YARN
ResourceManager。
Session 模式和 Per Job 模式的应用场景不一样。Per Job 模式比较适合那种
对启动时间不敏感,运行时间较长的任务。Seesion 模式适合短时间运行的任务,一
般是批处理任务。若用 Per Job 模式去运行短时间的任务,那就需要频繁的申请资
源,运行结束后,还需要资源释放,下次还需再重新申请资源才能运行。显然,这种
任务会频繁启停的情况不适用于 Per Job 模式,更适合用 Session 模式。
Yarn 模式特点
Yarn 模式的优点有:50 > Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践
● 资源的统一管理和调度。Yarn 集群中所有节点的资源(内存、CPU、磁
盘、网络等)被抽象为 Container。计算框架需要资源进行运算任务时需要
向 Resource Manager 申请 Container,Yarn 按照特定的策略对资源进行
调度和进行 Container 的分配。Yarn 模式能通过多种任务调度策略来利用
提高集群资源利用率。例如 FIFO Scheduler、Capacity Scheduler、Fair
Scheduler,并能设置任务优先级。
● 资源隔离。Yarn 使用了轻量级资源隔离机制 Cgroups 进行资源隔离以避免相
互干扰,一旦 Container 使用的资源量超过事先定义的上限值,就将其杀死。
● 自动 failover 处理。例如 Yarn NodeManager 监控、Yarn Application-
Manager 异常恢复。
Yarn 模式虽然有不少优点,但是也有诸多缺点,例如运维部署成本较高,灵活
性不够。
Flink on Yarn 实践
关于 Flink on Yarn 的实践在社区官网上面有很多课程,例如:《Flink 安装部
署、环境配置及运行应用程序》 和 《客户端操作》都是基于 Yarn 进行讲解的,这里
就不再赘述。
社区官网:
https:ververica.cndevelopersflink-training-course1
Flink on Kubernetes 原理剖析
Kubernetes 是 Google 开源的容器集群管理系统,其提供应用部署、维
护、扩展机制等功能,利用 Kubernetes 能方便地管理跨机器运行容器化的应用。
Kubernetes 和 Yarn 相比,相当于下一代的资源管理系统,但是它的能力远远不止
这些。Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 < 51
Kubernetes- 基本概念
Kubernetes(k8s)中的 Master 节点,负责管理整个集群,含有一个集群的
资源数据访问入口,还包含一个 Etcd 高可用键值存储服务。Master 中运行着 API
Server,Controller Manager 及 Scheduler 服务。
Node 为集群的一个操作单元,是 Pod 运行的宿主机。Node 节点里包含一个
agent 进程,能够维护和管理该 Node 上的所有容器的创建、启停等。Node 还含
有一个服务端 kube-proxy,用于服务发现、反向代理和负载均衡。Node 底层含有
docker engine,docker 引擎主要负责本机容器的创建和管理工作。
Pod 运行于 Node 节点上,是若干相关容器的组合。在 K8s 里面 Pod 是创建、调度和管理的最小单位。
Kubernetes- 架构图52 > Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践
Kubernetes 的架构如图所示,从这个图里面能看出 Kubernetes 的整个运行
过程。
● API Server 相当于用户的一个请求入口,用户可以提交命令给 Etcd,这时会
将这些请求存储到 Etcd 里面去。
● Etcd 是一个键值存储,负责将任务分配给具体的机器,在每个节点上的
Kubelet 会找到对应的 container 在本机上运行。
● 用户可以提交一个 Replication Controller 资源描述,Replication Controller
会监视集群中的容器并保持数量;用户也可以提交 service 描述文件,并由
kube proxy 负责具体工作的流量转发。
Kubernetes- 核心概念
Kubernetes 中比较重要的概念有:
● Replication Controller (RC) 用来管理 Pod 的副本。RC 确保任何时候
Kubernetes 集群中有指定数量的 pod 副本 (replicas) 在运行, 如果少于指定
数量的 pod 副本,RC 会启动新的 Container,反之会杀死多余的以保证数量
不变。
● Service 提供了一个统一的服务访问入口以及服务代理和发现机制
● Persistent Volume(PV) 和 Persistent Volume Claim(PVC) 用于数据的持
久化存储。
● ConfigMap 是指存储用户程序的配置文件,其后端存储是基于 Etcd。Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 < 53
Flink on Kubernetes- 架构
Flink on Kubernetes 的架构如图所示,Flink 任务在 Kubernetes 上运行的步
骤有:
● 首先往 Kubernetes 集群提交了资源描述文件后,会启动 Master 和 Worker
的 container。
● Master Container 中会启动 Flink Master Process,包含 Flink-Container
ResourceManager、JobManager 和 Program Runner。
● Worker Container 会启动 TaskManager,并向负责资源管理的 Re-
sourceManager 进行注册,注册完成之后,由 JobManager 将具体的任务
分给 Container,再由 Container 去执行。
● 需要说明的是,在 Flink 里的 Master 和 Worker 都是一个镜像,只是脚本的54 > Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践
命令不一样,通过参数来选择启动 master 还是启动 Worker。
Flink on Kubernetes-JobManager
JobManager 的执行过程分为两步 :
● 首先,JobManager 通过 Deployment 进行描述,保证 1 个副本的 Con-
tainer 运行 JobManager,可以定义一个标签,例如 flink-jobmanager。
● 其次,还需要定义一个 JobManager Service,通过 service name 和 port
暴露 JobManager 服务,通过标签选择对应的 pods。
Flink on Kubernetes-TaskManager
TaskManager 也是通过 Deployment 来进行描述,保证 n 个副本的 Contain-
er 运行 TaskManager,同时也需要定义一个标签,例如 flink-taskmanager。
对于 JobManager 和 TaskManager 运行过程中需要的一些配置文件,如:
flink-conf.yaml、hdfs-site.xml、core-site.xml,可以通过将它们定义为 Config-
Map 来实现配置的传递和读取。
Flink on Kubernetes- 交互
整个交互的流程比较简单,用户往 Kubernetes 集群提交定义好的资源描述
文件即可,例如 deployment、configmap、service 等描述。后续的事情就交给
Kubernetes 集群自动完成。Kubernetes 集群会按照定义好的描述来启动 pod,运Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 < 55
行用户程序。各个组件的具体工作如下:
● Service: 通过标签(label selector) 找到 job manager 的 pod 暴露服务。
● Deployment:保证 n 个副本的 container 运行 JMTM,应用升级策略。
● ConfigMap:在每个 pod 上通过挂载 etcflink 目录,包含 flink-conf.yaml
内容。
Flink on Kubernetes- 实践
接下来就讲一下 Flink on Kubernetes 的实践篇,即 K8s 上是怎么运行任务的。
● Session Cluster
Session Cluster
启动
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml
Submit job
kubectl port-forward serviceflink-jobmanager 8081:8081
binflink run -d -m localhost:8081 .examplesstreaming
TopSpeedWindowing.jar
停止
kubectl delete -f jobmanager-deployment.yaml
kubectl delete -f taskmanager-deployment.yaml
kubectl delete -f jobmanager-service.yaml
首先启动 Session Cluster,执行上述三条启动命令就可以将 Flink 的 Job-
Manager-service、jobmanager-deployment、taskmanager-deployment 启动
起来。启动完成之后用户可以通过接口进行访问,然后通过端口进行提交任务。若想
销毁集群,直接用 kubectl delete 即可,整个资源就可以销毁。56 > Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践
Flink 官方提供的例子如图所示,图中左侧为 jobmanager-deployment.yaml
配置,右侧为 taskmanager-deployment.yaml 配置。
在 jobmanager-deployment.yaml 配置中,代码的第一行为 apiVersion,apiVersion 是 API 的一个版本号,版本号用的是 extensionsvlbetal 版本。资源
类型为 Deployment,元数据 metadata 的名为 flink-jobmanager,spec 中含
有副本数为 1 的 replicas,labels 标签用于 pod 的选取。containers 的镜像名
为 jobmanager,containers 包含从公共 docker 仓库下载的 image,当然也可
以使用公司内部的私有仓库。args 启动参数用于决定启动的是 jobmanager 还是
taskmanager;ports 是服务端口,常见的服务端口为 8081 端口;env 是定义的环
境变量,会传递给具体的启动脚本。
右图为 taskmanager-deployment.yaml 配置,taskmanager-deployment.
yaml 配置与 jobmanager-deployment.yaml 相似,但 taskmanager-deploy-
ment.yaml 的副本数是 2 个。Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 < 57
接下来是 jobmanager-service.yaml 的配置,jobmanager-service.yaml 的
资源类型为 Service,在 Service 中的配置相对少一些,spec 中配置需要暴露的服
务端口的 port,在 selector 中,通过标签选取 jobmanager 的 pod。
● Job Cluster
除了 Session 模式,还有一种 Per Job 模式。在 Per Job 模式下,需要将用户
代码都打到镜像里面,这样如果业务逻辑的变动涉及到 Jar 包的修改,都需要重新生
成镜像,整个过程比较繁琐,因此在生产环境中使用的比较少。
以使用公用 docker 仓库为例,Job Cluster 的运行步骤如下:
● build 镜像:在 flinkflink-containerdocker 目录下执行 build.sh 脚本,指定从哪个版本开始去构建镜像,成功后会输出 “Successfully tagged 58 > Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践
topspeed:latest” 的提示。
sh build.sh --from-release --flink-version 1.7.0 --hadoop-version 2.8--scala-version 2.11 --job-jar ~
flinkflink-1.7.1examplesstreamingTopSpeedWindowing.jar --image-name
topspeed
● 上传镜像:在 hub.docker.com 上需要注册账号和创建仓库进行上传镜像。
docker tag topspeed zkb555topspeedwindowing
docker push zkb555topspeedwindowing
● 启动任务:在镜像上传之后,可以启动任务。
kubectl create -f job-cluster-service.yaml
FLINK_IMAGE_NAME=zkb555topspeedwindowing:latest FLINK_JOB=org.apache.flink.
streaming.
examples.windowing.TopSpeedWindowing FLINK_JOB_PARALLELISM=3 envsubst < job-
cluster-job.
yaml.template | kubectl create -f -
FLINK_IMAGE_NAME=zkb555topspeedwindowing:latest FLINK_JOB_PARALLELISM=4
envsubst <
task-manager-deployment.yaml.template | kubectl create -f -
Flink on YarnKubernetes 问题解答
Q: Flink 在 K8s 上可以通过 Operator 方式提交任务吗?
目前 Flink 官方还没有提供 Operator 的方式,Lyft 公司开源了自己的 Opera-
tor 实现:https:github.comlyftflinkk8soperator。
Q: 在 K8s 集群上如果不使用 Zookeeper 有没有其他高可用(HA)的方案?
Etcd 是一个类似于 Zookeeper 的高可用键值服务,目前 Flink 社区正在
考 虑 基 于 Etcd 实 现 高 可 用 的 方 案(https:issues.apache.orgjirabrowse
FLINK-11105)以及直接依赖 K8s API 的方案(https:issues.apache.orgjira
browseFLINK-12884)。Apache Flink 进阶(四):Flink on YarnK8s 原理剖析及实践 < 59
Q: Flink on K8s 在任务启动时需要指定 TaskManager 的个数,有和 Yarn
一样的动态资源申请方式吗?
Flink on K8s 目前的实现在任务启动前就需要确定好 TaskManager 的个数,这样容易造成 TM 指定太少,任务无法启动,或者指定的太多,造成资源浪费。社区
正在考虑实现和 Yarn 一样的任务启动时动态资源申请的方式。这是一种和 K8s 结
合的更为 Nativey 的方式,称为 Active 模式。Active 意味着 ResourceManager
可以直接向 K8s 集群申请资源。具体设计方案和进展请关注:
https:issues.apache.orgjirabrowseFLINK-9953。Apache Flink 进阶(五):数据类型和序列化
作者:马庆祥
奇虎 360 数据开发高级工程师
本文根据 Apache Flink 系列直播整理而成,由 Apache Flink Contributor、奇
虎 360 数据开发高级工程师马庆祥老师分享。文章主要从如何为 Flink 量身定制序列
化框架、Flink 序列化的最佳实践、Flink 通信层的序列化以及问答环节四部分分享。
为 Flink 量身定制的序列化框架
为什么要为 Flink 量身定制序列化框架?
大家都知道现在大数据生态非常火,大多数技术组件都是运行在 JVM 上的,Flink 也是运行在 JVM 上,基于 JVM 的数据分析引擎都需要将大量的数据存储在内
存中,这就不得不面临 JVM 的一些问题,比如 Java 对象存储密度较低等。针对这
些问题,最常用的方法就是实现一个显式的内存管理,也就是说用自定义的内存池来
进行内存的分配回收,接着将序列化后的对象存储到内存块中。
现在 Java 生态圈中已经有许多序列化框架,比如说 Java serialization, Kryo,Apache Avro 等等。但是 Flink 依然是选择了自己定制的序列化框架,那么到底有
什么意义呢?若 Flink 选择自己定制的序列化框架,对类型信息了解越多,可以在早
期完成类型检查,更好的选取序列化方式,进行数据布局,节省数据的存储空间,直
接操作二进制数据。 Apache Flink 进阶(五):数据类型和序列化 < 61
Flink 的数据类型
Flink 在其内部构建了一套自己的类型系统,Flink 现阶段支持的类型分类如图
所示,从图中可以看到 Flink 类型可以分为基础类型(Basic)、数组(Arrays)、复合
类型(Composite)、辅助类型(Auxiliary)、泛型和其它类型(Generic)。Flink 支
持任意的 Java 或是 Scala 类型。不需要像 Hadoop 一样去实现一个特定的接口
(org.apache.hadoop.io.Writable),Flink 能够自动识别数据类型。62 > Apache Flink 进阶(五):数据类型和序列化
那这么多的数据类型,在 Flink 内部又是如何表示的呢?图示中的 Person 类,复合类型的一个 Pojo 在 Flink 中是用 PojoTypeInfo 来表示,它继承至 TypeInfor-
mation,也即在 Flink 中用 TypeInformation 作为类型描述符来表示每一种要表示的
数据类型。
TypeInformation
TypeInformation 的思维导图如图所示,从图中可以看出,在 Flink 中每一个具
体的类型都对应了一个具体的 TypeInformation 实现类,例如 BasicTypeInforma-
tion 中的 IntegerTypeInformation 和 FractionalTypeInformation 都具体的对应了
一个 TypeInformation。然后还有 BasicArrayTypeInformation、CompositeType
以及一些其它类型,也都具体对应了一个 TypeInformation。
TypeInformation 是 Flink 类型系统的核心类。对于用户自定义的 Function
来说,Flink 需要一个类型信息来作为该函数的输入输出类型,即 TypeInfomation。
该类型信息类作为一个工具来生成对应类型的序列化器 TypeSerializer,并用于执行Apache Flink 进阶(五):数据类型和序列化 < 63
语义检查,比如当一些字段在作为 joing 或 grouping 的键时,检查这些字段是否在
该类型中存在。
如何使用 TypeInformation ?下面的实践中会为大家介绍。
Flink 的序列化过程
在 Flink 序列化过程中,进行序列化操作必须要有序列化器,那么序列化器从何
而来?
每一个具体的数据类型都对应一个 TypeInformation 的具体实现,每一个
TypeInformation 都会为对应的具体数据类型提供一个专属的序列化器。通过 Flink
的序列化过程图可以看到 TypeInformation 会提供一个 createSerialize 方法,通
过这个方法就可以得到该类型进行数据序列化操作与反序化操作的对象 TypeSerial-
izer。64 > Apache Flink 进阶(五):数据类型和序列化
对于大多数数据类型 Flink 可以自动生成对应的序列化器,能非常高效地对数
据集进行序列化和反序列化,比如,BasicTypeInfo、WritableTypeIno 等,但针对
GenericTypeInfo 类型,Flink 会使用 Kyro 进行序列化和反序列化。其中,Tuple、Pojo 和 CaseClass 类型是复合类型,它们可能嵌套一个或者多个数据类型。在这
种情况下,它们的序列化器同样是复合的。它们会将内嵌类型的序列化委托给对应类
型的序列化器。
简单的介绍下 Pojo 的类型规则,即在满足一些条件的情况下,才会选用 Pojo
的序列化进行相应的序列化与反序列化的一个操作。即类必须是 Public 的,且类有
一个 public 的无参数构造函数,该类(以及所有超类)中的所有非静态 no-static、非瞬态 no-transient 字段都是 public 的(和非最终的 final)或者具有公共 getter 和
setter 方法,该方法遵循 getter 和 setter 的 Java bean 命名约定。当用户定义的数
据类型无法识别为 POJO 类型时,必须将其作为 GenericType 处理并使用 Kryo 进
行序列化。
Flink 自带了很多 TypeSerializer 子类,大多数情况下各种自定义类型都是常
用类型的排列组合,因而可以直接复用,如果内建的数据类型和序列化方式不能满足
你的需求,Flink 的类型信息系统也支持用户拓展。若用户有一些特殊的需求,只需
要实现 TypeInformation、TypeSerializer 和 TypeComparator 即可定制自己类型
的序列化和比较大小方式,来提升数据类型在序列化和比较时的性能。Apache Flink 进阶(五):数据类型和序列化 < 65
序列化就是将数据结构或者对象转换成一个二进制串的过程,在 Java 里面可以
简单地理解成一个 byte 数组。而反序列化恰恰相反,就是将序列化过程中所生成的
二进制串转换成数据结构或者对象的过程。下面就以内嵌型的 Tuple 3 这个对象为
例,简述一下它的序列化过程。
Tuple 3 包含三个层面,一是 int 类型,一是 double 类型,还有一个是
Person。Person 包含两个字段,一是 int 型的 ID,另一个是 String 类型的 name,它在序列化操作时,会委托相应具体序列化的序列化器进行相应的序列化操作。从图
中可以看到 Tuple 3 会把 int 类型通过 IntSerializer 进行序列化操作,此时 int 只需
要占用四个字节就可以了。根据 int 占用四个字节,这个能够体现出 Flink 可序列化
过程中的一个优势,即在知道数据类型的前提下,可以更好的进行相应的序列化与反
序列化操作。相反,如果采用 Java 的序列化,虽然能够存储更多的属性信息,但一
次占据的存储空间会受到一定的损耗。
Person 类会被当成一个 Pojo 对象来进行处理,PojoSerializer 序列化器会把
一些属性信息使用一个字节存储起来。同样,其字段则采取相对应的序列化器进行相
应序列化,在序列化完的结果中,可以看到所有的数据都是由 MemorySegment 去
支持。MemorySegment 具有什么作用呢?
MemorySegment 在 Flink 中会将对象序列化到预分配的内存块上,它代表 1
个固定长度的内存,默认大小为 32 kb。MemorySegment 代表 Flink 中的一个最
小的内存分配单元,相当于是 Java 的一个 byte 数组。 每条记录都会以序列化的形
式存储在一个或多个 MemorySegment 中。
Flink 序列化的最佳实践
最常见的场景
Flink 常见的应用场景有四种,即注册子类型、注册自定义序列化器、添加类型
提示、手动创建 TypeInformation,具体介绍如下:66 > Apache Flink 进阶(五):数据类型和序列化
● 注册子类型:如果函数签名只描述了超类型,但是它们实际上在执行期间使
用了超类型的子类型,那么让 Flink 了解这些子类型会大大提高性能。可
以 在 StreamExecutionEnvironment 或 ExecutionEnvironment 中 调 用
.registertype (clazz) 注册子类型信息。
● 注册自定义序列化:对于不适用于自己的序列化框架的数据类型,Flink 会使
用 Kryo 来进行序列化,并不是所有的类型都与 Kryo 无缝连接,具体注册方
法在下文介绍。
● 添加类型提示:有时,当 Flink 用尽各种手段都无法推测出泛型信息时,用户
需要传入一个类型提示 TypeHint,这个通常只在 Java API 中需要。
● 手动创建一个 TypeInformation:在某些 API 调用中,这可能是必需的,因
为 Java 的泛型类型擦除导致 Flink 无法推断数据类型。
其实在大多数情况下,用户不必担心序列化框架和注册类型,因为 Flink 已经提
供了大量的序列化操作,不需要去定义自己的一些序列化器,但是在一些特殊场景
下,需要去做一些相应的处理。
实践 -类型声明
类型声明去创建一个类型信息的对象是通过哪种方式?通常是用 TypeInforma-
tion.of 方法来创建一个类型信息的对象,具体说明如下:
● 对于非泛型类,直接传入 class 对象即可。
PojoTypeInfo
of(Person.class);
● 对于泛型类,需要通过 TypeHint 来保存泛型类型信息。
final TypeInfomation
of(new TypeHint
● 预定义常量。Apache Flink 进阶(五):数据类型和序列化 < 67
如 BasicTypeInfo,这个类定义了一系列常用类型的快捷方式,对于 String、Boolean、Byte、Short、Integer、Long、Float、Double、Char 等基本类型的
类型声明,可以直接使用。而且 Flink 还提供了完全等价的 Types 类(org.apache.
flink.api.common.typeinfo.Types)。特别需要注意的是,flink-table 模块也有一个
Types 类(org.apache.flink.table.api.Types),用于 table 模块内部的类型定义信
息,用法稍有不同。使用 IDE 的自动 import 时一定要小心。
● 自定义 TypeInfo 和 TypeInfoFactory。
通过自定义 TypeInfo 为任意类提供 Flink 原生内存管理(而非 Kryo),可令存
储更紧凑,运行时也更高效。需要注意在自定义类上使用 @TypeInfo 注解,随后创
建相应的 TypeInfoFactory 并覆盖 createTypeInfo 方法。
实践 -注册子类型
Flink 认识父类,但不一定认识子类的一些独特特性,因此需要单独注册子类型。
StreamExecutionEnvironment 和 ExecutionEnvironment 提供 registerType
方法用来向 Flink 注册子类信息。
final ExecutionEnvironment env = ExecutionEnvironment.
getExecutionEnvironment;Env. registerType(typeClass);
在 registerType 方法内部,会使用 TypeExtractor 来提取类型信息,如上图
所示,获取到的类型信息属于 PojoTypeInfo 及其子类,那么需要将其注册到一起,否则统一交给 Kryo 去处理,Flink 并不过问 ( 这种情况下性能会变差 )。68 > Apache Flink 进阶(五):数据类型和序列化
实践 -Kryo 序列化
对于 Flink 无法序列化的类型(例如用户自定义类型,没有 registerType,也没
有自定义 TypeInfo 和 TypeInfoFactory),默认会交给 Kryo 处理,如果 Kryo 仍然
无法处理(例如 Guava、Thrift、Protobuf 等第三方库的一些类),有两种解决方案:
● 强制使用 Avro 来代替 Kryo。
env.getConfig.enableForceAvro;
● 为 Kryo 增加自定义的 Serializer 以增强 Kryo 的功能。
env.getConfig.addDefaultKryoSerializer(clazz, serializer);
注:如果希望完全禁用 Kryo(100% 使用 Flink 的序列化机制),可以通过 Kryo-
env.getConfig.disableGenericTypes 的方式完成,但注意一切无法处理的类都
将导致异常,这种对于调试非常有效。
Flink 通信层的序列化
Flink 的 Task 之间如果需要跨网络传输数据记录, 那么就需要将数据序列化之
后写入 NetworkBufferPool,然后下层的 Task 读出之后再进行反序列化操作,最
后进行逻辑处理。
为了使得记录以及事件能够被写入 Buffer,随后在消费时再从 Buffer 中读出,Flink 提供了数据记录序列化器(RecordSerializer)与反序列化器(RecordDeseri-
alizer)以及事件序列化器(EventSerializer)。
Function 发送的数据被封装成 SerializationDelegate,它将任意元素公开为
IOReadableWritable 以进行序列化,通过 setInstance 来传入要序列化的数据。
在 Flink 通信层的序列化中,有几个问题值得关注,具体如下:Apache Flink 进阶(五):数据类型和序列化 < 69
● 何时确定 Function 的输入输出类型?
在构建 StreamTransformation 的时候通过 TypeExtractor 工具确定 Func-
tion 的输入输出类型。TypeExtractor 类可以根据方法签名、子类信息等蛛丝马迹自
动提取或恢复类型信息。
● 何时确定 Function 的序列化 反序列化器?
构造 StreamGraph 时,通过 TypeInfomation 的 createSerializer 方法
获取对应类型的序列化器 TypeSerializer,并在 addOperator 的过程中执行
setSerializers 操作,设置 StreamConfig 的 TYPESERIALIZERIN1 、 TYPE-
SERIALIZERIN2、 TYPESERIALIZEROUT_1 属性。
● 何时进行真正的序列化 反序列化操作?这个过程与 TypeSerializer 又是怎么
联系在一起的呢?
大家都应该清楚 Tsk 和 StreamTask 两个概念,Task 是直接受 TaskMan-
ager 管理和调度的,而 Task 又会调用 StreamTask,而 StreamTask 中真正封装
了算子的处理逻辑。在 run 方法中,首先将反序列化后的数据封装成 StreamRe-70 > Apache Flink 进阶(五):数据类型和序列化
cord 交给算子处理;然后将处理结果通过 Collector 发动给下游 ( 在构建 Collector
时已经确定了 SerializtionDelegate),并通过 RecordWriter 写入器将序列化后的结
果写入 DataOutput;最后序列化的操作交给 SerializerDelegate 处理,实际还是通
过 TypeSerializer 的 serialize 方法完成。Apache Flink 进阶(六):Flink 作业执行深度解析
作者:岳猛,网易云音乐实时计算平台研发工程师
整理:毛鹤
本文根据 Apache Flink 系列直播课程整理而成,由 Apache Flink Contribu-
tor、网易云音乐实时计算平台研发工程师岳猛分享。主要分享内容为 Flink Job 执行
作业的流程,文章将从两个方面进行分享:一是如何从 Program 到物理执行计划,二是生成物理执行计划后该如何调度和执行。
Flink 四层转化流程
Flink 有四层转换流程,第一层为 Program 到 StreamGraph;第二层为
StreamGraph 到 JobGraph;第三层为 JobGraph 到 ExecutionGraph;第四层为
ExecutionGraph 到物理执行计划。通过对 Program 的执行,能够生成一个 DAG
执行图,即逻辑执行图。如下:72 > Apache Flink 进阶(六):Flink 作业执行深度解析
第一部分将先讲解四层转化的流程,然后将以详细案例讲解四层的具体转化。
● 第一层 StreamGraph 从 Source 节点开始,每一次 transform 生成一个
StreamNode,两个 StreamNode 通过 StreamEdge 连接在一起 , 形成
StreamNode 和 StreamEdge 构成的 DAG。
● 第二层 JobGraph,依旧从 Source 节点开始,然后去遍历寻找能够嵌到一
起的 operator,如果能够嵌到一起则嵌到一起,不能嵌到一起的单独生成
jobVertex,通过 JobEdge 链接上下游 JobVertex,最终形成 JobVertex 层
面的 DAG。
● JobVertex DAG 提交到任务以后,从 Source 节点开始排序 , 根据 JobVer-
tex 生成 ExecutionJobVertex,根据 jobVertex 的 IntermediateDataSet
构建 IntermediateResult,然后 IntermediateResult 构建上下游的依赖关
系,形成 ExecutionJobVertex 层面的 DAG 即 ExecutionGraph。
● 最后通过 ExecutionGraph 层到物理执行层。
Program 到 StreamGraph 的转化
Program 转换成 StreamGraph 具体分为三步:
● 从 StreamExecutionEnvironment.execute 开始执行程序,将 transform 添
加到 StreamExecutionEnvironment 的 transformations。
● 调用 StreamGraphGenerator 的 generateInternal 方法,遍历 transfor-
mations 构建 StreamNode 及 StreamEage。
● 通过 StreamEdge 连接 StreamNode。Apache Flink 进阶(六):Flink 作业执行深度解析 < 73
通过 WindowWordCount 来看代码到 StreamGraph 的转化,在 flatMap trans-
form 设置 slot 共享组为 flatMapsg,并发设置为 4,在聚合的操作中设置 slot 共享
组为 sumsg, sum 和 counts 并发设置为 3,这样设置主要是为了演示后面如何
嵌到一起的,跟上下游节点的并发以及上游的共享组有关。
WindowWordCount 代码中可以看到,在 readTextFile 中会生成一个 transform,且 transform 的 ID 是 1;然后到 flatMap 会生成一个 transform, transform 的
ID 是 2;接着到 keyBy 生成一个 transform 的 ID 是 3;再到 sum 生成一个
transform 的 ID 是 4;最后到 counts 生成 transform 的 ID 是 5。
transform 的结构如图所示,第一个是 flatMap 的 transform,第二个是 window 74 > Apache Flink 进阶(六):Flink 作业执行深度解析
的 transform, 第 三 个 是 SinkTransform 的 transform。 除 此 之 外, 还 能 在
transform 的结构中看到每个 transform 的 input 是什么。
接下来介绍一下 StreamNode 和 StreamEdge。
● StreamNode 是用来描述 operator 的逻辑节点,其关键成员变量有
slotSharingGroup、jobVertexClass、inEdges、outEdges 以 及 trans-
formationUID;
● StreamEdge 是用来描述两个 operator 逻辑的链接边,其关键变量有
sourceVertex、targetVertex。
WindowWordCount transform 到 StreamGraph 转化如图所示,StreamEx-
ecutionEnvironment 的 transformations 存在 3 个 transform,分别是 Flat Map
(Id 2)、Window(Id 4)、Sink(Id 5)。
transform 的时候首先递归处理 transform 的 input,生成 StreamNode,然后
通过 StreamEdge 链接上下游 StreamNode。需要注意的是,有些 transform 操作
并不会生成 StreamNode 如 PartitionTransformtion,而是生成个虚拟节点。Apache Flink 进阶(六):Flink 作业执行深度解析 < 75
在转换完成后可以看到,streamNodes 有四种 transform 形式,分别为 Source、Flat Map、Window、Sink。
每个 streamNode 对象都携带并发个数、slotSharingGroup、执行类等运行
信息。76 > Apache Flink 进阶(六):Flink 作业执行深度解析
StreamGraph 到 JobGraph 的转化
StreamGraph 到 JobGraph 的转化步骤:
● 设置调度模式,Eager 所有节点立即启动。
● 广度优先遍历 StreamGraph,为每个 streamNode 生成 byte 数组类型的
hash 值。
● 从 source 节点开始递归寻找嵌到一起的 operator,不能嵌到一起的节点单独
生成 jobVertex,能够嵌到一起的开始节点生成 jobVertex,其他节点以序列
化的形式写入到 StreamConfig,然后 merge 到 CHAINEDTASKCONFIG,再通过 JobEdge 链接上下游 JobVertex。
● 将每个 JobVertex 的入边 (StreamEdge) 序列化到该 StreamConfig。
● 根据 group name 为每个 JobVertext 指定 SlotSharingGroup。
● 配置 checkpoint。Apache Flink 进阶(六):Flink 作业执行深度解析 < 77
● 将缓存文件存文件的配置添加到 configuration 中。
● 设置 ExecutionConfig。
从 source 节点递归寻找嵌到一起的 operator 中,嵌到一起需要满足一定的条
件,具体条件介绍如下:
● 下游节点只有一个输入。
● 下游节点的操作符不为 null。
● 上游节点的操作符不为 null。
● 上下游节点在一个槽位共享组内。
● 下游节点的连接策略是 ALWAYS。
● 上游节点的连接策略是 HEAD 或者 ALWAYS。
● edge 的分区函数是 ForwardPartitioner 的实例。
● 上下游节点的并行度相等。
● 可以进行节点连接操作。
JobGraph 对象结构如上图所示,taskVertices 中只存在 Window、Flat Map、Source 三个 TaskVertex,Sink operator 被嵌到 window operator 中去了。
为什么要为每个 operator 生成 hash 值?
Flink 任务失败的时候,各个 operator 是能够从 checkpoint 中恢复到失败之前
的状态的,恢复的时候是依据 JobVertexID(hash 值)进行状态恢复的。相同的任
务在恢复的时候要求 operator 的 hash 值不变,因此能够获取对应的状态。78 > Apache Flink 进阶(六):Flink 作业执行深度解析
每个 operator 是怎样生成 hash 值的?
如果用户对节点指定了一个散列值,则基于用户指定的值能够产生一个长度为
16 的字节数组。如果用户没有指定,则根据当前节点所处的位置,产生一个散列值。
考虑的因素主要有三点:
● 一是在当前 StreamNode 之前已经处理过的节点的个数,作为当前 Stream-
Node 的 id,添加到 hasher 中;
● 二 是 遍 历 当 前 StreamNode 输 出 的 每 个 StreamEdge, 并 判 断 当 前
StreamNode 与这个 StreamEdge 的目标 StreamNode 是否可以进行链
接,如果可以,则将目标 StreamNode 的 id 也放入 hasher 中,且这个目标
StreamNode 的 id 与当前 StreamNode 的 id 取相同的值;
● 三是将上述步骤后产生的字节数据,与当前 StreamNode 的所有输入
StreamNode 对应的字节数据,进行相应的位操作,最终得到的字节数据,就是当前 StreamNode 对应的长度为 16 的字节数组。
JobGraph 到 ExexcutionGraph 以及物理执行计划Apache Flink 进阶(六):Flink 作业执行深度解析 < 79
JobGraph 到 ExexcutionGraph 以及物理执行计划的流程:
● 将 JobGraph 里面的 jobVertex 从 Source 节点开始排序。
● 在 executionGraph.attachJobGraph(sortedTopology) 方 法 里 面, 根 据
JobVertex 生成 ExecutionJobVertex,在 ExecutionJobVertex 构造方法
里面,根据 jobVertex 的 IntermediateDataSet 构建 IntermediateResult,根据 jobVertex 并发构建 ExecutionVertex,ExecutionVertex 构建的时
候,构建 IntermediateResultPartition(每一个 Execution 构建 Intermedi-
ateResult 数个 IntermediateResultPartition );将创建的 ExecutionJob-
Vertex 与前置的 IntermediateResult 连接起来。
● 构建 ExecutionEdge ,连接到前面的 IntermediateResultPartition,最终从
ExecutionGraph 到物理执行计划。
Flink Job 执行流程
Flink On Yarn 模式
基于 Yarn 层面的架构类似 Spark on Yarn 模式,都是由 Client 提交 App 到
RM 上面去运行,然后 RM 分配第一个 container 去运行 AM,然后由 AM 去负责
资源的监督和管理。需要说明的是,Flink 的 Yarn 模式更加类似 Spark on Yarn 的 80 > Apache Flink 进阶(六):Flink 作业执行深度解析
cluster 模式,在 cluster 模式中,dirver 将作为 AM 中的一个线程去运行。
Flink on Yarn 模式也是会将 JobManager 启动在 container 里面,去做
个 driver 类似的任务调度和分配,Yarn AM 与 Flink JobManager 在同一个
Container 中,这样 AM 可以知道 Flink JobManager 的地址,从而 AM 可以申请
Container 去启动 Flink TaskManager。待 Flink 成功运行在 Yarn 集群上,Flink
Yarn Client 就可以提交 Flink Job 到 Flink JobManager,并进行后续的映射、调
度和计算处理。
Fink on Yarn 的缺陷
● 资源分配是静态的,一个作业需要在启动时获取所需的资源并且在它的生命周
期里一直持有这些资源。这导致了作业不能随负载变化而动态调整,在负载下
降时无法归还空闲的资源,在负载上升时也无法动态扩展。
● On-Yarn 模式下,所有的 container 都是固定大小的,导致无法根据作业需
求来调整 container 的结构。譬如 CPU 密集的作业或许需要更多的核,但不
需要太多内存,固定结构的 container 会导致内存被浪费。
● 与容器管理基础设施的交互比较笨拙,需要两个步骤来启动 Flink 作业: 1.启
动 Flink 守护进程;2.提交作业。如果作业被容器化并且将作业部署作为容器
部署的一部分,那么将不再需要步骤 2。
● On-Yarn 模式下,作业管理页面会在作业完成后消失不可访问。
● Flink 推荐 per job clusters 的部署方式,但是又支持可以在一个集群上运行
多个作业的 session 模式,令人疑惑。
在 Flink 版本 1.5 中引入了 Dispatcher,Dispatcher 是在新设计里引入的一个
新概念。Dispatcher 会从 Client 端接受作业提交请求并代表它在集群管理器上启动
作业。Apache Flink 进阶(六):Flink 作业执行深度解析 < 81
引入 Dispatcher 的原因主要有两点:
● 第一,一些集群管理器需要一个中心化的作业生成和监控实例;
● 第二,能够实现 Standalone 模式下 JobManager 的角色,且等待作业提交。
在一些案例中,Dispatcher 是可选的(Yarn) 或者不兼容的 (kubernetes)。
资源调度模型重构下的 Flink On Yarn 模式
● 没有 Dispatcher job 运行过程
客 户 端 提 交 JobGraph 以 及 依 赖 jar 包 到 YarnResourceManager, 接
着 Yarn ResourceManager 分配第一个 container 以此来启动 AppMaster,Application Master 中会启动一个 FlinkResourceManager 以及 JobManager,JobManager 会根据 JobGraph 生成的 ExecutionGraph 以及物理执行计划向
FlinkResourceManager 申请 slot,FlinkResoourceManager 会管理这些 slot
以及请求,如果没有可用 slot 就向 Yarn 的 ResourceManager 申请 container,container 启动以后会注册到 FlinkResourceManager,最后 JobManager 会将
subTask deploy 到对应 container 的 slot 中去。82 > Apache Flink 进阶(六):Flink 作业执行深度解析
● 在有 Dispatcher 的模式下
会增加一个过程,就是 Client 会直接通过 HTTP Server 的方式,然后用
Dispatcher 将这个任务提交到 Yarn ResourceManager 中。
新框架具有四大优势,详情如下:
● client 直接在 Yarn 上启动作业,而不需要先启动一个集群然后再提交作业到
集群。因此 client 再提交作业后可以马上返回。
● 所有的用户依赖库和配置文件都被直接放在应用的 classpath,而不是用动态
的用户代码 classloader 去加载。
● container 在需要时才请求,不再使用时会被释放。
●“需要时申请”的 container 分配方式允许不同算子使用不同 profile (CPU 和
内存结构 ) 的 container。Apache Flink 进阶(六):Flink 作业执行深度解析 < 83
新的资源调度框架下 single cluster job on Yarn 流程介绍
single cluster job on Yarn 模式涉及三个实例对象:
● clifrontend
○ Invoke App code;
○ 生成 StreamGraph,然后转化为 JobGraph;
● YarnJobClusterEntrypoint(Master)
○ 依次启动 YarnResourceManager、MinDispatcher、JobManagerRun-
ner 三者都服从分布式协同一致的策略;
○ JobManagerRunner 将 JobGraph 转 化 为 ExecutionGraph , 然 后
转化为物理执行任务 Execution,然后进行 deploy,deploy 过程会向
YarnResourceManager 请求 slot,如果有直接 deploy 到对应的 Yarn-
TaskExecutiontor 的 slot 里面,没有则向 Yarn 的 ResourceManager
申请,带 container 启动以后 deploy。
● YarnTaskExecutorRunner (slave)
○ 负责接收 subTask,并运行。
整个任务运行代码调用流程如下图:84 > Apache Flink 进阶(六):Flink 作业执行深度解析
subTask 在执行时是怎么运行的?
调用 StreamTask 的 invoke 方法,执行步骤如下:
initializeState即operator的 initializeState
openAllOperators 即operator 的open 方法
最后调用 run 方法来进行真正的任务处理
我们来看下 flatMap 对应的 OneInputStreamTask 的 run 方法具体是怎么处
理的。
@Override protected void run throws Exception {
cache processor reference on the stack, to make the code more JIT
friendly
final StreamInputProcessor
while (running inputProcessor.processInput{
all the work happens in the “processInput” method
}
}
最终是调用 StreamInputProcessor 的 processInput 做数据的处理,这里面
包含用户的处理逻辑。
public boolean processInput throws Exception {Apache Flink 进阶(六):Flink 作业执行深度解析 < 85
if (isFinished) {
return false;
}
if (numRecordsIn == null) {
try {
numRecordsIn = ((OperatorMetricGroup) streamOperator.
getMetricGroup).getIOMetricGroup.getNumRecordsInCounter;
} catch (Exception e) {
LOG.warn( “An exception occurred during the metrics setup.” ,e);
numRecordsIn = new SimpleCounter;
}
}
while (true) {
if (currentRecordDeserializer != null) {
DeserializationResult result = currentRecordDeserializer.
getNextRecord(deserializationDelegate);
if (result.isBufferConsumed{
currentRecordDeserializer.getCurrentBuffer.
recycleBuffer;
currentRecordDeserializer = null;
}
if (result.isFullRecord{
StreamElement recordOrMark = deserializationDelegate.
getInstance;
处理 watermark
if (recordOrMark.isWatermark{
handle watermark
watermark处理逻辑,这里可能引起 timer的trigger
statusWatermarkValve.inputWatermark(recordOrMark.
asWatermark, currentChannel);
continue;
} else if (recordOrMark.isStreamStatus{
handle stream status
statusWatermarkValve.inputStreamStatus(recordOrMark.
asStreamStatus, currentChannel);
continue;
处理latency watermark
} else if (recordOrMark.isLatencyMarker{
handle latency marker
synchronized (lock) {
streamOperator.processLatencyMarker(recordOrMark.
asLatencyMarker);
}
continue;
} else {86 > Apache Flink 进阶(六):Flink 作业执行深度解析
用户的真正的代码逻辑
now we can do the actual processing
StreamRecord
synchronized (lock) {
numRecordsIn.inc;
streamOperator.setKeyContextElement1(record);
处理数据
streamOperator.processElement(record);
}
return true;
}
}
}
这里会进行 checkpoint barrier 的判断和对齐,以及不同 partition 里面
checkpoint barrier 不一致时候的,数据buffer,final BufferOrEvent bufferOrEvent = barrierHandler.
getNextNonBlocked;
if (bufferOrEvent != null) {
if (bufferOrEvent.isBuffer{
currentChannel = bufferOrEvent.getChannelIndex;
currentRecordDeserializer =
recordDeserializers[currentChannel];
currentRecordDeserializer.setNextBuffer(bufferOrEvent.
getBuffer);
}
else {
Event received
final AbstractEvent event = bufferOrEvent.getEvent;
if (event.getClass != EndOfPartitionEvent.class) {
throw new IOException( “Unexpected event: “ + event);
}
}
}
else {
isFinished = true;
if (!barrierHandler.isEmpty{
throw new IllegalStateException( “Trailing data in
checkpoint barrier handler.” );
}
return false;
}
}
}
streamOperator.processElement(record) 最终会调用用户的代码处理逻辑,假如 operator 是 StreamFlatMap 的话。Apache Flink 进阶(六):Flink 作业执行深度解析 < 87
@Override
public void processElement(StreamRecord
collector.setTimestamp(element);
userFunction.flatMap(element.getValue, collector);用户代码
}
如有不正确的地方,欢迎指正,关于 Flink 资源调度架构调整,网上有一篇非常
不错的针对 FLIP-6 的翻译,推荐给大家。资源调度模型重构。链接如下:
http:www.whitewood.me20180617FLIP6- 资源调度模型重构 Apache Flink 进阶(七):网络流控及反压剖析
作者:张俊,OPPO 大数据平台研发负责人
整理:张友亮
本文根据 Apache Flink 系列直播整理而成,由 Apache Flink Contribu-
tor、OPPO 大数据平台研发负责人张俊老师分享,社区志愿者张友亮整理。主要
内容如下:
● 网络流控的概念与背景
● TCP 的流控机制
● Flink TCP-based 反压机制(before V1.5)
● Flink Credit-based 反压机制 (since V1.5)
● 总结与思考
网络流控的概念与背景
1. 为什么需要网络流控
首先我们可以看下这张最精简的网络流控的图,Producer 的吞吐率是 2MBs,Apache Flink 进阶(七):网络流控及反压剖析 < 89
Consumer 是 1MBs,这个时候我们就会发现在网络通信的时候我们的 Producer
的速度是比 Consumer 要快的,有 1MBs 的这样的速度差,假定我们两端都有一个
Buffer,Producer 端有一个发送用的 Send Buffer,Consumer 端有一个接收用的
Receive Buffer,在网络端的吞吐率是 2MBs,过了 5s 后我们的 Receive Buffer
可能就撑不住了,这时候会面临两种情况:
● 如果 Receive Buffer 是有界的,这时候新到达的数据就只能被丢弃掉了。
● 如果 Receive Buffer 是无界的,Receive Buffer 会持续的扩张,最终会导
致 Consumer 的内存耗尽。
2. 网络流控的实现:静态限速
为了解决这个问题,我们就需要网络流控来解决上下游速度差的问题,传统的做
法可以在 Producer 端实现一个类似 Rate Limiter 这样的静态限流,Producer 的发
送速率是 2MBs,但是经过限流这一层后,往 Send Buffer 去传数据的时候就会降
到 1MBs 了,这样的话 Producer 端的发送速率跟 Consumer 端的处理速率就可以
匹配起来了,就不会导致上述问题。但是这个解决方案有两点限制:
● 事先无法预估 Consumer 到底能承受多大的速率;
● Consumer 的承受能力通常会动态地波动。90 > Apache Flink 进阶(七):网络流控及反压剖析
3. 网络流控的实现:动态反馈 自动反压
针对静态限速的问题我们就演进到了动态反馈(自动反压)的机制,我们需要
Consumer 能够及时的给 Producer 做一个 feedback,即告知 Producer 能够承受
的速率是多少。动态反馈分为两种:
● 负反馈:接受速率小于发送速率时发生,告知 Producer 降低发送速率;
● 正反馈:发送速率小于接收速率时发生,告知 Producer 可以把发送速率提
上来。
让我们来看几个经典案例:
案例一:Storm 反压实现Apache Flink 进阶(七):网络流控及反压剖析 < 91
上图就是 Storm 里实现的反压机制,可以看到 Storm 在每一个 Bolt 都会有一
个监测反压的线程(Backpressure Thread),这个线程一但检测到 Bolt 里的接收队
列(recv queue)出现了严重阻塞就会把这个情况写到 ZooKeeper 里,ZooKeeper
会一直被 Spout 监听,监听到有反压的情况就会停止发送,通过这样的方式匹配上
下游的发送接收速率。
案例二:Spark Streaming 反压实现
Spark Streaming 里也有做类似这样的 feedback 机制,上图 Fecher 会实时
的从 Buffer、Processing 这样的节点收集一些指标然后通过 Controller 把速度接收
的情况再反馈到 Receiver,实现速率的匹配。
疑问:为什么 Flink(before V1.5)里没有用类似的方式实现 feedback 机制?
首先在解决这个疑问之前我们需要先了解一下 Flink 的网络传输是一个什么样的
架构。92 > Apache Flink 进阶(七):网络流控及反压剖析
这张图就体现了 Flink 在做网络传输的时候基本的数据的流向,发送端在发送网
络数据前要经历自己内部的一个流程,会有一个自己的 Network Buffer,在底层用
Netty 去做通信,Netty 这一层又有属于自己的 ChannelOutbound Buffer,因为最
终是要通过 Socket 做网络请求的发送,所以在 Socket 也有自己的 Send Buffer,同样在接收端也有对应的三级 Buffer。学过计算机网络的时候我们应该了解到,TCP 是自带流量控制的。实际上 Flink (before V1.5)就是通过 TCP 的流控机制来
实现 feedback 的。
TCP 流控机制
根据下图我们来简单的回顾一下 TCP 包的格式结构。首先,他有 Sequence
number 这样一个机制给每个数据包做一个编号,还有 ACK number 这样一
个机制来确保 TCP 的数据传输是可靠的,除此之外还有一个很重要的部分就是
Window Size,接收端在回复消息的时候会通过 Window Size 告诉发送端还可以
发送多少数据。Apache Flink 进阶(七):网络流控及反压剖析 < 93
接下来我们来简单看一下这个过程。
TCP 流控:滑动窗口
TCP 的流控就是基于滑动窗口的机制,现在我们有一个 Socket 的发送端和一
个 Socket 的接收端,目前我们的发送端的速率是我们接收端的 3 倍,这样会发生什94 > Apache Flink 进阶(七):网络流控及反压剖析
么样的一个情况呢?假定初始的时候我们发送的 window 大小是 3,然后我们接收端
的 window 大小是固定的,就是接收端的 Buffer 大小为 5。
首先,发送端会一次性发 3 个 packets,将 1,2,3 发送给接收端,接收端接
收到后会将这 3 个 packets 放到 Buffer 里去。
接收端一次消费 1 个 packet,这时候 1 就已经被消费了,然后我们看到接收Apache Flink 进阶(七):网络流控及反压剖析 < 95
端的滑动窗口会往前滑动一格,这时候 2,3 还在 Buffer 当中 而 4,5,6 是空出来
的,所以接收端会给发送端发送 ACK = 4 ,代表发送端可以从 4 开始发送,同时会
将 window 设置为 3 (Buffer 的大小 5 减去已经存下的 2 和 3),发送端接收到回应
后也会将他的滑动窗口向前移动到 4,5,6。
这时候发送端将 4,5,6 发送,接收端也能成功的接收到 Buffer 中去。96 > Apache Flink 进阶(七):网络流控及反压剖析
到这一阶段后,接收端就消费到 2 了,同样他的窗口也会向前滑动一个,这时候
他的 Buffer 就只剩一个了,于是向发送端发送 ACK = 7、window = 1。发送端收到
之后滑动窗口也向前移,但是这个时候就不能移动 3 格了,虽然发送端的速度允许发
3 个 packets 但是 window 传值已经告知只能接收一个,所以他的滑动窗口就只能
往前移一格到 7 ,这样就达到了限流的效果,发送端的发送速度从 3 降到 1。
我们再看一下这种情况,这时候发送端将 7 发送后,接收端接收到,但是由于接
收端的消费出现问题,一直没有从 Buffer 中去取,这时候接收端向发送端发送 ACK Apache Flink 进阶(七):网络流控及反压剖析 < 97
= 8、window = 0 ,由于这个时候 window = 0,发送端是不能发送任何数据,也就
会使发送端的发送速度降为 0。这个时候发送端不发送任何数据了,接收端也不进行
任何的反馈了,那么如何知道消费端又开始消费了呢?98 > Apache Flink 进阶(七):网络流控及反压剖析
TCP 当中有一个 ZeroWindowProbe 的机制,发送端会定期的发送 1 个字节的
探测消息,这时候接收端就会把 window 的大小进行反馈。当接收端的消费恢复了之
后,接收到探测消息就可以将 window 反馈给发送端端了从而恢复整个流程。TCP
就是通过这样一个滑动窗口的机制实现 feedback。
Flink TCP-based反压机制(before V1.5)
1. 示例:WindowWordCount
大体的逻辑就是从 Socket 里去接收数据,每 5s 去进行一次 WordCount,将
这个代码提交后就进入到了编译阶段。
2. 编译阶段:生成 JobGraphApache Flink 进阶(七):网络流控及反压剖析 < 99
这时候还没有向集群去提交任务,在 Client 端会将 StreamGraph 生成 Job-
Graph,JobGraph 就是做为向集群提交的最基本的单元。在生成 JobGrap 的时候
会做一些优化,将一些没有 Shuffle 机制的节点进行合并。有了 JobGraph 后就会向
集群进行提交,进入运行阶段。
3. 运行阶段:调度 ExecutionGraph
JobGraph 提交到集群后会生成 ExecutionGraph ,这时候就已经具备基本的
执行任务的雏形了,把每个任务拆解成了不同的 SubTask,上图 ExecutionGraph
中的 Intermediate Result Partition 就是用于发送数据的模块,最终会将 Execu-
tionGraph 交给 JobManager 的调度器,将整个 ExecutionGraph 调度起来。
然后我们概念化这样一张物理执行图,可以看到每个 Task 在接收数据时都会通
过这样一个 InputGate 可以认为是负责接收数据的,再往前有这样一个 ResultPar-
tition 负责发送数据,在 ResultPartition 又会去做分区跟下游的 Task 保持一致,就
形成了 ResultSubPartition 和 InputChannel 的对应关系。这就是从逻辑层上来看100 > Apache Flink 进阶(七):网络流控及反压剖析
的网络传输的通道,基于这么一个概念我们可以将反压的问题进行拆解。
4. 问题拆解:反压传播两个阶段
反压的传播实际上是分为两个阶段的,对应着上面的执行图,我们一共涉及 3 个
TaskManager,在每个 TaskManager 里面都有相应的 Task 在执行,还有负责接
收数据的 InputGate,发送数据的 ResultPartition,这就是一个最基本的数据传输
的通道。在这时候假设最下游的 Task (Sink)出现了问题,处理速度降了下来这时候
是如何将这个压力反向传播回去呢?这时候就分为两种情况:
● 跨 TaskManager ,反压如何从 InputGate 传播到 ResultPartition。
● TaskManager 内,反压如何从 ResultPartition 传播到 InputGate。Apache Flink 进阶(七):网络流控及反压剖析 < 101
5. 跨 TaskManager 数据传输
前面提到,发送数据需要 ResultPartition,在每个 ResultPartition 里面会
有分区 ResultSubPartition,中间还会有一些关于内存管理的 Buffer。对于一个
TaskManager 来说会有一个统一的 Network BufferPool 被所有的 Task 共享,在
初始化时会从 Off-heap Memory 中申请内存,申请到内存的后续内存管理就是同
步 Network BufferPool 来进行的,不需要依赖 JVM GC 的机制去释放。有了 Net-
work BufferPool 之后可以为每一个 ResultSubPartition 创建 Local BufferPool 。
如上图左边的 TaskManager 的 Record Writer 写了 <1,2> 这个两个数据进
来,因为 ResultSubPartition 初始化的时候为空,没有 Buffer 用来接收,就会向
Local BufferPool 申请内存,这时 Local BufferPool 也没有足够的内存于是将请求
转到 Network BufferPool,最终将申请到的 Buffer 按原链路返还给 ResultSub-
Partition,<1,2> 这个两个数据就可以被写入了。
之后会将 ResultSubPartition 的 Buffer 拷贝到 Netty 的 Buffer 当中最终拷贝
到 Socket 的 Buffer 将消息发送出去。然后接收端按照类似的机制去处理将消息消
费掉。接下来我们来模拟上下游处理速度不匹配的场景,发送端的速率为 2,接收端
的速率为 1,看一下反压的过程是怎样的。102 > Apache Flink 进阶(七):网络流控及反压剖析
6. 跨 TaskManager 反压过程
因为速度不匹配就会导致一段时间后 InputChannel 的 Buffer 被用尽,于是他
会向 Local BufferPool 申请新的 Buffer ,这时候可以看到 Local BufferPool 中的
一个 Buffer 就会被标记为 Used。
发送端还在持续以不匹配的速度发送数据,然后就会导致 InputChannel 向
Local BufferPool 申请 Buffer 的时候发现没有可用的 Buffer 了,这时候就只能
向 Network BufferPool 去申请,当然每个 Local BufferPool 都有最大的可用的
Buffer,防止一个 Local BufferPool 把 Network BufferPool 耗尽。这时候看到 Apache Flink 进阶(七):网络流控及反压剖析 < 103
Network BufferPool 还是有可用的 Buffer 可以向其申请。
一段时间后,发现 Network BufferPool 没有可用的 Buffer,或是 Local Buffer-
Pool 的最大可用 Buffer 到了上限无法向 Network BufferPool 申请,没有办法去读
取新的数据,这时 Netty AutoRead 就会被禁掉,Netty 就不会从 Socket 的 Buffer
中读取数据了。
显然,再过不久 Socket 的 Buffer 也被用尽,这时就会将 Window = 0 发送给
发送端(前文提到的 TCP 滑动窗口的机制)。这时发送端的 Socket 就会停止发送。104 > Apache Flink 进阶(七):网络流控及反压剖析
很快发送端的 Socket 的 Buffer 也被用尽,Netty 检测到 Socket 无法写了之后
就会停止向 Socket 写数据。
Netty 停止写了之后,所有的数据就会阻塞在 Netty 的 Buffer 当中了,但是
Netty 的 Buffer 是无界的,可以通过 Netty 的水位机制中的 high watermark 控
制他的上界。当超过了 high watermark,Netty 就会将其 channel 置为不可写,ResultSubPartition 在写之前都会检测 Netty 是否可写,发现不可写就会停止向
Netty 写数据。Apache Flink 进阶(七):网络流控及反压剖析 < 105
这时候所有的压力都来到了 ResultSubPartition,和接收端一样他会不断的向
Local BufferPool 和 Network Buffer ......
您现在查看是摘要介绍页, 详见PDF附件(18526KB,204页)。





