Flink基础教程豆瓣.pdf
http://www.100md.com
2020年11月12日
![]() |
| 第1页 |
![]() |
| 第9页 |
![]() |
| 第17页 |
![]() |
| 第23页 |
![]() |
| 第47页 |
![]() |
| 第295页 |
参见附件(11499KB,1169页)。
Flink基础教程,Flink是众多大数据处理框架中一颗冉冉升起的新星。它以同一种技术支持流处理和批处理,并能同时满足高吞吐、低延迟和容错的需求。本书由Flink项目核心成员执笔,系统阐释Flink的适用场景、设计理念、功能、用途和性能优势。

编辑推荐
作为新一代的开源流处理器,Flink是众多大数据处理框架中一颗冉冉升起的新星。它以同一种技术支持流处理和批处理,并能同时满足高吞吐、低延迟和容错的需求。本书由Flink项目核心成员执笔,系统阐释Flink的适用场景、设计理念、功能、用途和性能优势。
- Flink的适用场景
- 流处理架构相较于批处理架构的优势
- Flink中的时间概念
- Flink的检查点机制
- Flink的性能优势
内容简介
近年来,流处理变得越来越流行。作为高度创新的开源流处理器,Flink拥有诸多优势,包括容错性、高吞吐、低延迟,以及同时支持流处理和批处理的能力。本书分为6章,侧重于介绍Flink的核心设计理念、功能和用途,内容涉及事件时间和处理时间、窗口和水印机制、检查点机制、性能测评,以及Flink如何实现批处理。
本书面向有兴趣学习如何分析大规模流数据的读者。
作者简介
作者介绍
埃伦·弗里德曼(Ellen Friedman)
解决方案咨询师,知名大数据相关技术布道师,在流处理架构和大数据处理框架等方面有多部著作。
科斯塔斯·宙马斯(Kostas Tzoumas)
Flink项目核心成员,data Artisans公司联合创始人兼首席执行官,在流处理和数据科学领域经验丰富。
译者介绍
王绍翾
阿里巴巴资 深技术专家,Apache Flink Committer,淘宝花名“大沙”。毕业于北京大学信息科学技术学院,后取得加州大学圣地亚哥分校计算机工程博士学位。目前就职于阿里巴巴计算平台事业部,负责Flink SQL引擎及机器学习的相关开发。加入阿里巴巴之前,在Facebook开发分布式图存储系统TAO。曾多次拜访由Flink创始团队创办的公司data Artisans,并与其首 席执行官科斯塔斯·宙马斯(本书作者之一)以及首 席技术官斯蒂芬·尤恩有着广泛的合作。
目录
前言 ix
第 1章 为何选择Flink 1
1.1 流处理欠佳的后果 2
1.1.1 零售业和市场营销 2
1.1.2 物联网 3
1.1.3 电信业 5
1.1.4 银行和金融业 5
1.2 连续事件处理的目标 6
1.3 流处理技术的演变 6
1.4 初探Flink 9
1.5 生产环境中的Flink 12
1.5.1 布衣格电信 13
1.5.2 其他案例 14
1.6 Flink的适用场景 15
第 2章 流处理架构 17
2.1 传统架构与流处理架构 17
2.2 消息传输层和流处理层 18
2.3 消息传输层的理想功能 19
2.3.1 兼具高性能和持久性 20
2.3.2 将生产者和消费者解耦 20
2.4 支持微服务架构的流数据 21
2.4.1 数据流作为中心数据源 22
2.4.2 欺诈检测:流处理架构用例 22
2.4.3 给开发人员带来的灵活性 24
2.5 不限于实时应用程序 24
2.6 流的跨地域复制 26
第3章 Flink 的用途 29
3.1 不同类型的正确性 29
3.1.1 符合产生数据的自然规律 29
3.1.2 事件时间 31
3.1.3 发生故障后仍保持准确 32
3.1.4 及时给出所需结果 33
3.1.5 使开发和运维更轻松 33
3.2 分阶段采用Flink 34
第4章 对时间的处理 35
4.1 采用批处理架构和Lambda 架构计数 35
4.2 采用流处理架构计数 38
4.3 时间概念 40
4.4 窗口 41
4.4.1 时间窗口 41
4.4.2 计数窗口 43
4.4.3 会话窗口 43
4.4.4 触发器 44
4.4.5 窗口的实现 44
4.5 时空穿梭 44
4.6 水印 45
4.7 真实案例:爱立信公司的Kappa 架构 47
第5章 有状态的计算 49
5.1 一致性 50
5.2 检查点:保证exactly-once 51
5.3 保存点:状态版本控制 59
5.4 端到端的一致性和作为数据库的流处理器 62
5.5 Flink 的性能 65
5.5.1 Yahoo! Streaming Benchmark 65
5.5.2 变化1:使用Flink 状态 66
5.5.3 变化2:改进数据生成器并增加吞吐量 67
5.5.4 变化3:消除网络瓶颈 68
5.5.5 变化4:使用MapR Streams 69
5.5.6 变化5:增加key 基数 69
5.6 结论 71
第6章 批处理:一种特殊的流处理 73
6.1 批处理技术 75
6.2 案例研究:Flink 作为批处理器 76
附录 其他资源 79
关于作者 84
Flink基础教程豆瓣截图



1.1
1.2
1.2.1
1.2.2
1.3
1.3.1
1.3.1.1
1.3.2
1.3.2.1
1.3.2.2
1.4
1.4.1
1.5
1.5.1
1.5.1.1
1.5.1.2
1.5.1.3
1.5.2
1.5.2.1
1.5.2.2
1.5.3
1.5.3.1
1.5.3.1.1
1.5.3.1.2
1.5.3.2
1.5.3.2.1
1.5.3.2.2
1.5.3.2.3
1.5.3.2.4
1.5.3.2.5
1.5.3.2.6
1.5.3.3
1.5.3.3.1
1.5.3.3.2
1.5.3.3.3
目錄
ApacheFlink文档
概念
数据流编程模型
分布式运行时环境
教程
API教程
DataStreamAPI教程
Setup教程
本地安装教程
在Windows上运行Flink
例子
批处理示例
应用开发
项目构建设置
Java项目模板
Scala的项目模板
配置依赖关系,连接器,库
基础API概念
ScalaAPI扩展
JavaLambda表达式
FlinkDataStreamAPI编程指南
活动时间
生成时间戳水印
预定义的时间戳提取器水印发射器
状态与容错
状态运行
广播状态模式
检查点
可查询状态Beta
状态后台
管理状态的自定义序列化
算子
视窗
Join
过程函数(低级算子操作)
11.5.3.3.4
1.5.3.4
1.5.3.4.1
1.5.3.4.2
1.5.3.4.3
1.5.3.4.4
1.5.3.4.5
1.5.3.4.6
1.5.3.4.7
1.5.3.4.8
1.5.3.4.9
1.5.3.4.10
1.5.3.5
1.5.3.6
1.5.3.7
1.5.3.8
1.5.4
1.5.4.1
1.5.4.2
1.5.4.3
1.5.4.4
1.5.4.5
1.5.4.6
1.5.4.7
1.5.4.8
1.5.4.9
1.5.5
1.5.5.1
1.5.5.2
1.5.5.3
1.5.5.4
1.5.5.5
1.5.5.6
1.5.5.7
1.5.5.8
1.5.5.9
1.5.6
1.5.6.1
1.5.7
外部数据访问的异步IO.
流连接器
数据源和接收器的容错保证
ApacheKafka连接器
ApacheCassandra连接器
亚马逊AWSKinesisStreams连接器
Elasticsearch连接器
HDFS连接器
流文件接收器
RabbitMQ连接器
ApacheNiFi连接器
Twitter连接器
旁路输出
Python编程指南(流)Beta
测试
实验特点
FlinkDataSetAPI编程指南
数据集转换
容错
在数据集中压缩数据元
迭代
Python编程指南Beta
连接器
Hadoop兼容性测试版
本地执行
群集执行
TableAPI和SQL
概念和通用API
流处理概念
连接到外部系统
TableAPI
SQL
内置函数
用户定义的源和接收器
用户定义的函数
SQL客户端测试版
数据类型和序列化
为Flink程序注册自定义序列化程序
管理执行
21.5.7.1
1.5.7.2
1.5.7.3
1.5.7.4
1.5.7.5
1.5.8
1.5.8.1
1.5.8.2
1.5.8.3
1.5.8.4
1.5.8.4.1
1.5.8.4.2
1.5.8.4.3
1.5.8.4.4
1.5.8.4.5
1.5.8.5
1.5.8.5.1
1.5.8.5.2
1.5.8.5.3
1.5.8.5.4
1.5.8.5.5
1.5.8.5.6
1.5.8.5.7
1.5.8.5.8
1.5.8.5.9
1.5.8.5.10
1.5.8.5.11
1.5.8.5.12
1.5.8.5.13
1.5.9
1.5.10
1.6
1.6.1
1.6.1.1
1.6.1.2
1.6.1.3
1.6.1.4
1.6.1.5
1.6.1.6
执行配置
程序打包和分布式执行
并行执行
执行计划
重启策略
类库
FlinkCEP-Flink的复杂事件处理
风暴兼容性Beta
Gelly:FlinkGraphAPI
图API
迭代图处理
类库方法
图算法
图形生成器
二分图
FlinkML-Flink的机器学习
快速入门指南
如何贡献
交叉验证
DistanceMetrics
k-NearestNeighbors关联
MinMaxScaler
MultipleLinearRegression
在管道的引擎盖下看
PolynomialFeatures
随机异常值选择
StandardScaler
AlternatingLeastSquares
SVMusingCoCoA
最佳实践
API迁移指南
部署和运营
集群和部署
独立群集
YARN设置
Mesos设置
Kubernetes设置
Docker设置
亚马逊网络服务(AWS)
31.6.1.7
1.6.1.8
1.6.1.9
1.6.2
1.6.3
1.6.3.1
1.6.3.2
1.6.3.3
1.6.3.4
1.6.4
1.6.5
1.6.6
1.6.7
1.6.8
1.6.9
1.6.10
1.6.11
1.7
1.7.1
1.7.2
1.7.3
1.7.4
1.7.5
1.7.6
1.7.7
1.7.8
1.7.9
1.8
1.8.1
1.8.2
1.9
1.9.1
1.9.2
1.9.3
1.9.4
1.9.5
GoogleComputeEngine设置
MapR设置
Hadoop集成
JobManager高可用性(HA)
状态和容错
检查点
保存点
状态后台
调整检查点和大状态
配置
生产准备清单
命令行界面
ScalaREPL
Kerberos身份验证设置和配置
SSL设置
文件系统
升级应用程序和Flink版本
调试和监控
度量
如何使用日志记录
历史服务器
监控检查点
监测背压
监控RESTAPI
调试Windows和事件时间
调试类加载
应用程序分析
FlinkDevelopment
将Flink导入IDE
从Source建立Flink
内幕
组件堆栈
数据流容错
工作和调度
任务生命周期
文件系统
45ApacheFlink文档
译者:flink.sojb.cn
在线阅读
PDF格式
EPUB格式
MOBI格式
代码仓库
本文档适用于ApacheFlink1.7-SNAPSHOT版。这些页面的建立时间为:
090818,中部标准时间07:53:00。
ApacheFlink是一个用于分布式流和批处理数据处理的开源平台。Flink的核心是流
数据流引擎,为数据流上的分布式计算提供数据分发,通信和容错。Flink在流引擎
之上构建批处理,覆盖本机迭代支持,托管内存和程序优化。
第一步
概念:从Flink的数据流编程模型和分布式运行时环境的基本概念开始。这将有
助于您了解文档的其他部分,包括设置和编程指南。我们建议您先阅读这些部
分。
教程:
实现并运行DataStream应用程序
设置本地Flink群集
编程指南:您可以阅读我们关于基本API概念和DataStreamAPI或DataSetAPI
的指南,以了解如何编写您的第一个Flink程序。
部署
在将Flink工作投入生产之前,请阅读生产准备清单。
发行说明
发行说明涵盖了Flink版本之间的重要更改。如果您计划将Flink设置升级到更高版
本,请仔细阅读这些说明。
Flink1.6发行说明。
Flink1.5发行说明。
外部资源
ApacheFlink文档
6FlinkForward:FlinkForward网站和YouTube上提供了以往会议的讲座。使
用ApacheFlink进行强大的流处理是一个很好的起点。
培训:数据工匠的培训材料包括幻灯片,练习和示例解决方案。
博客:ApacheFlink和数据工匠博客发布了有关Flink的频繁,深入的技术文
章。
ApacheFlink文档
7概念
概念
8数据流编程模型
译者:flink.sojb.cn
抽象层次
Flink提供不同级别的抽象来开发流批处理应用程序。
最低级抽象只提供有状态流。它通过ProcessFunction嵌入到DataStreamAPI
中。它允许用户自由处理来自一个或多个流的事件,并使用一致的容错状态。
此外,用户可以注册事件时间和处理时间回调,允许程序实现复杂的计算。
实际上,大多数应用程序不需要上述低级抽象,而是针对CoreAPI编程,如
DataStreamAPI(有界无界流)和DataSetAPI(有界数据集)。这些流畅的
API提供了用于数据处理的通用构建块,例如各种形式的用户指定的转换,连
接,聚合,窗口,状态等。在这些API中处理的数据类型在相应的编程语言中
表示为类。
低级ProcessFunction与DataStreamAPI集成,因此只能对某些算子操作进行
低级抽象。该数据集API提供的有限数据集的其他原语,如循环迭代。
该TableAPI是为中心的声明性DSL表,其可被动态地改变的表(表示流
时)。该TableAPI遵循(扩展)关系模型:表有一个模式连接(类似于在关
系数据库中的表)和API提供可比的算子操作,如选择,项目,连接,分组依
据,聚合等TableAPI程序以声明方式定义应该执行的逻辑算子操作,而不是
准确指定算子操作代码的外观。虽然TableAPI可以通过各种类型的用户定义
函数进行扩展,但它的表现力不如CoreAPI,但使用更简洁(编写的代码更
少)。此外,TableAPI程序还会通过优化程序,在执行之前应用优化规则。
可以在表和DataStreamDataSet之间无缝转换,允许程序混合TableAPI以及
DataStream和DataSetAPI。
数据流编程模型
9Flink提供的最高级抽象是SQL。这种抽象在语义和表达方面类似于Table
API,但是将程序表示为SQL查询表达式。在SQL抽象与TableAPI紧密地相互
作用,和SQL查询可以通过定义表来执行TableAPI。
程序和数据流
Flink程序的基本构建块是流和转换。(请注意,Flink的DataSetAPI中使用的
DataSet也是内部流-稍后会详细介绍。)从概念上讲,流是(可能永无止境的)
数据记录流,而转换是将一个或多个流作为一个或多个流的算子操作。输入,并产
生一个或多个输出流。
执行时,Flink程序映射到流数据流,由流和转换算子组成。每个数据流都以一个或
多个源开头,并以一个或多个接收器结束。数据流类似于任意有向无环图
(DAG)。尽管通过迭代结构允许特殊形式的循环,但为了简单起见,我们将在
大多数情况下对此进行掩饰。
通常,程序中的转换与数据流中的算子之间存在一对一的对应关系。但是,有时一
个转换可能包含多个转换算子。
源流和接收器记录在流连接器和批处理连接器文档中。DataStream算子和DataSet
转换中记录了转换。
数据流编程模型
10并行数据流
Flink中的程序本质上是并行和分布式的。在执行期间,流具有一个或多个流分区,并且每个算子具有一个或多个算子子任务。算子子任务彼此独立,并且可以在不
同的线程中执行,并且可能在不同的机器或容器上执行。
算子子任务的数量是该特定算子的并行度。流的并行性始终是其生成算子的并行
性。同一程序的不同算子可能具有不同的并行级别。
流可以以一对一(或转发)模式或以重新分发模式在两个算子之间传输数据:
一对一流(例如,在上图中的Source和map算子之间)保存数据元的分区
和排序。这意味着map算子的subtask[1]将以与Source算子的subtask
[1]生成的顺序相同的顺序看到相同的数据元。
重新分配流(在上面的map和keyBywindow之间,以及keyBywindow
和Sink之间)重新分配流。每个算子子任务将数据发送到不同的目标子任务,具体取决于所选的转换。实例是keyBy(其通过散列Keys重新分区),广播,或Rebalance(其重新分区随机地)。在重新分配交换中,数据元之间的排序仅保存在每对发送和接收子任务中(例如,map的子任
务[1]和子任务[2]keyBywindow)。因此,在此示例中,保存了每个Keys内
的排序,但并行性确实引入了关于不同Keys的聚合结果到达接收器的顺序的非
确定性。
有关配置和控制并行性的详细信息,请参阅并行执行的文档。
数据流编程模型
11窗口
聚合事件(例如,计数,总和)在流上的工作方式与批处理方式不同。例如,不可
能计算流中的所有数据元,因为流通常是无限的(无界)。相反,流上的聚合(计
数,总和等)由窗口限定,例如“在最后5分钟内计数”或“最后100个数据元的总
和”。
Windows可以是时间驱动的(例如:每30秒)或数据驱动(例如:每100个数据
元)。一个典型地区分不同类型的窗口,例如翻滚窗口(没有重叠),滑动窗口
(具有重叠)和会话窗口(由不活动的间隙打断)。
更多窗口示例可以在此博客文章中找到。更多详细信息在窗口文档中。
时间
当在流程序中引用时间(例如定义窗口)时,可以参考不同的时间概念:
事件时间是创建事件的时间。它通常由事件中的时间戳描述,例如由生产传感
器或生产服务附加。Flink通过时间戳分配器访问事件时间戳。
摄取时间是事件在源算子处输入Flink数据流的时间。
处理时间是执行基于时间的算子操作的每个算子的本地时间。
有关如何处理时间的更多详细信息,请参阅事件时间文档。
有状态的算子操作
数据流编程模型
12虽然数据流中的许多算子操作只是一次查看一个单独的事件(例如事件解析器),但某些算子操作会记住多个事件(例如窗口算子)的信息。这些算子操作称为有
状态。
状态算子操作的状态保持在可以被认为是嵌入式键值存储的状态中。状态被分区
并严格地与有状态算子读取的流一起分发。因此,只有在keyBy函数之后才能
在被Key化的数据流上访问键值状态,并且限制为与当前事件的键相关联的值。对
齐流和状态的Keys可确保所有状态更新都是本地算子操作,从而保证一致性而无
需事务开销。此对齐还允许Flink重新分配状态并透明地调整流分区。
有关更多信息,请参阅有关状态的文档。
容错检查点
Flink使用流重放和检查点的组合实现容错。检查点与每个输入流中的特定点以及每
个算子的对应状态相关。通过恢复算子的状态并从检查点重放事件,可以从检查点
恢复流数据流,同时保持一致性(恰好一次处理语义)。
检查点间隔是在执行期间用恢复时间(需要重放的事件的数量)来折衷容错开销的
手段。
容错内部的描述提供了有关Flink如何管理检查点和相关主题的更多信息。有关启用
和配置检查点的详细信息,请参阅检查点API文档。
流处理批处理
Flink执行批处理程序作为流程序的特殊情况,其中流是有界的(有限数量的数据
元)。一个数据集在内部视为数据流。因此,上述概念以相同的方式应用于批处理
程序,并且它们适用于流程序,除了少数例外:
数据流编程模型
13批处理程序的容错不使用检查点。通过完全重放流来恢复。这是可能的,因为
输入有限。这会使成本更多地用于恢复,但使常规处理更便宜,因为它避免了
检查点。
DataSetAPI中的有状态算子操作使用简化的内存核外数据结构,而不是键值
索引。
DataSetAPI引入了特殊的同步(超级步骤)迭代,这些迭代只能在有界流上
进行。有关详细信息,请查看迭代文档。
下一步
继续使用Flink的DistributedRuntime中的基本概念。
数据流编程模型
14分布式运行时环境
译者:flink.sojb.cn
任务和算子链
对于分布式执行,Flink链算子子任务一起放入任务。每个任务由一个线程执行。将
算子链接到任务中是一项有用的优化:它可以Reduce线程到线程切换和缓冲的开
销,并在降低延迟的同时提高整体吞吐量。可以配置链接行为;有关详细信息,请
参阅链接文档。
下图中的示例数据流由五个子任务执行,因此具有五个并行线程。
TaskManager,JobManager,客户端
Flink运行时包含两种类型的进程:
该JobManagers(也称为Masters)协调分布式执行。他们安排任务,协调检
查点,协调故障恢复等。
总是至少有一个JobManager。高可用性设置将具有多个JobManagers,其中
一个始终是Leader,其他人处于待机状态。
该TaskManagers(也叫工人)执行任务(或者更具体地说,子任务)的数据
流,以及缓冲器和交换数据流。
分布式运行时环境
15必须始终至少有一个TaskManager。
JobManagers和TaskManagers可以通过多种方式启动:作为独立集群直接在计算
机上,在容器中,或由YARN或Mesos等资源框架管理。TaskManagers连接到
JobManagers,宣布自己可用,并被分配工作。
该客户端是不运行时和程序执行的一部分,而是被用来准备和发送的数据流的
JobManager。之后,客户端可以断开连接或保持连接以接收进度报告。客户端既
可以作为触发执行的JavaScala程序的一部分运行,也可以在命令行进程中运
行 .binflinkrun...。
任务槽和资源
每个worker(TaskManager)都是一个JVM进程,可以在不同的线程中执行一个或
多个子任务。为了控制工人接受的任务数量,工人有所谓的任务槽(至少一个)。
每个任务槽代表TaskManager的固定资源子集。例如,具有三个插槽的
TaskManager将其13的托管内存专用于每个插槽。切换资源意味着子任务不会与来
自其他作业的子任务竞争托管内存,而是具有一定数量的保存托管内存。请注意,此处不会发生CPU隔离;当前插槽只分离任务的托管内存。
通过调整任务槽的数量,用户可以定义子任务如何相互隔离。每个TaskManager有
一个插槽意味着每个任务组在一个单独的JVM中运行(例如,可以在一个单独的容
器中启动)。拥有多个插槽意味着更多子任务共享同一个JVM。同一JVM中的任务
分布式运行时环境
16共享TCP连接(通过多路复用)和心跳消息。它们还可以共享数据集和数据结构,从而Reduce每任务开销。
默认情况下,Flink允许子任务共享插槽,即使它们是不同任务的子任务,只要它们
来自同一个作业。结果是一个槽可以保存作业的整个管道。允许此插槽共享有两个
主要好处:
Flink集群需要与作业中使用的最高并行度一样多的任务槽。无需计算程序总共
包含多少任务(具有不同的并行性)。
更容易获得更好的资源利用率。如果没有插槽共享,非密集源map子任
务将阻止与资源密集型窗口子任务一样多的资源。通过插槽共享,将示例中的
基本并行性从2增加到6可以充分利用时隙资源,同时确保繁重的子任务在
TaskManagers之间公平分配。
API还包括可用于防止不期望的时隙共享的资源组机制。
根据经验,一个很好的默认任务槽数就是CPU核心数。使用超线程,每个插槽然后
需要2个或更多硬件线程上下文。
状态后台
分布式运行时环境
17存储键值索引的确切数据结构取决于所选的状态后台。一个状态后台将数据存储在
内存中的哈希映射中,另一个状态后台使用RocksDB作为键值存储。除了定义保
存状态的数据结构之外,状态后台还实现逻辑以获取键值状态的时间点
SNAPSHOT,并将该SNAPSHOT存储为检查点的一部分。
保存点
用DataStreamAPI编写的程序可以从保存点恢复执行。保存点允许更新程序和
Flink群集,而不会丢失任何状态。
保存点是手动触发的检查点,它会获取程序的SNAPSHOT并将其写入状态后台。
他们依靠常规的检查点机制。在执行期间,程序会定期在工作节点上创建
SNAPSHOT并生成检查点。对于恢复,仅需要最后完成的检查点,并且一旦新的
检查点完成,就可以安全地丢弃旧的检查点。
保存点与这些定期检查点类似,不同之处在于它们由用户触发,并且在较新的检查
点完成时不会自动过期。可以从命令行或通过RESTAPI取消作业时创建保存点。
分布式运行时环境
18教程
教程
19API教程
API教程
20DataStreamAPI教程
译者:flink.sojb.cn
在本指南中,我们将从头开始,从设置Flink项目到在Flink集群上运行流分析程序。
Wikipedia提供了一个IRC频道,其中记录了对Wiki的所有编辑。我们将在Flink中读
取此通道,并计算每个用户在给定时间窗口内编辑的字节数。这很容易使用Flink在
几分钟内实现,但它将为您提供一个良好的基础,从而开始自己构建更复杂的分析
程序。
设置Maven项目
我们将使用FlinkMavenArchetype来创建我们的项目结构。有关此内容的更多详细
信息,请参阅JavaAPI快速入门。出于我们的目的,运行命令是这样的:
mvnarchetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeCatalog=https:repository.apache.orgcontentrep
ositoriessnapshots -DarchetypeVersion=1.7-SNAPSHOT -DgroupId=wiki-edits -DartifactId=wiki-edits -Dversion=0.1 -Dpackage=wikiedits -DinteractiveMode=false
注意:对于Maven3.0或更高版本,不再可以通过命令行指定存储库(-
DarchetypeCatalog)。如果要使用SNAPSHOT存储库,则需要向settings.xml添加
存储库条目。有关此更改的详细信息,请参阅Maven官方文档
您可以编辑 groupId, artifactId而 package如果你喜欢。使用上面的参
数,Maven将创建一个如下所示的项目结构:
DataStreamAPI教程
21treewiki-edits
wiki-edits
├──pom.xml
└──src
└──main
├──java
│└──wikiedits
│├──BatchJob.java
│├──SocketTextStreamWordCount.java
│├──StreamingJob.java
│└──WordCount.java
└──resources
└──log4j.properties
我们的 pom.xml文件已经在根目录中添加了Flink依赖项,并且有几个示例Flink程
序 srcmainjava。我们可以删除示例程序,因为我们将从头开始:
rmwiki-editssrcmainjavawikiedits.java
作为最后一步,我们需要将FlinkWikipedia连接器添加为依赖关系,以便我们可以
在我们的程序中使用它。编辑它的 dependencies部分 pom.xml,使它看起来像
这样:
org.apache.flink
flink-java
{flink.version}
org.apache.flink
flink-streaming-java_2.11
{flink.version}
org.apache.flink
flink-clients_2.11
{flink.version}
org.apache.flink
flink-connector-wikiedits_2.11
{flink.version}
DataStreamAPI教程
22注意 flink-connector-wikiedits_2.11添加的依赖项。(此示例和Wikipedia连
接器的灵感来自ApacheSamza的HelloSamza示例。)
编写Flink程序
这是编码时间。启动您喜欢的IDE并导入Maven项目或打开文本编辑器并创建文
件 srcmainjavawikieditsWikipediaAnalysis.java:
packagewikiedits;
publicclassWikipediaAnalysis{
publicstaticvoidmain(String[]args)throwsException{
}
}
该计划现在非常基础,但我们会尽力填写。请注意,我不会在此处提供import语
句,因为IDE可以自动添加它们。在本节结束时,如果您只想跳过并在编辑器中输
入,我将使用import语句显示完整的代码。
Flink程序的第一步是创建一个 StreamExecutionEnvironment(或
者 ExecutionEnvironment如果您正在编写批处理作业)。这可用于设置执行参
数并创建从外部系统读取的源。所以让我们继续把它添加到main方法:
StreamExecutionEnvironmentsee=StreamExecutionEnvironment.getE
xecutionEnvironment;
接下来,我们将创建一个从WikipediaIRC日志中读取的源:
DataStreamedits=see.addSource(newWikiped
iaEditsSource);
这创建了一个我们可以进一步处理 DataStream的 WikipediaEditEvent数据
元。出于本示例的目的,我们感兴趣的是确定每个用户在特定时间窗口中添加或删
除的字节数,比如说五秒。为此,我们首先必须指定我们要在用户名上键入流,也
就是说此流上的算子操作应考虑用户名。在我们的例子中,窗口中编辑的字节的总
和应该是每个唯一的用户。对于键入流,我们必须提供一个 KeySelector,如下
所示:
DataStreamAPI教程
23KeyedStreamkeyedEdits=edits
.keyBy(newKeySelector{
@Override
publicStringgetKey(WikipediaEditEventevent){
returnevent.getUser;
}
});
这为我们提供了一个 WikipediaEditEvent具有 StringKeys的用户名。我们现
在可以指定我们希望在此流上加上窗口,并根据这些窗口中的数据元计算结果。窗
口指定要在其上执行计算的Stream片。在无限的数据元流上计算聚合时需要
Windows。在我们的例子中,我们将说我们想要每五秒聚合一次编辑的字节总和:
DataStream>result=keyedEdits
.timeWindow(Time.seconds(5))
.fold(newTuple2<>(,0L),newFoldFunction
ent,Tuple2>{
@Override
publicTuple2fold(Tuple2ac
c,WikipediaEditEventevent){
acc.f0=event.getUser;
acc.f1+=event.getByteDiff;
returnacc;
}
});
第一个调用, .timeWindow指定我们想要有五秒钟的翻滚(非重叠)窗口。第
二个调用为每个唯一键指定每个窗口切片的折叠变换。在我们的例子中,我们从一
个初始值开始, (,0L)并在其中为用户添加该时间窗口中每个编辑的字节差
异。生成的Stream现在包含 Tuple2每五秒钟发出一次的
用户。
剩下要做的就是将流打印到控制台并开始执行:
result.print;
see.execute;
最后一次调用是启动实际Flink作业所必需的。所有算子操作(例如创建源,转换和
接收器)仅构建内部算子操作的图形。只有在 execute被调用时才会在集群
上抛出或在本地计算机上执行此算子操作图。
到目前为止完整的代码是这样的:
packagewikiedits;
DataStreamAPI教程
24importorg.apache.flink.api.common.functions.FoldFunction;
importorg.apache.flink.api.java.functions.KeySelector;
importorg.apache.flink.api.java.tuple.Tuple2;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.datastream.KeyedStream;
importorg.apache.flink.streaming.api.environment.StreamExecutio
nEnvironment;
importorg.apache.flink.streaming.api.windowing.time.Time;
importorg.apache.flink.streaming.connectors.wikiedits.Wikipedia
EditEvent;
importorg.apache.flink.streaming.connectors.wikiedits.Wikipedia
EditsSource;
publicclassWikipediaAnalysis{
publicstaticvoidmain(String[]args)throwsException{
StreamExecutionEnvironmentsee=StreamExecutionEnvironment.
getExecutionEnvironment;
DataStreamedits=see.addSource(newWik
ipediaEditsSource);
KeyedStreamkeyedEdits=edits
.keyBy(newKeySelector{
@Override
publicStringgetKey(WikipediaEditEventevent){
returnevent.getUser;
}
});
DataStream>result=keyedEdits
.timeWindow(Time.seconds(5))
.fold(newTuple2<>(,0L),newFoldFunction
Event,Tuple2>{
@Override
publicTuple2fold(Tuple2ac
c,WikipediaEditEventevent){
acc.f0=event.getUser;
acc.f1+=event.getByteDiff;
returnacc;
}
});
result.print;
see.execute;
}
}
您可以使用Maven在IDE或命令行上运行此示例:
DataStreamAPI教程
25mvncleanpackage
mvnexec:java-Dexec.mainClass=wikiedits.WikipediaAnalysis
第一个命令构建我们的项目,第二个命令执行我们的主类。输出应该类似于:
1>(Fenixdown,114)
6>(AnomieBOT,155)
8>(BD2412bot,-3690)
7>(IgnorantArmies,49)
3>(Ckh3111,69)
5>(Slade360,0)
7>(Narutolovehinata5,2195)
6>(Vuyisa2001,79)
4>(MsSarahWelch,269)
4>(KasparBot,-245)
每行前面的数字告诉您输出生成的打印接收器的哪个并行实例。
这应该让您开始编写自己的Flink程序。要了解更多信息,您可以查看我们的基本概
念指南和DataStreamAPI。如果您想了解如何在自己的机器上设置Flink群集并将
结果写入Kafka,请坚持参加奖励练习。
奖金练习:在群集上运行并写入Kafka
请按照我们的本地安装教程在您的机器上设置Flink分发,并在继续算子操作之前
参考Kafka快速入门以设置Kafka安装。
作为第一步,我们必须添加FlinkKafka连接器作为依赖关系,以便我们可以使用
Kafka接收器。将其添加到 pom.xml依赖项部分中的文件:
org.apache.flink
flink-connector-kafka-0.8_2.11
{flink.version}
接下来,我们需要修改我们的程序。我们将移除 print水槽,而是使用Kafka水
槽。新代码如下所示:
DataStreamAPI教程
26result
.map(newMapFunction,String>{
@Override
publicStringmap(Tuple2tuple){
returntuple.toString;
}
})
.addSink(newFlinkKafkaProducer08<>(localhost:9092,wiki-
result,newSimpleStringSchema));
还需要导入相关的类:
importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaPro
ducer08;
importorg.apache.flink.api.common.serialization.SimpleStringSch
ema;
importorg.apache.flink.api.common.functions.MapFunction;
注意我们是如何第一个转换的流 Tuple2来流 String使
用MapFunction。我们这样做是因为将简单字符串写入Kafka更容易。然后,我们创
建了一个Kafka水槽。您可能必须使主机名和端口适应您的设
置。 wiki-result是运行我们的程序之前我们将要创建的Kafka流的名称。使
用Maven构建项目因为我们需要jar文件在集群上运行:
mvncleanpackage
生成的jar文件将位于 target子文件夹中: targetwiki-edits-0.1.jar。我
们稍后会用到它。
现在我们准备启动Flink集群并运行写入Kafka的程序。转到安装Flink的位置并启动
本地群集:
cdmyflinkdirectory
binstart-cluster.sh
我们还必须创建Kafka主题,以便我们的程序可以写入它:
cdmykafkadirectory
binkafka-topics.sh--create--zookeeperlocalhost:2181--topi
cwiki-results
现在我们准备在本地Flink集群上运行我们的jar文件:
DataStreamAPI教程
27cdmyflinkdirectory
binflinkrun-cwikiedits.WikipediaAnalysispathtowikiedits
-0.1.jar
如果一切按计划进行,那么该命令的输出应该与此类似:
0308201615:09:27JobexecutionswitchedtostatusRUNNING.
0308201615:09:27Source:CustomSource(11)switchedtoSCHED
ULED
0308201615:09:27Source:CustomSource(11)switchedtoDEPLO
YING
0308201615:09:27TriggerWindow(TumblingProcessingTimeWindows(
5000),FoldingStateDescriptor{name=window-contents,defaultValue
=(,0),serializer=null},ProcessingTimeTrigger,WindowedStream
.fold(WindowedStream.java:207))->Map->Sink:Unnamed(11)swi
tchedtoSCHEDULED
0308201615:09:27TriggerWindow(TumblingProcessingTimeWindows(
5000),FoldingStateDescriptor{name=window-contents,defaultValue
=(,0),serializer=null},ProcessingTimeTrigger,WindowedStream
.fold(WindowedStream.java:207))->Map->Sink:Unnamed(11)swi
tchedtoDEPLOYING
0308201615:09:27TriggerWindow(TumblingProcessingTimeWindows(
5000),FoldingStateDescriptor{name=window-contents,defaultValue
=(,0),serializer=null},ProcessingTimeTrigger,WindowedStream
.fold(WindowedStream.java:207))->Map->Sink:Unnamed(11)swi
tchedtoRUNNING
0308201615:09:27Source:CustomSource(11)switchedtoRUNNI
NG
您可以看到各个算子如何开始运行。只有两个,因为出于性能原因,窗口之后的算
子操作被折叠成一个算子操作。在Flink,我们称之为链接。
您可以通过使用Kafka控制台使用者检查Kafka主题来观察程序的输出:
binkafka-console-consumer.sh--zookeeperlocalhost:2181--topi
cwiki-result
您还可以查看应在http:localhost:8081上运行的Flink仪表板。您将获得群集资
源和正在运行的作业的概述:
如果单击正在运行的作业,您将获得一个视图,您可以在其中检查各个算子操作,例如,查看已处理数据元的数量:
DataStreamAPI教程
28这就结束了我们对Flink的小游览。如果您有任何疑问,请随时询问我们的邮件列
表。
DataStreamAPI教程
29Setup教程
Setup教程
30本地安装教程
译者:flink.sojb.cn
只需几个简单的步骤即可启动并运行Flink示例程序。
设置:下载并启动Flink
Flink可在Linux,MacOSX和Windows上运行。为了能够运行Flink,唯一的要求
是安装一个有效的Java8.x.Windows用户,请查看Windows上的Flink指南,该指
南介绍了如何在Windows上运行Flink以进行本地设置。
您可以通过发出以下命令来检查Java正确安装:
java-version
如果你有Java8,输出将如下所示:
javaversion1.8.0_111
Java(TM)SERuntimeEnvironment(build1.8.0_111-b14)
JavaHotSpot(TM)64-BitServerVM(build25.111-b14,mixedmode)
下载并编译
从我们的某个存储库克隆源代码,例如:
gitclonehttps:github.comapacheflink.git
cdflink
mvncleanpackage-DskipTeststhiswilltakeupto10minute
s
cdbuild-targetthisiswhereFlinkisinstall
edto
启动本地Flink群集
.binstart-cluster.shStartFlink
检查分派器的web前端在HTTP:localhost:8081,并确保一切都正常运行。Web
前端应报告单个可用的TaskManager实例。
本地安装教程
31您还可以通过检查 logs目录中的日志文件来验证系统是否正在运行:
taillogflink--standalonesession-.log
INFO...-Restendpointlisteningatlocalhost:8081
INFO...-http:localhost:8081wasgrantedleadership...
INFO...-Webfrontendlisteningathttp:localhost:8081.
INFO...-StartingRPCendpointforStandaloneResourceManagera
takka:flinkuserresourcemanager.
INFO...-StartingRPCendpointforStandaloneDispatcheratakk
a:flinkuserdispatcher.
INFO...-ResourceManagerakka.tcp:[[emailprotected]](cdn-c
gilemail-protection):6123userresourcemanagerwasgrantedlea
dership...
INFO...-StartingtheSlotManager.
INFO...-Dispatcherakka.tcp:[[emailprotected]](cdn-cgil
email-protection):6123userdispatcherwasgrantedleadership..
.
INFO...-Recoveringallpersistedjobs.
INFO...-RegisteringTaskManager...under...attheSlotMana
ger.
本地安装教程
32阅读代码
您可以在Scala和Java上的GitHub上找到此SocketWindowWordCount示例的完整源
代码。
Scala
Java
objectSocketWindowWordCount{
defmain(args:Array[String]):Unit={
theporttoconnectto
valport:Int=try{
ParameterTool.fromArgs(args).getInt(port)
}catch{
casee:Exception=>{
System.err.println(Noportspecified.Pleaseru
n'SocketWindowWordCount--port')
return
}
}
gettheexecutionenvironment
valenv:StreamExecutionEnvironment=StreamExecutionEnv
ironment.getExecutionEnvironment
getinputdatabyconnectingtothesocket
valtext=env.socketTextStream(localhost,port,'\n')
parsethedata,groupit,windowit,andaggregateth
ecounts
valwindowCounts=text
.flatMap{w=>w.split(\\s)}
.map{w=>WordWithCount(w,1)}
.keyBy(word)
.timeWindow(Time.seconds(5),Time.seconds(1))
.sum(count)
printtheresultswithasinglethread,ratherthani
nparallel
windowCounts.print.setParallelism(1)
env.execute(SocketWindowWordCount)
}
Datatypeforwordswithcount
caseclassWordWithCount(word:String,count:Long)
}
本地安装教程
33publicclassSocketWindowWordCount{
publicstaticvoidmain(String[]args)throwsException{
theporttoconnectto
finalintport;
try{
finalParameterToolparams=ParameterTool.fromArgs(
args);
port=params.getInt(port);
}catch(Exceptione){
System.err.println(Noportspecified.Pleaserun'S
ocketWindowWordCount--port');
return;
}
gettheexecutionenvironment
finalStreamExecutionEnvironmentenv=StreamExecutionEn
vironment.getExecutionEnvironment;
getinputdatabyconnectingtothesocket
DataStreamtext=env.socketTextStream(localhos
t,port,\n);
parsethedata,groupit,windowit,andaggregateth
ecounts
DataStreamwindowCounts=text
.flatMap(newFlatMapFunction{
@Override
publicvoidflatMap(Stringvalue,Collector
WithCount>out){
for(Stringword:value.split(\\s)){
out.collect(newWordWithCount(word,1L));
}
}
})
.keyBy(word)
.timeWindow(Time.seconds(5),Time.seconds(1))
.reduce(newReduceFunction{
@Override
publicWordWithCountreduce(WordWithCounta,Wor
dWithCountb){
returnnewWordWithCount(a.word,a.count+b
.count);
}
});
printtheresultswithasinglethread,ratherthani
nparallel
windowCounts.print.setParallelism(1);
本地安装教程
34env.execute(SocketWindowWordCount);
}
Datatypeforwordswithcount
publicstaticclassWordWithCount{
publicStringword;
publiclongcount;
publicWordWithCount{}
publicWordWithCount(Stringword,longcount){
this.word=word;
this.count=count;
}
@Override
publicStringtoString{
returnword+:+count;
}
}
}
运行示例
现在,我们将运行此Flink应用程序。它将从套接字读取文本,并且每5秒打印一次
前5秒内每个不同单词的出现次数,即处理时间的翻滚窗口,只要文字漂浮在其
中。
首先,我们使用netcat来启动本地服务器
nc-l9000
提交Flink计划:
.binflinkrunexamplesstreamingSocketWindowWordCount.jar-
-port9000
Startingexecutionofprogram
程序连接到套接字并等待输入。您可以检查Web界面以验证作业是否按预期运行:
本地安装教程
35本地安装教程
36单词在5秒的时间窗口(处理时间,翻滚窗口)中计算并打印到 stdout。监
视TaskManager的输出文件并写入一些文本 nc(输入在点击后逐行发送到
Flink):
nc-l9000
loremipsum
ipsumipsumipsum
bye
该 .out文件将在每个时间窗口结束时,只要打印算作字浮在,例如:
tail-flogflink--taskexecutor-.out
lorem:1
bye:1
ipsum:4
要停止Flink当你做类型:
.binstop-cluster.sh
下一步
查看更多示例以更好地了解Flink的编程API。完成后,请继续阅读流处理指南。
本地安装教程
37在Windows上运行Flink
译者:flink.sojb.cn
如果要在Windows计算机上本地运行Flink,则需要下载并解压缩二进制Flink分发。
之后,您可以使用Windows批处理文件( .bat),或使用Cygwin运行Flink
JobManager。
从Windows批处理文件开始
要从Windows命令行启动Flink,请打开命令窗口,导航到 binFlink目录并运
行 start-cluster.bat。
注意: binJavaRuntimeEnvironment的文件夹必须包含在Window
的 %PATH%变量中。按照本指南将Java添加到 %PATH%变量中。
cdflink
cdbin
start-cluster.bat
StartingalocalclusterwithoneJobManagerprocessandoneTas
kManagerprocess.
YoucanterminatetheprocessesviaCTRL-Cinthespawnedshell
windows.
Webinterfacebydefaultonhttp:localhost:8081.
之后,您需要打开第二个终端来运行作业 flink.bat。
从Cygwin和UnixScripts开始
使用Cygwin,您需要启动Cygwin终端,导航到您的Flink目录并运
行 start-cluster.sh脚本:
cdflink
binstart-cluster.sh
Startingcluster.
从Git安装Flink
如果您正在从git存储库安装Flink并且您正在使用Windowsgitshell,则Cygwin可能
会产生类似于以下的故障:
在Windows上运行Flink
38c:flinkbinstart-cluster.sh:line30:figure>\r':commandnot
found
发生此错误是因为在Windows中运行时,git会自动将UNIX行结尾转换为Windows
样式行结尾。问题是Cygwin只能处理UNIX样式的行结尾。解决方案是通过以下三
个步骤调整Cygwin设置以处理正确的行结尾:
1. 启动一个Cygwinshell。
2. 输入确定您的主目录
cd;pwd
ThiswillreturnapathundertheCygwinrootpath.
1. 使用NotePad,写字板或其他文本编辑器打开 .bash_profile主目录中的文
件并附加以下内容:(如果文件不存在,则必须创建它)
exportSHELLOPTS
set-oigncr
保存文件并打开一个新的bashshell。
在Windows上运行Flink
39例子
译者:flink.sojb.cn
捆绑的例子
Flink的来源包括Flink不同API的许多示例:
DataStream应用程序(JavaScala)
DataSet应用程序(JavaScala)
TableAPISQL查询(JavaScala)
这些说明解释了如何运行示例。
Web上的示例
还有一些在线发布的博客文章讨论了示例应用程序:
如何使用ApacheFlink构建有状态流应用程序提供了一个使用DataStreamAPI
实现的事件驱动应用程序和两个用于流分析的SQL查询。
使用ApacheFlink,Elasticsearch和Kibana构建实时仪表板应用程序
是elastic.co的博客文章,展示了如何使用ApacheFlink,Elasticsearch和
Kibana构建用于流数据分析的实时仪表板解决方案。
来自DataArtisans的Flink培训网站有很多例子。查看动手部分和练习。
例子
40批处理示例
译者:flink.sojb.cn
以下示例程序展示了Flink的不同应用程序,从简单的字数统计到图形算法。代码示
例说明了Flink的DataSetAPI的使用。
可以在Flink源存储库的flink-examples-batch模块中找到以下和更多示例的完整源代
码。
运行一个例子
为了运行Flink示例,我们假设您有一个正在运行的Flink实例。导航中的“快速入
门”和“设置”选项卡描述了启动Flink的各种方法。
最简单的方法是运行 .binstart-cluster.sh,默认情况下启动一个带有一个
JobManager和一个TaskManager的本地集群。
Flink的每个二进制版本都包含一个 examples目录,其中包含此页面上每个示例
的jar文件。
要运行WordCount示例,请发出以下命令:
.binflinkrun.examplesbatchWordCount.jar
其他示例可以以类似的方式启动。
请注意,通过使用内置数据,许多示例在不传递任何参数的情况下运行。要使用实
际数据运行WordCount,您必须将路径传递给数据:
.binflinkrun.examplesbatchWordCount.jar--inputpathto
sometextdata--outputpathtoresult
请注意,非本地文件系统需要模式前缀,例如 hdfs:。
字数
WordCount是大数据处理系统的“HelloWorld”。它计算文本集合中单词的频率。该
算法分两步进行:首先,文本将文本分成单个单词。其次,对单词进行分组和计
数。
Java
Scala
批处理示例
41ExecutionEnvironmentenv=ExecutionEnvironment.getExecutionEnvi
ronment;
DataSettext=env.readTextFile(pathtofile);
DataSet>counts=
splitupthelinesinpairs(2-tuples)containing:(w
ord,1)
text.flatMap(newTokenizer)
groupbythetuplefield0andsumuptuplefield
1
.groupBy(0)
.sum(1);
counts.writeAsCsv(outputPath,\n,);
User-definedfunctions
publicstaticclassTokenizerimplementsFlatMapFunction>{
@Override
publicvoidflatMap(Stringvalue,Collector
nteger>>out){
normalizeandsplittheline
String[]tokens=value.toLowerCase.split(\\W+);
emitthepairs
for(Stringtoken:tokens){
if(token.length>0){
out.collect(newTuple2(token,1));
}
}
}
}
该字计数示例实现上述算法的输入参
数: --input--output。作为测试数据,任何
文本文件都可以。
批处理示例
42valenv=ExecutionEnvironment.getExecutionEnvironment
getinputdatavaltext=env.readTextFile(pathtofile)
valcounts=text.flatMap{_.toLowerCase.split(\\W+)filter{
_.nonEmpty}}
.map{(_,1)}
.groupBy(0)
.sum(1)
counts.writeAsCsv(outputPath,\n,)
该字计数示例实现上述算法的输入参
数: --input--output。作为测试数据,任何
文本文件都可以。
网页排名
PageRank算法计算链接定义的图形中页面的“重要性”,链接指向一个页面到另一个
页面。它是一种迭代图算法,这意味着它重复应用相同的计算。在每次迭代中,每
个页面在其所有邻居上分配其当前等级,并将其新等级计算为从其邻居接收的等级
的纳税总和。PageRank算法由Google搜索引擎推广,该搜索引擎利用网页的重要
性对搜索查询的结果进行排名。
在这个简单的例子中,PageRank通过批量迭代和固定数量的迭代来实现。
Java
Scala
ExecutionEnvironmentenv=ExecutionEnvironment.getExecutionEnvi
ronment;
readthepagesandinitialranksbyparsingaCSVfile
DataSet>pagesWithRanks=env.readCsvFile(p
agesInputPath)
.types(Long.class,Double.class)
thelinksareencodedasanadjacencylist:(page-id,Array(n
eighbor-ids))
DataSet>pageLinkLists=getLinksDataSet(en
v);
setiterativedataset
IterativeDataSet>iteration=pagesWithRank
s.iterate(maxIterations);
DataSet>newRanks=iteration
joinpageswithoutgoingedgesanddistributerank
批处理示例
43.join(pageLinkLists).where(0).equalTo(0).flatMap(newJoi
nVertexWithEdgesMatch)
collectandsumranks
.groupBy(0).sum(1)
applydampeningfactor
.map(newDampener(DAMPENING_FACTOR,numPages));
DataSet>finalPageRanks=iteration.closeWi
th(
newRanks,newRanks.join(iteration).where(0).equalTo(0)
terminationcondition
.filter(newEpsilonFilter));
finalPageRanks.writeAsCsv(outputPath,\n,);
User-definedfunctions
publicstaticfinalclassJoinVertexWithEdgesMatch
implementsFlatJoinFunction
ble>,Tuple2,Tuple2
>{
@Override
publicvoidjoin(page,Tuple2
ng[]>adj,Collector>out){
Long[]neighbors=adj.f1;
doublerank=page.f1;
doublerankToDistribute=rank((double)neigbors.leng
th);
for(inti=0;i
out.collect(newTuple2(neighbors[i],r
ankToDistribute));
}
}
}
publicstaticfinalclassDampenerimplementsMapFunction
,Tuple2>{
privatefinaldoubledampening,randomJump;
publicDampener(doubledampening,doublenumVertices){
this.dampening=dampening;
this.randomJump=(1-dampening)numVertices;
}
@Override
publicTuple2map(Tuple2value)
{
value.f1=(value.f1dampening)+randomJump;
批处理示例
44returnvalue;
}
}
publicstaticfinalclassEpsilonFilter
implementsFilterFunction
uble>,Tuple2>>{
@Override
publicbooleanfilter(Tuple2,Tuple2
ng,Double>>value){
returnMath.abs(value.f0.f1-value.f1.f1)>EPSILON;
}
}
所述的PageRank程序实现上述实施例。它需要运行以下参
数: --pages--links--output--numPages--iterations。
User-definedtypescaseclassLink(sourceId:Long,targetId:
Long)
caseclassPage(pageId:Long,rank:Double)
caseclassAdjacencyList(sourceId:Long,targetIds:Array[Long])
setupexecutionenvironmentvalenv=ExecutionEnvironment.g
etExecutionEnvironment
readthepagesandinitialranksbyparsingaCSVfilevalpa
ges=env.readCsvFile[Page](pagesInputPath)
thelinksareencodedasanadjacencylist:(page-id,Array(n
eighbor-ids))vallinks=env.readCsvFile[Link](linksInputPath)
assigninitialrankstopagesvalpagesWithRanks=pages.map(
p=>Page(p,1.0numPages))
buildadjacencylistfromlinkinputvaladjacencyLists=lin
ks
initializelists
.map(e=>AdjacencyList(e.sourceId,Array(e.targetId)))
concatenatelists
.groupBy(sourceId).reduce{
(l1,l2)=>AdjacencyList(l1.sourceId,l1.targetIds++l2.targ
etIds)
}
startiterationvalfinalRanks=pagesWithRanks.iterateWithTe
rmination(maxIterations){
currentRanks=>
valnewRanks=currentRanks
distributerankstotargetpages
批处理示例
45.join(adjacencyLists).where(pageId).equalTo(sourceId)
{
(page,adjacent,out:Collector[Page])=>
for(targetId<-adjacent.targetIds){
out.collect(Page(targetId,page.rankadjacent.target
Ids.length))
}
}
collectranksandsumthemup
.groupBy(pageId).aggregate(SUM,rank)
applydampeningfactor
.map{p=>
Page(p.pageId,(p.rankDAMPENING_FACTOR)+((1-DAMPE
NING_FACTOR)numPages))
}
terminateifnorankupdatewassignificant
valtermination=currentRanks.join(newRanks).where(pageId).equalTo(pageId){
(current,next,out:Collector[Int])=>
checkforsignificantupdate
if(math.abs(current.rank-next.rank)>EPSILON)out.co
llect(1)
}
(newRanks,termination)
}
valresult=finalRanks
emitresultresult.writeAsCsv(outputPath,\n,)
hePageRankprogramimplementstheaboveexample.Itrequiresthefollowing
parameterstorun:--pages--links--output--numPages--iterations
.
输入文件是纯文本文件,必须格式如下:
页面表示为由新行字符分隔的(长)ID。
例如, 1\n2\n12\n42\n63\n给出五个页面ID为1,2,12,42和63的页
面。
链接表示为由空格字符分隔的页面ID对。链接由换行符分隔:
例如, 12\n212\n112\n4263\n给出四个(定向)链接(1)-
>(2),(2)->(12),(1)->(12)和(42)->(63)。
对于这个简单的实现,要求每个页面至少有一个传入链接和一个传出链接(页面可
以指向自身)。
连接组件
批处理示例
46连通分量算法通过为同一连接部分中的所有顶点分配相同的组件ID来识别较大图形
的部分。与PageRank类似,ConnectedComponents是一种迭代算法。在每个步
骤中,每个顶点将其当前组件ID传播到其所有邻居。如果顶点小于其自己的组件
ID,则顶点接受来自邻居的组件ID。
此实现使用增量迭代:未更改其组件ID的顶点不参与下一步。这会产生更好的性
能,因为后面的迭代通常只处理一些异常值顶点。
Java
Scala
readvertexandedgedata
DataSetvertices=getVertexDataSet(env);
DataSet>edges=getEdgeDataSet(env).flatMap(
newUndirectEdge);
assigntheinitialcomponentIDs(equaltothevertexID)
DataSet>verticesWithInitialId=vertices.map
(newDuplicateValue);
openadeltaiteration
DeltaIteration,Tuple2>iteration
=
verticesWithInitialId.iterateDelta(verticesWithInitialId
,maxIterations,0);
applythesteplogic:
DataSet>changes=iteration.getWorkset
joinwiththeedges
.join(edges).where(0).equalTo(0).with(newNeighborWithCo
mponentIDJoin)
selecttheminimumneighborcomponentID
.groupBy(0).aggregate(Aggregations.MIN,1)
updateifthecomponentIDofthecandidateissmalle
r
.join(iteration.getSolutionSet).where(0).equalTo(0)
.flatMap(newComponentIdFilter);
closethedeltaiteration(deltaandnewworksetareidentica
l)
DataSet>result=iteration.closeWith(changes
,changes);
emitresult
result.writeAsCsv(outputPath,\n,);
User-definedfunctions
publicstaticfinalclassDuplicateValueimplementsMapFuncti
on>{
@Override
批处理示例
47publicTuple2map(Tvertex){
returnnewTuple2(vertex,vertex);
}
}
publicstaticfinalclassUndirectEdge
implementsFlatMapFunction
>,Tuple2>{
Tuple2invertedEdge=newTuple2;
@Override
publicvoidflatMap(Tuple2edge,Collector
2>out){
invertedEdge.f0=edge.f1;
invertedEdge.f1=edge.f0;
out.collect(edge);
out.collect(invertedEdge);
}
}
publicstaticfinalclassNeighborWithComponentIDJoin
implementsJoinFunction,Tupl
e2,Tuple2>{
@Override
publicTuple2join(Tuple2vertexWith
Component,Tuple2edge){
returnnewTuple2(edge.f1,vertexWithCompone
nt.f1);
}
}
publicstaticfinalclassComponentIdFilter
implementsFlatMapFunction
g,Long>,Tuple2>,Tuple2>
{
@Override
publicvoidflatMap(Tuple2,Tuple2>value,Collector>out){
if(value.f0.f1
out.collect(value.f0);
}
}
}
该ConnectedComponents程序实现上述实施例。它需要运行以下参
数: --vertices--edges--output--iterations。
批处理示例
48setupexecutionenvironmentvalenv=ExecutionEnvironment.g
etExecutionEnvironment
readvertexandedgedata
assigntheinitialcomponents(equaltothevertexid)valve
rtices=getVerticesDataSet(env).map{id=>(id,id)}
undirectededgesbyemittingforeachinputedgetheinputed
gesitselfandaninverted
versionvaledges=getEdgesDataSet(env).flatMap{edge=>Se
q(edge,(edge._2,edge._1))}
openadeltaiterationvalverticesWithComponents=vertices.
iterateDelta(vertices,maxIterations,Array(0)){
(s,ws)=>
applythesteplogic:joinwiththeedges
valallNeighbors=ws.join(edges).where(0).equalTo(0){(ver
tex,edge)=>
(edge._2,vertex._2)
}
selecttheminimumneighbor
valminNeighbors=allNeighbors.groupBy(0).min(1)
updateifthecomponentofthecandidateissmaller
valupdatedComponents=minNeighbors.join(s).where(0).equalT
o(0){
(newVertex,oldVertex,out:Collector[(Long,Long)])=>
if(newVertex._2
}
deltaandnewworksetareidentical
(updatedComponents,updatedComponents)
}
verticesWithComponents.writeAsCsv(outputPath,\n,)
这个PageRank程序实现了上面的例子。它需要运行以下参
数: --pages--links--output--numPages--iterations。
输入文件是纯文本文件,必须格式如下:
顶点表示为ID并用换行符分隔。
例如, 1\n2\n12\n42\n63\n给出五个顶点(1),(2),(12),(42)和(63)。
边缘表示为由空格字符分隔的顶点ID的对。边线由换行符分隔:
例如, 12\n212\n112\n4263\n给出四个(无向)链路(1)-
(2),(2)-(12),(1)-(12)和(42)-(63)。
批处理示例
49批处理示例
50应用开发
应用开发
51项目构建设置
项目构建设置
52Java项目模板
译者:flink.sojb.cn
只需几个简单的步骤即可开始使用FlinkJava程序。
要求
唯一的要求是使用Maven3.0.4(或更高版本)和Java8.x安装。
创建项目
使用以下命令之一创建项目:
使用Maven原型
运行快速入门脚本
mvnarchetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeCatalog=https:repository.apache.orgcontentr
epositoriessnapshots -DarchetypeVersion=1.7-SNAPSHOT
这允许您命名新创建的项目。它将以交互方式询问您groupId,artifactId和包名称。
curlhttps:flink.apache.orgqquickstart-SNAPSHOT.sh|bash
-s1.7-SNAPSHOT
注意:对于Maven3.0或更高版本,不再可以通过命令行指定存储库(-
DarchetypeCatalog)。如果要使用SNAPSHOT存储库,则需要向settings.xml添加
存储库条目。有关此更改的详细信息,请参阅Maven官方文档
检查项目
您的工作目录中将有一个新目录。如果您使用了curl方法,则会调用该目
录 quickstart。否则,它的名称为 artifactId:
Java项目模板
53treequickstart
quickstart
├──pom.xml
└──src
└──main
├──java
│└──org
│└──myorg
│└──quickstart
│├──BatchJob.java
│└──StreamingJob.java
└──resources
└──log4j.properties
示例项目是一个Maven项目,它包含两个类:StreamingJob和BatchJob
是DataStream和DataSet程序的基本框架程序。的主要方法是程序的入口点,无论
是对在-IDE测试执行并作适当的部署。
我们建议您将此项目导入IDE以进行开发和测试。IntelliJIDEA支持开箱即用的
Maven项目。如果您使用Eclipse,则m2e插件允许导入Maven项目。某些Eclipse
包默认包含该插件,其他包需要您手动安装它。
MacOSX用户注意事项:对于Flink,Java默认JVM堆可能太小。你必须手动增加
它。在Eclipse中,选择 RunConfigurations->Arguments并写
入 VMArguments框: -Xmx800m。在IntelliJIDEA中,推荐的方法是
从 Help|EditCustomVMOptions菜单中更改JVM选项。有关详细信息,请
参阅此文
构建项目
如果要构建打包项目,请转到项目目录并运行' mvncleanpackage'命令。您将
找到一个包含您的应用程序的JAR文件,以及您可能已作为依赖项添加到应用程序
的连接器和库: target-.jar。
注意:如果您使用与StreamingJob不同的类作为应用程序的主类入口点,我们建
议您相应地更改文件中的 mainClass设置 pom.xml。这样,Flink可以从JAR文
件运行时间应用程序,而无需另外指定主类。
下一步
写你的申请!
如果您正在编写流处理应用程序并且正在寻找灵感来写什么,请查看流处理应用程
序教程。
如果您正在编写批处理应用程序,并且正在寻找要编写的内容,请查看批处理应用
程序示例。
Java项目模板
54有关API的完整概述,请查看DataStreamAPI和DataSetAPI部分。
在这里,您可以了解如何在本地群集上的IDE外部运行应用程序。
如果您有任何问题,请在我们的邮件列表中查询。我们很乐意提供帮助。
Java项目模板
55Scala的项目模板
译者:flink.sojb.cn
构建工具
Flink项目可以使用不同的构建工具构建。为了快速入门,Flink为以下构建工具提供
了项目模板:
SBT
Maven
这些模板可帮助您设置项目结构并创建初始构建文件。
SBT
创建项目
您可以通过以下两种方法之一构建新项目:
使用sbt模板
运行快速入门脚本
sbtnewtillrohrmannflink-project.g8
这将提示您输入几个参数(项目名称,flink版本...),然后从flink-project模板创建
一个Flink项目。您需要sbt>=0.13.13才能执行此命令。如有必要,您可以按照此
安装指南获取。
bash<(curlhttps:flink.apache.orgqsbt-quickstart.sh)
ThiswillcreateaFlinkprojectinthespecifiedprojectdirectory.
构建项目
要构建项目,您只需发出 sbtcleanassembly命令即可。这将在target
scala_your-major-scala-version目录中创建fat-jaryour-project-name-
assembly-0.1-SNAPSHOT.jar。
运行项目
Scala的项目模板
56要运行项目,您必须发出 sbtrun命令。
默认情况下,这将在运行的同一JVM中运行您的作业 sbt。要在不同的JVM中运
行您的作业,请添加以下行 build.sbt
forkinrun:=true
的IntelliJ
我们建议您使用IntelliJ进行Flink作业开发。要开始,您必须将新创建的项目导入
IntelliJ。您可以通
过 File->New->ProjectfromExistingSources...然后选择项目
目录来执行此算子操作。然后,IntelliJ将自动检测 build.sbt文件并设置所有内
容。
为了运行Flink作业,建议选择 mainRunner模块作为运行调试配置的类路径。这
将确保所有设置为提供的依赖项在执行时可用。您可以配置运行调试配置通
过 Run->EditConfigurations...,然后选择 mainRunner从模块的使
用类路径的Dropbox。
Eclipse
要将新创建的项目导入Eclipse,首先必须为其创建Eclipse项目文件。这些项目文
件可以通过sbteclipse插件创建。将以下行添加到您
的 PROJECT_DIRprojectplugins.sbt文件中:
addSbtPlugin(com.typesafe.sbteclipse%sbteclipse-plugin%
4.0.0)
在 sbt使用以下命令创建Eclipse项目文件
>eclipse
现在,您可以通过项目将项目导入Eclipse
File->Import...->ExistingProjectsintoWorkspace,然后
选择项目目录。
Maven
要求
唯一的要求是使用Maven3.0.4(或更高版本)和Java8.x安装。
Scala的项目模板
57创建项目
使用以下命令之一创建项目:
使用Maven原型
运行快速入门脚本
mvnarchetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeCatalog=https:repository.apache.orgcontentr
epositoriessnapshots -DarchetypeVersion=1.7-SNAPSHOT
这允许您命名新创建的项目。它将以交互方式询问您groupId,artifactId和包名称。
curlhttps:flink.apache.orgqquickstart-scala-SNAPSHOT.sh
|bash-s1.7-SNAPSHOT
注意:对于Maven3.0或更高版本,不再可以通过命令行指定存储库(-
DarchetypeCatalog)。如果要使用SNAPSHOT存储库,则需要向settings.xml添加
存储库条目。有关此更改的详细信息,请参阅Maven官方文档
检查项目
您的工作目录中将有一个新目录。如果您使用了curl方法,则会调用该目
录 quickstart。否则,它的名称为 artifactId:
treequickstart
quickstart
├──pom.xml
└──src
└──main
├──resources
│└──log4j.properties
└──scala
└──org
└──myorg
└──quickstart
├──BatchJob.scala
└──StreamingJob.scala
示例项目是一个Maven项目,它包含两个类:StreamingJob和BatchJob
是DataStream和DataSet程序的基本框架程序。的主要方法是程序的入口点,无论
是对在-IDE测试执行并作适当的部署。
Scala的项目模板
58我们建议您将此项目导入IDE。
IntelliJIDEA支持Maven开箱即用,并为Scala开发提供插件。根据我们的经验,IntelliJ为开发Flink应用程序提供了最佳体验。
对于Eclipse,您需要以下插件,您可以从提供的EclipseUpdateSites安装这些插
件:
Eclipse4.x
ScalaIDE
m2eclipse的-Scala
构建HelperMaven插件
Eclipse3.8
用于Scala2.11的ScalaIDE或用于Scala2.10的ScalaIDE
m2eclipse的-Scala
构建HelperMaven插件
构建项目
如果要构建打包项目,请转到项目目录并运行' mvncleanpackage'命令。您将
找到一个包含您的应用程序的JAR文件,以及您可能已作为依赖项添加到应用程序
的连接器和库: target-.jar。
注意:如果您使用与StreamingJob不同的类作为应用程序的主类入口点,我们建
议您相应地更改文件中的 mainClass设置 pom.xml。这样,Flink可以从JAR文
件运行时间应用程序,而无需另外指定主类。
下一步
写你的申请!
如果您正在编写流处理应用程序并且正在寻找灵感来写什么,请查看流处理应用程
序教程
如果您正在编写批处理应用程序,并且正在寻找要编写的内容,请查看批处理应用
程序示例
有关API的完整概述,请查看DataStreamAPI和DataSetAPI部分。
在这里,您可以了解如何在本地群集上的IDE外部运行应用程序。
如果您有任何问题,请在我们的邮件列表中查询。我们很乐意提供帮助。
Scala的项目模板
59配置依赖关系,连接器,库
译者:flink.sojb.cn
每个Flink应用程序都依赖于一组Flink库。至少,应用程序依赖于FlinkAPI。许多应
用程序还依赖于某些连接器库(如Kafka,Cassandra等)。运行Flink应用程序时
(在分布式部署中或在IDE中进行测试),Flink运行时库也必须可用。
Flink核心和应用程序依赖项
与大多数运行用户定义应用程序的系统一样,Flink中有两大类依赖项和库:
Flink核心依赖关系:Flink本身由运行系统所需的一组类和依赖项组成,例如
协调,网络,检查点,故障转移,API,算子操作(如窗口),资源管理等。
这些类和依赖项构成了Flink运行时的核心,在启动Flink应用程序时必须存在。
这些核心类和依赖项打包在 flink-distjar中。它们是Flink lib文件夹的
一部分,是Flink基本容器镜像的一部分。想象成类似于Java核心库(这些依赖
关系 rt.jar, charsets.jar等等),其中包含像类 String和 List。
Flink核心依赖项不包含任何连接器或库(CEP,SQL,ML等),以避免默认
情况下在类路径中具有过多的依赖项和类。实际上,我们尝试尽可能保持核心
依赖关系,以保持默认类路径较小并避免依赖性冲突。
所述用户应用程序的依赖关系是所有的连接器,格式或库,一个特定的用户应
用需求。
用户应用程序通常打包到应用程序jar中,该应用程序jar包含应用程序代码以及
所需的连接器和库依赖项。
用户应用程序依赖项显式不包括FlinkDataSetDataStreamAPI和运行时依赖
项,因为它们已经是Flink核心依赖项的一部分。
设置项目:基本依赖项
每个Flink应用程序都需要最低限度的API依赖关系来进行开发。对于Maven,您可
以使用Java项目模板或Scala项目模板来创建具有这些初始依赖项的程序框架。
手动设置项目时,需要为JavaScalaAPI添加以下依赖项(此处以Maven语法显
示,但相同的依赖项也适用于其他构建工具(Gradle,SBT等)。
Java
Scala
配置依赖关系,连接器,库
60
org.apache.flink
flink-java
1.7-SNAPSHOT
provided
org.apache.flink
flink-streaming-java_2.11
1.7-SNAPSHOT
provided
org.apache.flink
flink-scala_2.11
1.7-SNAPSHOT
provided
org.apache.flink
flink-streaming-scala_2.11
1.7-SNAPSHOT
provided
重要提示:请注意,所有这些依赖项都将其范围设置为提供。这意味着需要对它们
进行编译,但不应将它们打包到项目生成的应用程序jar文件中-这些依赖项是Flink
CoreDependencies,它们已在任何设置中提供。
强烈建议将依赖关系保持在提供的范围内。如果它们未设置为提供,则最好的情况
是生成的JAR变得过大,因为它还包含所有Flink核心依赖项。最糟糕的情况是添加
到应用程序的jar文件的Flink核心依赖项与您自己的一些依赖版本冲突(通常通过反
向类加载来避免)。
关于IntelliJ的注意事项:要使应用程序在IntelliJIDEA中运行,需要在范围编译中
声明Flink依赖项,而不是提供。否则,IntelliJ不会将它们添加到类路径中,并且in-
IDE执行将失败并带有 NoClassDefFountError。为了避免必须将依赖范围声明
为compile(不推荐使用,请参见上文),上面链接的Java和Scala项目模板使用了
一个技巧:它们添加了一个配置文件,该应用程序在IntelliJ中运行时有选择地激活
在不影响JAR文件打包的情况下,将依赖关系提升到范围编译。
添加连接器和库依赖项
大多数应用程序需要运行特定的连接器或库,例如连接到Kafka,Cassandra等的连
接器。这些连接器不是Flink的核心依赖项的一部分,因此必须作为依赖项添加到应
用程序中
配置依赖关系,连接器,库
61下面是将Kafka0.10的连接器添加为依赖项(Maven语法)的示例:
org.apache.flink
flink-connector-kafka-0.10_2.11
1.7-SNAPSHOT
我们建议将应用程序代码及其所有必需的依赖项打包到一个jar-with-dependencies
中,我们将其称为应用程序jar。应用程序jar可以提交给已经运行的Flink集群,也可
以添加到Flink应用程序容器镜像中。
从Java项目模板或Scala项目模板创建的项目配置为在运行时自动将应用程序依赖
项包含到应用程序jar中 mvncleanpackage。对于未从这些模板设置的项目,我
们建议添加MavenShade插件(如下面的附录中所列)以构建具有所有必需依赖项
的应用程序jar。
重要:对于Maven(和其他构建工具)将依赖项正确打包到应用程序jar中,必须在
范围编译中指定这些应用程序依赖项(与核心依赖项不同,核心依赖项必须在提供
的作用域中指定)。
Scala版本
Scala版本(2.10,2.11,2.12等)彼此不是二进制兼容的。因此,FalaforScala2.11
不能与使用Scala2.12的应用程序一起使用。
例如,所有(传递上)依赖于Scala的Flink依赖项都以它们为其构建的Scala版本为
后缀 flink-streaming-scala_2.11。
只使用Java开发人员可以选择任何Scala版本,Scala开发人员需要选择与其应用程
序的Scala版本匹配的Scala版本。
有关如何为特定Scala版本构建Flink的详细信息,请参阅构建指南。
注意:由于Scala2.12中的重大更改,Flink1.5目前仅针对Scala2.11构建。我们的
目标是在下一版本中添加对Scala2.12的支持。
Hadoop依赖项
一般规则:永远不必将Hadoop依赖项直接添加到您的应用程序中。(唯一的例外
是当使用现有的Hadoop输入输出格式与Flink的Hadoop兼容打包器时)
如果要将Flink与Hadoop一起使用,则需要具有包含Hadoop依赖关系的Flink设置,而不是将Hadoop添加为应用程序依赖关系。有关详细信息,请参阅Hadoop设置指
南。
该设计有两个主要原因:
配置依赖关系,连接器,库
62一些Hadoop交互发生在Flink的核心,可能在用户应用程序启动之前,例如为
检查点设置HDFS,通过Hadoop的Kerberos令牌进行身份验证或在YARN上部
署。
Flink的反向类加载方法隐藏了核心依赖关系中的许多传递依赖关系。这不仅适
用于Flink自己的核心依赖项,也适用于Hadoop在设置中存在的依赖项。这
样,应用程序可以使用相同依赖项的不同版本,而不会遇到依赖冲突(并且相
信我们,这是一个大问题,因为Hadoops依赖树很大。)
如果在IDE内部的测试或开发过程中需要Hadoop依赖关系(例如,用于HDFS访
问),请配置这些依赖关系,类似于要测试或提供的依赖关系的范围。
附录:用于构建具有依赖关系的Jar的模板
要构建包含声明的连接器和库所需的所有依赖项的应用程序JAR,可以使用以下
shade插件定义:
配置依赖关系,连接器,库
63
org.apache.maven.plugins
maven-shade-plugin
3.0.0
package
shade
com.google.code.findbug
s:jsr305
org.slf4j:
log4j:
:
META-INF.SF
ude>
META-INF.DSA
lude>
META-INF.RSA
lude>
che.maven.plugins.shade.resource.ManifestResourceTransformer>
my.programs.main.claz
z
配置依赖关系,连接器,库
64配置依赖关系,连接器,库
65基本API概念
译者:flink.sojb.cn
Flink程序是实现分布式集合转换的常规程序(例如,Filter,映射,更新状态,Join,分组,定义窗口,聚合)。集合最初是从源创建的(例如,通过读取文件,kafka主题或从本地的内存中集合)。结果通过接收器返回,接收器可以例如将数据
写入(分布式)文件或标准输出(例如,命令行终端)。Flink程序可以在各种环境
中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多
计算机的集群上执行。
根据数据源的类型(即有界或无界源),您可以编写批处理程序或流程序,其中
DataSetAPI用于批处理,DataStreamAPI用于流式处理。本指南将介绍两种API共
有的基本概念,但请参阅我们的流处理指南和批处理指南,了解有关使用每个API
编写程序的具体信息。
注:当显示的API时,如何使用,我们将用实际的例子
StreamingExecutionEnvironment和 DataStreamAPI。 DataSetAPI中的概
念完全相同,只需替换为 ExecutionEnvironment和 DataSet。
DataSet和DataStream
Flink具有特殊类 DataSet并 DataStream在程序中表示数据。您可以将它们视为
可以包含重复项的不可变数据集合。在 DataSet数据有限的情况下,对于一
个 DataStream数据元的数量可以是无界的。
这些集合在某些关键方面与常规Java集合不同。首先,它们是不可变的,这意味着
一旦创建它们就无法添加或删除数据元。你也不能简单地检查里面的数据元。
集合最初通过在Flink程序添加源创建和新的集合从这些通过将它们使用API方法如
衍生 map, filter等等。
Flink计划的剖析
Flink程序看起来像是转换数据集合的常规程序。每个程序包含相同的基本部分:
1. 获得一个 executionenvironment,2. 加载创建初始数据,3. 指定此数据的转换,4. 指定放置计算结果的位置,5. 触发程序执行
6. Java
7. Scala
基础API概念
66我们现在将概述每个步骤,请参阅相应部分以获取更多详细信息。请注意,Java
DataSetAPI的所有核心类都可以在org.apache.flink.api.java包中找到,而Java
DataStreamAPI的类可以在org.apache.flink.streaming.api中找到。
这 StreamExecutionEnvironment是所有Flink计划的基础。您可以使用以下静态
方法获取一个 StreamExecutionEnvironment:
getExecutionEnvironment
createLocalEnvironment
createRemoteEnvironment(Stringhost,intport,String...jarFile
s)
通常,您只需要使用 getExecutionEnvironment,因为这将根据上下文做正
确的事情:如果您在IDE中执行程序或作为常规Java程序,它将创建一个本地环
境,将在本地计算机上执行您的程序。如果您从程序中创建了一个JAR文件,并通
过命令行调用它,则Flink集群管理器将执行您的main方法
并 getExecutionEnvironment返回一个运行环境,以便在集群上执行您的程
序。
对于指定数据源,运行环境有几种方法可以使用各种方法从文件中读取:您可以逐
行读取它们,CSV文件或使用完全自定义数据输入格式。要将文本文件作为一系列
行读取,您可以使用:
finalStreamExecutionEnvironmentenv=StreamExecutionEnvironmen
t.getExecutionEnvironment;
DataStreamtext=env.readTextFile(file:pathtofile
);
这将为您提供一个DataStream,然后您可以在其上应用转换来创建新的派生
DataStream。
您可以通过使用转换函数调用DataStream上的方法来应用转换。例如,Map转换如
下所示:
DataStreaminput=...;
DataStreamparsed=input.map(newMapFunction
nteger>{
@Override
publicIntegermap(Stringvalue){
returnInteger.parseInt(value);
}
});
基础API概念
67这将通过将原始集合中的每个String转换为Integer来创建新的DataStream。
一旦有了包含最终结果的DataStream,就可以通过创建接收器将其写入外部系统。
这些只是创建接收器的一些示例方法:
writeAsText(Stringpath)
print
我们现在将概述每个步骤,请参阅相应部分以获取更多详细信息。请注意,Scala
DataSetAPI的所有核心类都可以在org.apache.flink.api.scala包中找到,而Scala
DataStreamAPI的类可以在org.apache.flink.streaming.api.scala中找到。
这 StreamExecutionEnvironment是所有Flink计划的基础。您可以使用以下静态
方法获取一个 StreamExecutionEnvironment:
getExecutionEnvironment
createLocalEnvironment
createRemoteEnvironment(host:String,port:Int,jarFiles:Strin
g)
通常,您只需要使用 getExecutionEnvironment,因为这将根据上下文做正
确的事情:如果您在IDE中执行程序或作为常规Java程序,它将创建一个本地环
境,将在本地计算机上执行您的程序。如果您从程序中创建了一个JAR文件,并通
过命令行调用它,则Flink集群管理器将执行您的main方法
并 getExecutionEnvironment返回一个运行环境,以便在集群上执行您的程
序。
对于指定数据源,运行环境有几种方法可以使用各种方法从文件中读取:您可以逐
行读取它们,CSV文件或使用完全自定义数据输入格式。要将文本文件作为一系列
行读取,您可以使用:
valenv=StreamExecutionEnvironment.getExecutionEnvironment
valtext:DataStream[String]=env.readTextFile(file:pathto
file)
这将为您提供一个DataStream,然后您可以在其上应用转换来创建新的派生
DataStream。
您可以通过使用转换函数调用DataSet上的方法来应用转换。例如,Map转换如下
所示:
基础API概念
68valinput:DataSet[String]=...
valmapped=input.map{x=>x.toInt}
这将通过将原始集合中的每个String转换为Integer来创建新的DataStream。
一旦有了包含最终结果的DataStream,就可以通过创建接收器将其写入外部系统。
这些只是创建接收器的一些示例方法:
writeAsText(path:String)
print
一旦您指定的完整程序,你需要触发执行程序调用
execute上 StreamExecutionEnvironment。根据执行的类
型, ExecutionEnvironment将在本地计算机上触发执行或提交程序以在群集上
执行。
该 execute方法返回一个 JobExecutionResult,包含执行时间和累加器结
果。
有关流数据源和接收器的信息,请参阅流指南,以及有关DataStream上支持的转换
的更深入信息。
有关批处理数据源和接收器的信息,请查看批处理指南,以及有关DataSet支持的
转换的更深入信息。
懒惰的评价
所有Flink程序都是懒惰地执行:当执行程序的main方法时,数据加载和转换不会直
接发生。而是创建每个算子操作并将其添加到程序的计划中。当 execute运行
环境上的调用显式触发执行时,实际执行算子操作。程序是在本地执行还是在集群
上执行取决于运行环境的类型
懒惰的评估使您可以构建Flink作为一个整体计划单元执行的复杂程序。
指定Keys
某些转换(join,coGroup,keyBy,groupBy)要求在数据元集合上定义键。其他
转换(Reduce,GroupReduce,Aggregate,Windows)允许数据在应用之前在
Keys上分组。
DataSet被分组为
基础API概念
69DataSet<...>input=[...]
DataSet<...>reduced=input
.groupBy(definekeyhere)
.reduceGroup(dosomething);
虽然可以使用DataStream指定Keys
DataStream<...>input=[...]
DataStream<...>windowed=input
.keyBy(definekeyhere)
.window(windowspecification);
Flink的数据模型不基于键值对。因此,您无需将数据集类型物理打包到键和值中。
键是“虚拟的”:它们被定义为实际数据上的函数,以指导分组算子。
注意:在下面的讨论中,我们将使用 DataStreamAPI和 keyBy。对于DataSet
API,您只需要替换为 DataSet和 groupBy。
定义元组的键
最简单的情况是在元组的一个或多个字段上对元组进行分组:
Java
Scala
DataStream>input=[...]
KeyedStream,Tuple>keyed=input.key
By(0)
valinput:DataStream[(Int,String,Long)]=[...]valkeyed
=input.keyBy(0)
元组在第一个字段(整数类型)上分组。
Java
Scala
DataStream>input=[...]
KeyedStream,Tuple>keyed=input.key
By(0,1)
基础API概念
70valinput:DataSet[(Int,String,Long)]=[...]valgrouped=
input.groupBy(0,1)
在这里,我们将元组分组在由第一个和第二个字段组成的复合键上。
关于嵌套元组的注释:如果你有一个带有嵌套元组的DataStream,例如:
DataStream,String,Long>>ds;
指定 keyBy(0)将导致系统使用full Tuple2作为键(以Integer和Float为键)。
如果要“导航”到嵌套中 Tuple2,则必须使用下面解释的字段表达式键。
使用FieldExpressions定义键
您可以使用基于字符串的字段表达式来引用嵌套字段,并定义用于分组,排序,连
接或coGrouping的键。
字段表达式可以非常轻松地选择(嵌套)复合类型中的字段,例如Tuple和POJO类
型。
Java
Scala
在下面的示例中,我们有一个 WCPOJO,其中包含两个字段“word”和“count”。要
按字段分组 word,我们只需将其名称传递给 keyBy函数即可。
someordinaryPOJO(PlainoldJavaObject)
publicclassWC{
publicStringword;
publicintcount;
}
DataStreamwords=[...]
1.2
1.2.1
1.2.2
1.3
1.3.1
1.3.1.1
1.3.2
1.3.2.1
1.3.2.2
1.4
1.4.1
1.5
1.5.1
1.5.1.1
1.5.1.2
1.5.1.3
1.5.2
1.5.2.1
1.5.2.2
1.5.3
1.5.3.1
1.5.3.1.1
1.5.3.1.2
1.5.3.2
1.5.3.2.1
1.5.3.2.2
1.5.3.2.3
1.5.3.2.4
1.5.3.2.5
1.5.3.2.6
1.5.3.3
1.5.3.3.1
1.5.3.3.2
1.5.3.3.3
目錄
ApacheFlink文档
概念
数据流编程模型
分布式运行时环境
教程
API教程
DataStreamAPI教程
Setup教程
本地安装教程
在Windows上运行Flink
例子
批处理示例
应用开发
项目构建设置
Java项目模板
Scala的项目模板
配置依赖关系,连接器,库
基础API概念
ScalaAPI扩展
JavaLambda表达式
FlinkDataStreamAPI编程指南
活动时间
生成时间戳水印
预定义的时间戳提取器水印发射器
状态与容错
状态运行
广播状态模式
检查点
可查询状态Beta
状态后台
管理状态的自定义序列化
算子
视窗
Join
过程函数(低级算子操作)
11.5.3.3.4
1.5.3.4
1.5.3.4.1
1.5.3.4.2
1.5.3.4.3
1.5.3.4.4
1.5.3.4.5
1.5.3.4.6
1.5.3.4.7
1.5.3.4.8
1.5.3.4.9
1.5.3.4.10
1.5.3.5
1.5.3.6
1.5.3.7
1.5.3.8
1.5.4
1.5.4.1
1.5.4.2
1.5.4.3
1.5.4.4
1.5.4.5
1.5.4.6
1.5.4.7
1.5.4.8
1.5.4.9
1.5.5
1.5.5.1
1.5.5.2
1.5.5.3
1.5.5.4
1.5.5.5
1.5.5.6
1.5.5.7
1.5.5.8
1.5.5.9
1.5.6
1.5.6.1
1.5.7
外部数据访问的异步IO.
流连接器
数据源和接收器的容错保证
ApacheKafka连接器
ApacheCassandra连接器
亚马逊AWSKinesisStreams连接器
Elasticsearch连接器
HDFS连接器
流文件接收器
RabbitMQ连接器
ApacheNiFi连接器
Twitter连接器
旁路输出
Python编程指南(流)Beta
测试
实验特点
FlinkDataSetAPI编程指南
数据集转换
容错
在数据集中压缩数据元
迭代
Python编程指南Beta
连接器
Hadoop兼容性测试版
本地执行
群集执行
TableAPI和SQL
概念和通用API
流处理概念
连接到外部系统
TableAPI
SQL
内置函数
用户定义的源和接收器
用户定义的函数
SQL客户端测试版
数据类型和序列化
为Flink程序注册自定义序列化程序
管理执行
21.5.7.1
1.5.7.2
1.5.7.3
1.5.7.4
1.5.7.5
1.5.8
1.5.8.1
1.5.8.2
1.5.8.3
1.5.8.4
1.5.8.4.1
1.5.8.4.2
1.5.8.4.3
1.5.8.4.4
1.5.8.4.5
1.5.8.5
1.5.8.5.1
1.5.8.5.2
1.5.8.5.3
1.5.8.5.4
1.5.8.5.5
1.5.8.5.6
1.5.8.5.7
1.5.8.5.8
1.5.8.5.9
1.5.8.5.10
1.5.8.5.11
1.5.8.5.12
1.5.8.5.13
1.5.9
1.5.10
1.6
1.6.1
1.6.1.1
1.6.1.2
1.6.1.3
1.6.1.4
1.6.1.5
1.6.1.6
执行配置
程序打包和分布式执行
并行执行
执行计划
重启策略
类库
FlinkCEP-Flink的复杂事件处理
风暴兼容性Beta
Gelly:FlinkGraphAPI
图API
迭代图处理
类库方法
图算法
图形生成器
二分图
FlinkML-Flink的机器学习
快速入门指南
如何贡献
交叉验证
DistanceMetrics
k-NearestNeighbors关联
MinMaxScaler
MultipleLinearRegression
在管道的引擎盖下看
PolynomialFeatures
随机异常值选择
StandardScaler
AlternatingLeastSquares
SVMusingCoCoA
最佳实践
API迁移指南
部署和运营
集群和部署
独立群集
YARN设置
Mesos设置
Kubernetes设置
Docker设置
亚马逊网络服务(AWS)
31.6.1.7
1.6.1.8
1.6.1.9
1.6.2
1.6.3
1.6.3.1
1.6.3.2
1.6.3.3
1.6.3.4
1.6.4
1.6.5
1.6.6
1.6.7
1.6.8
1.6.9
1.6.10
1.6.11
1.7
1.7.1
1.7.2
1.7.3
1.7.4
1.7.5
1.7.6
1.7.7
1.7.8
1.7.9
1.8
1.8.1
1.8.2
1.9
1.9.1
1.9.2
1.9.3
1.9.4
1.9.5
GoogleComputeEngine设置
MapR设置
Hadoop集成
JobManager高可用性(HA)
状态和容错
检查点
保存点
状态后台
调整检查点和大状态
配置
生产准备清单
命令行界面
ScalaREPL
Kerberos身份验证设置和配置
SSL设置
文件系统
升级应用程序和Flink版本
调试和监控
度量
如何使用日志记录
历史服务器
监控检查点
监测背压
监控RESTAPI
调试Windows和事件时间
调试类加载
应用程序分析
FlinkDevelopment
将Flink导入IDE
从Source建立Flink
内幕
组件堆栈
数据流容错
工作和调度
任务生命周期
文件系统
45ApacheFlink文档
译者:flink.sojb.cn
在线阅读
PDF格式
EPUB格式
MOBI格式
代码仓库
本文档适用于ApacheFlink1.7-SNAPSHOT版。这些页面的建立时间为:
090818,中部标准时间07:53:00。
ApacheFlink是一个用于分布式流和批处理数据处理的开源平台。Flink的核心是流
数据流引擎,为数据流上的分布式计算提供数据分发,通信和容错。Flink在流引擎
之上构建批处理,覆盖本机迭代支持,托管内存和程序优化。
第一步
概念:从Flink的数据流编程模型和分布式运行时环境的基本概念开始。这将有
助于您了解文档的其他部分,包括设置和编程指南。我们建议您先阅读这些部
分。
教程:
实现并运行DataStream应用程序
设置本地Flink群集
编程指南:您可以阅读我们关于基本API概念和DataStreamAPI或DataSetAPI
的指南,以了解如何编写您的第一个Flink程序。
部署
在将Flink工作投入生产之前,请阅读生产准备清单。
发行说明
发行说明涵盖了Flink版本之间的重要更改。如果您计划将Flink设置升级到更高版
本,请仔细阅读这些说明。
Flink1.6发行说明。
Flink1.5发行说明。
外部资源
ApacheFlink文档
6FlinkForward:FlinkForward网站和YouTube上提供了以往会议的讲座。使
用ApacheFlink进行强大的流处理是一个很好的起点。
培训:数据工匠的培训材料包括幻灯片,练习和示例解决方案。
博客:ApacheFlink和数据工匠博客发布了有关Flink的频繁,深入的技术文
章。
ApacheFlink文档
7概念
概念
8数据流编程模型
译者:flink.sojb.cn
抽象层次
Flink提供不同级别的抽象来开发流批处理应用程序。
最低级抽象只提供有状态流。它通过ProcessFunction嵌入到DataStreamAPI
中。它允许用户自由处理来自一个或多个流的事件,并使用一致的容错状态。
此外,用户可以注册事件时间和处理时间回调,允许程序实现复杂的计算。
实际上,大多数应用程序不需要上述低级抽象,而是针对CoreAPI编程,如
DataStreamAPI(有界无界流)和DataSetAPI(有界数据集)。这些流畅的
API提供了用于数据处理的通用构建块,例如各种形式的用户指定的转换,连
接,聚合,窗口,状态等。在这些API中处理的数据类型在相应的编程语言中
表示为类。
低级ProcessFunction与DataStreamAPI集成,因此只能对某些算子操作进行
低级抽象。该数据集API提供的有限数据集的其他原语,如循环迭代。
该TableAPI是为中心的声明性DSL表,其可被动态地改变的表(表示流
时)。该TableAPI遵循(扩展)关系模型:表有一个模式连接(类似于在关
系数据库中的表)和API提供可比的算子操作,如选择,项目,连接,分组依
据,聚合等TableAPI程序以声明方式定义应该执行的逻辑算子操作,而不是
准确指定算子操作代码的外观。虽然TableAPI可以通过各种类型的用户定义
函数进行扩展,但它的表现力不如CoreAPI,但使用更简洁(编写的代码更
少)。此外,TableAPI程序还会通过优化程序,在执行之前应用优化规则。
可以在表和DataStreamDataSet之间无缝转换,允许程序混合TableAPI以及
DataStream和DataSetAPI。
数据流编程模型
9Flink提供的最高级抽象是SQL。这种抽象在语义和表达方面类似于Table
API,但是将程序表示为SQL查询表达式。在SQL抽象与TableAPI紧密地相互
作用,和SQL查询可以通过定义表来执行TableAPI。
程序和数据流
Flink程序的基本构建块是流和转换。(请注意,Flink的DataSetAPI中使用的
DataSet也是内部流-稍后会详细介绍。)从概念上讲,流是(可能永无止境的)
数据记录流,而转换是将一个或多个流作为一个或多个流的算子操作。输入,并产
生一个或多个输出流。
执行时,Flink程序映射到流数据流,由流和转换算子组成。每个数据流都以一个或
多个源开头,并以一个或多个接收器结束。数据流类似于任意有向无环图
(DAG)。尽管通过迭代结构允许特殊形式的循环,但为了简单起见,我们将在
大多数情况下对此进行掩饰。
通常,程序中的转换与数据流中的算子之间存在一对一的对应关系。但是,有时一
个转换可能包含多个转换算子。
源流和接收器记录在流连接器和批处理连接器文档中。DataStream算子和DataSet
转换中记录了转换。
数据流编程模型
10并行数据流
Flink中的程序本质上是并行和分布式的。在执行期间,流具有一个或多个流分区,并且每个算子具有一个或多个算子子任务。算子子任务彼此独立,并且可以在不
同的线程中执行,并且可能在不同的机器或容器上执行。
算子子任务的数量是该特定算子的并行度。流的并行性始终是其生成算子的并行
性。同一程序的不同算子可能具有不同的并行级别。
流可以以一对一(或转发)模式或以重新分发模式在两个算子之间传输数据:
一对一流(例如,在上图中的Source和map算子之间)保存数据元的分区
和排序。这意味着map算子的subtask[1]将以与Source算子的subtask
[1]生成的顺序相同的顺序看到相同的数据元。
重新分配流(在上面的map和keyBywindow之间,以及keyBywindow
和Sink之间)重新分配流。每个算子子任务将数据发送到不同的目标子任务,具体取决于所选的转换。实例是keyBy(其通过散列Keys重新分区),广播,或Rebalance(其重新分区随机地)。在重新分配交换中,数据元之间的排序仅保存在每对发送和接收子任务中(例如,map的子任
务[1]和子任务[2]keyBywindow)。因此,在此示例中,保存了每个Keys内
的排序,但并行性确实引入了关于不同Keys的聚合结果到达接收器的顺序的非
确定性。
有关配置和控制并行性的详细信息,请参阅并行执行的文档。
数据流编程模型
11窗口
聚合事件(例如,计数,总和)在流上的工作方式与批处理方式不同。例如,不可
能计算流中的所有数据元,因为流通常是无限的(无界)。相反,流上的聚合(计
数,总和等)由窗口限定,例如“在最后5分钟内计数”或“最后100个数据元的总
和”。
Windows可以是时间驱动的(例如:每30秒)或数据驱动(例如:每100个数据
元)。一个典型地区分不同类型的窗口,例如翻滚窗口(没有重叠),滑动窗口
(具有重叠)和会话窗口(由不活动的间隙打断)。
更多窗口示例可以在此博客文章中找到。更多详细信息在窗口文档中。
时间
当在流程序中引用时间(例如定义窗口)时,可以参考不同的时间概念:
事件时间是创建事件的时间。它通常由事件中的时间戳描述,例如由生产传感
器或生产服务附加。Flink通过时间戳分配器访问事件时间戳。
摄取时间是事件在源算子处输入Flink数据流的时间。
处理时间是执行基于时间的算子操作的每个算子的本地时间。
有关如何处理时间的更多详细信息,请参阅事件时间文档。
有状态的算子操作
数据流编程模型
12虽然数据流中的许多算子操作只是一次查看一个单独的事件(例如事件解析器),但某些算子操作会记住多个事件(例如窗口算子)的信息。这些算子操作称为有
状态。
状态算子操作的状态保持在可以被认为是嵌入式键值存储的状态中。状态被分区
并严格地与有状态算子读取的流一起分发。因此,只有在keyBy函数之后才能
在被Key化的数据流上访问键值状态,并且限制为与当前事件的键相关联的值。对
齐流和状态的Keys可确保所有状态更新都是本地算子操作,从而保证一致性而无
需事务开销。此对齐还允许Flink重新分配状态并透明地调整流分区。
有关更多信息,请参阅有关状态的文档。
容错检查点
Flink使用流重放和检查点的组合实现容错。检查点与每个输入流中的特定点以及每
个算子的对应状态相关。通过恢复算子的状态并从检查点重放事件,可以从检查点
恢复流数据流,同时保持一致性(恰好一次处理语义)。
检查点间隔是在执行期间用恢复时间(需要重放的事件的数量)来折衷容错开销的
手段。
容错内部的描述提供了有关Flink如何管理检查点和相关主题的更多信息。有关启用
和配置检查点的详细信息,请参阅检查点API文档。
流处理批处理
Flink执行批处理程序作为流程序的特殊情况,其中流是有界的(有限数量的数据
元)。一个数据集在内部视为数据流。因此,上述概念以相同的方式应用于批处理
程序,并且它们适用于流程序,除了少数例外:
数据流编程模型
13批处理程序的容错不使用检查点。通过完全重放流来恢复。这是可能的,因为
输入有限。这会使成本更多地用于恢复,但使常规处理更便宜,因为它避免了
检查点。
DataSetAPI中的有状态算子操作使用简化的内存核外数据结构,而不是键值
索引。
DataSetAPI引入了特殊的同步(超级步骤)迭代,这些迭代只能在有界流上
进行。有关详细信息,请查看迭代文档。
下一步
继续使用Flink的DistributedRuntime中的基本概念。
数据流编程模型
14分布式运行时环境
译者:flink.sojb.cn
任务和算子链
对于分布式执行,Flink链算子子任务一起放入任务。每个任务由一个线程执行。将
算子链接到任务中是一项有用的优化:它可以Reduce线程到线程切换和缓冲的开
销,并在降低延迟的同时提高整体吞吐量。可以配置链接行为;有关详细信息,请
参阅链接文档。
下图中的示例数据流由五个子任务执行,因此具有五个并行线程。
TaskManager,JobManager,客户端
Flink运行时包含两种类型的进程:
该JobManagers(也称为Masters)协调分布式执行。他们安排任务,协调检
查点,协调故障恢复等。
总是至少有一个JobManager。高可用性设置将具有多个JobManagers,其中
一个始终是Leader,其他人处于待机状态。
该TaskManagers(也叫工人)执行任务(或者更具体地说,子任务)的数据
流,以及缓冲器和交换数据流。
分布式运行时环境
15必须始终至少有一个TaskManager。
JobManagers和TaskManagers可以通过多种方式启动:作为独立集群直接在计算
机上,在容器中,或由YARN或Mesos等资源框架管理。TaskManagers连接到
JobManagers,宣布自己可用,并被分配工作。
该客户端是不运行时和程序执行的一部分,而是被用来准备和发送的数据流的
JobManager。之后,客户端可以断开连接或保持连接以接收进度报告。客户端既
可以作为触发执行的JavaScala程序的一部分运行,也可以在命令行进程中运
行 .binflinkrun...。
任务槽和资源
每个worker(TaskManager)都是一个JVM进程,可以在不同的线程中执行一个或
多个子任务。为了控制工人接受的任务数量,工人有所谓的任务槽(至少一个)。
每个任务槽代表TaskManager的固定资源子集。例如,具有三个插槽的
TaskManager将其13的托管内存专用于每个插槽。切换资源意味着子任务不会与来
自其他作业的子任务竞争托管内存,而是具有一定数量的保存托管内存。请注意,此处不会发生CPU隔离;当前插槽只分离任务的托管内存。
通过调整任务槽的数量,用户可以定义子任务如何相互隔离。每个TaskManager有
一个插槽意味着每个任务组在一个单独的JVM中运行(例如,可以在一个单独的容
器中启动)。拥有多个插槽意味着更多子任务共享同一个JVM。同一JVM中的任务
分布式运行时环境
16共享TCP连接(通过多路复用)和心跳消息。它们还可以共享数据集和数据结构,从而Reduce每任务开销。
默认情况下,Flink允许子任务共享插槽,即使它们是不同任务的子任务,只要它们
来自同一个作业。结果是一个槽可以保存作业的整个管道。允许此插槽共享有两个
主要好处:
Flink集群需要与作业中使用的最高并行度一样多的任务槽。无需计算程序总共
包含多少任务(具有不同的并行性)。
更容易获得更好的资源利用率。如果没有插槽共享,非密集源map子任
务将阻止与资源密集型窗口子任务一样多的资源。通过插槽共享,将示例中的
基本并行性从2增加到6可以充分利用时隙资源,同时确保繁重的子任务在
TaskManagers之间公平分配。
API还包括可用于防止不期望的时隙共享的资源组机制。
根据经验,一个很好的默认任务槽数就是CPU核心数。使用超线程,每个插槽然后
需要2个或更多硬件线程上下文。
状态后台
分布式运行时环境
17存储键值索引的确切数据结构取决于所选的状态后台。一个状态后台将数据存储在
内存中的哈希映射中,另一个状态后台使用RocksDB作为键值存储。除了定义保
存状态的数据结构之外,状态后台还实现逻辑以获取键值状态的时间点
SNAPSHOT,并将该SNAPSHOT存储为检查点的一部分。
保存点
用DataStreamAPI编写的程序可以从保存点恢复执行。保存点允许更新程序和
Flink群集,而不会丢失任何状态。
保存点是手动触发的检查点,它会获取程序的SNAPSHOT并将其写入状态后台。
他们依靠常规的检查点机制。在执行期间,程序会定期在工作节点上创建
SNAPSHOT并生成检查点。对于恢复,仅需要最后完成的检查点,并且一旦新的
检查点完成,就可以安全地丢弃旧的检查点。
保存点与这些定期检查点类似,不同之处在于它们由用户触发,并且在较新的检查
点完成时不会自动过期。可以从命令行或通过RESTAPI取消作业时创建保存点。
分布式运行时环境
18教程
教程
19API教程
API教程
20DataStreamAPI教程
译者:flink.sojb.cn
在本指南中,我们将从头开始,从设置Flink项目到在Flink集群上运行流分析程序。
Wikipedia提供了一个IRC频道,其中记录了对Wiki的所有编辑。我们将在Flink中读
取此通道,并计算每个用户在给定时间窗口内编辑的字节数。这很容易使用Flink在
几分钟内实现,但它将为您提供一个良好的基础,从而开始自己构建更复杂的分析
程序。
设置Maven项目
我们将使用FlinkMavenArchetype来创建我们的项目结构。有关此内容的更多详细
信息,请参阅JavaAPI快速入门。出于我们的目的,运行命令是这样的:
mvnarchetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeCatalog=https:repository.apache.orgcontentrep
ositoriessnapshots -DarchetypeVersion=1.7-SNAPSHOT -DgroupId=wiki-edits -DartifactId=wiki-edits -Dversion=0.1 -Dpackage=wikiedits -DinteractiveMode=false
注意:对于Maven3.0或更高版本,不再可以通过命令行指定存储库(-
DarchetypeCatalog)。如果要使用SNAPSHOT存储库,则需要向settings.xml添加
存储库条目。有关此更改的详细信息,请参阅Maven官方文档
您可以编辑 groupId, artifactId而 package如果你喜欢。使用上面的参
数,Maven将创建一个如下所示的项目结构:
DataStreamAPI教程
21treewiki-edits
wiki-edits
├──pom.xml
└──src
└──main
├──java
│└──wikiedits
│├──BatchJob.java
│├──SocketTextStreamWordCount.java
│├──StreamingJob.java
│└──WordCount.java
└──resources
└──log4j.properties
我们的 pom.xml文件已经在根目录中添加了Flink依赖项,并且有几个示例Flink程
序 srcmainjava。我们可以删除示例程序,因为我们将从头开始:
rmwiki-editssrcmainjavawikiedits.java
作为最后一步,我们需要将FlinkWikipedia连接器添加为依赖关系,以便我们可以
在我们的程序中使用它。编辑它的 dependencies部分 pom.xml,使它看起来像
这样:
DataStreamAPI教程
22注意 flink-connector-wikiedits_2.11添加的依赖项。(此示例和Wikipedia连
接器的灵感来自ApacheSamza的HelloSamza示例。)
编写Flink程序
这是编码时间。启动您喜欢的IDE并导入Maven项目或打开文本编辑器并创建文
件 srcmainjavawikieditsWikipediaAnalysis.java:
packagewikiedits;
publicclassWikipediaAnalysis{
publicstaticvoidmain(String[]args)throwsException{
}
}
该计划现在非常基础,但我们会尽力填写。请注意,我不会在此处提供import语
句,因为IDE可以自动添加它们。在本节结束时,如果您只想跳过并在编辑器中输
入,我将使用import语句显示完整的代码。
Flink程序的第一步是创建一个 StreamExecutionEnvironment(或
者 ExecutionEnvironment如果您正在编写批处理作业)。这可用于设置执行参
数并创建从外部系统读取的源。所以让我们继续把它添加到main方法:
StreamExecutionEnvironmentsee=StreamExecutionEnvironment.getE
xecutionEnvironment;
接下来,我们将创建一个从WikipediaIRC日志中读取的源:
DataStream
iaEditsSource);
这创建了一个我们可以进一步处理 DataStream的 WikipediaEditEvent数据
元。出于本示例的目的,我们感兴趣的是确定每个用户在特定时间窗口中添加或删
除的字节数,比如说五秒。为此,我们首先必须指定我们要在用户名上键入流,也
就是说此流上的算子操作应考虑用户名。在我们的例子中,窗口中编辑的字节的总
和应该是每个唯一的用户。对于键入流,我们必须提供一个 KeySelector,如下
所示:
DataStreamAPI教程
23KeyedStream
.keyBy(newKeySelector
@Override
publicStringgetKey(WikipediaEditEventevent){
returnevent.getUser;
}
});
这为我们提供了一个 WikipediaEditEvent具有 StringKeys的用户名。我们现
在可以指定我们希望在此流上加上窗口,并根据这些窗口中的数据元计算结果。窗
口指定要在其上执行计算的Stream片。在无限的数据元流上计算聚合时需要
Windows。在我们的例子中,我们将说我们想要每五秒聚合一次编辑的字节总和:
DataStream
.timeWindow(Time.seconds(5))
.fold(newTuple2<>(,0L),newFoldFunction
ent,Tuple2
@Override
publicTuple2
c,WikipediaEditEventevent){
acc.f0=event.getUser;
acc.f1+=event.getByteDiff;
returnacc;
}
});
第一个调用, .timeWindow指定我们想要有五秒钟的翻滚(非重叠)窗口。第
二个调用为每个唯一键指定每个窗口切片的折叠变换。在我们的例子中,我们从一
个初始值开始, (,0L)并在其中为用户添加该时间窗口中每个编辑的字节差
异。生成的Stream现在包含 Tuple2
用户。
剩下要做的就是将流打印到控制台并开始执行:
result.print;
see.execute;
最后一次调用是启动实际Flink作业所必需的。所有算子操作(例如创建源,转换和
接收器)仅构建内部算子操作的图形。只有在 execute被调用时才会在集群
上抛出或在本地计算机上执行此算子操作图。
到目前为止完整的代码是这样的:
packagewikiedits;
DataStreamAPI教程
24importorg.apache.flink.api.common.functions.FoldFunction;
importorg.apache.flink.api.java.functions.KeySelector;
importorg.apache.flink.api.java.tuple.Tuple2;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.datastream.KeyedStream;
importorg.apache.flink.streaming.api.environment.StreamExecutio
nEnvironment;
importorg.apache.flink.streaming.api.windowing.time.Time;
importorg.apache.flink.streaming.connectors.wikiedits.Wikipedia
EditEvent;
importorg.apache.flink.streaming.connectors.wikiedits.Wikipedia
EditsSource;
publicclassWikipediaAnalysis{
publicstaticvoidmain(String[]args)throwsException{
StreamExecutionEnvironmentsee=StreamExecutionEnvironment.
getExecutionEnvironment;
DataStream
ipediaEditsSource);
KeyedStream
.keyBy(newKeySelector
@Override
publicStringgetKey(WikipediaEditEventevent){
returnevent.getUser;
}
});
DataStream
.timeWindow(Time.seconds(5))
.fold(newTuple2<>(,0L),newFoldFunction
Event,Tuple2
@Override
publicTuple2
c,WikipediaEditEventevent){
acc.f0=event.getUser;
acc.f1+=event.getByteDiff;
returnacc;
}
});
result.print;
see.execute;
}
}
您可以使用Maven在IDE或命令行上运行此示例:
DataStreamAPI教程
25mvncleanpackage
mvnexec:java-Dexec.mainClass=wikiedits.WikipediaAnalysis
第一个命令构建我们的项目,第二个命令执行我们的主类。输出应该类似于:
1>(Fenixdown,114)
6>(AnomieBOT,155)
8>(BD2412bot,-3690)
7>(IgnorantArmies,49)
3>(Ckh3111,69)
5>(Slade360,0)
7>(Narutolovehinata5,2195)
6>(Vuyisa2001,79)
4>(MsSarahWelch,269)
4>(KasparBot,-245)
每行前面的数字告诉您输出生成的打印接收器的哪个并行实例。
这应该让您开始编写自己的Flink程序。要了解更多信息,您可以查看我们的基本概
念指南和DataStreamAPI。如果您想了解如何在自己的机器上设置Flink群集并将
结果写入Kafka,请坚持参加奖励练习。
奖金练习:在群集上运行并写入Kafka
请按照我们的本地安装教程在您的机器上设置Flink分发,并在继续算子操作之前
参考Kafka快速入门以设置Kafka安装。
作为第一步,我们必须添加FlinkKafka连接器作为依赖关系,以便我们可以使用
Kafka接收器。将其添加到 pom.xml依赖项部分中的文件:
接下来,我们需要修改我们的程序。我们将移除 print水槽,而是使用Kafka水
槽。新代码如下所示:
DataStreamAPI教程
26result
.map(newMapFunction
@Override
publicStringmap(Tuple2
returntuple.toString;
}
})
.addSink(newFlinkKafkaProducer08<>(localhost:9092,wiki-
result,newSimpleStringSchema));
还需要导入相关的类:
importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaPro
ducer08;
importorg.apache.flink.api.common.serialization.SimpleStringSch
ema;
importorg.apache.flink.api.common.functions.MapFunction;
注意我们是如何第一个转换的流 Tuple2
用MapFunction。我们这样做是因为将简单字符串写入Kafka更容易。然后,我们创
建了一个Kafka水槽。您可能必须使主机名和端口适应您的设
置。 wiki-result是运行我们的程序之前我们将要创建的Kafka流的名称。使
用Maven构建项目因为我们需要jar文件在集群上运行:
mvncleanpackage
生成的jar文件将位于 target子文件夹中: targetwiki-edits-0.1.jar。我
们稍后会用到它。
现在我们准备启动Flink集群并运行写入Kafka的程序。转到安装Flink的位置并启动
本地群集:
cdmyflinkdirectory
binstart-cluster.sh
我们还必须创建Kafka主题,以便我们的程序可以写入它:
cdmykafkadirectory
binkafka-topics.sh--create--zookeeperlocalhost:2181--topi
cwiki-results
现在我们准备在本地Flink集群上运行我们的jar文件:
DataStreamAPI教程
27cdmyflinkdirectory
binflinkrun-cwikiedits.WikipediaAnalysispathtowikiedits
-0.1.jar
如果一切按计划进行,那么该命令的输出应该与此类似:
0308201615:09:27JobexecutionswitchedtostatusRUNNING.
0308201615:09:27Source:CustomSource(11)switchedtoSCHED
ULED
0308201615:09:27Source:CustomSource(11)switchedtoDEPLO
YING
0308201615:09:27TriggerWindow(TumblingProcessingTimeWindows(
5000),FoldingStateDescriptor{name=window-contents,defaultValue
=(,0),serializer=null},ProcessingTimeTrigger,WindowedStream
.fold(WindowedStream.java:207))->Map->Sink:Unnamed(11)swi
tchedtoSCHEDULED
0308201615:09:27TriggerWindow(TumblingProcessingTimeWindows(
5000),FoldingStateDescriptor{name=window-contents,defaultValue
=(,0),serializer=null},ProcessingTimeTrigger,WindowedStream
.fold(WindowedStream.java:207))->Map->Sink:Unnamed(11)swi
tchedtoDEPLOYING
0308201615:09:27TriggerWindow(TumblingProcessingTimeWindows(
5000),FoldingStateDescriptor{name=window-contents,defaultValue
=(,0),serializer=null},ProcessingTimeTrigger,WindowedStream
.fold(WindowedStream.java:207))->Map->Sink:Unnamed(11)swi
tchedtoRUNNING
0308201615:09:27Source:CustomSource(11)switchedtoRUNNI
NG
您可以看到各个算子如何开始运行。只有两个,因为出于性能原因,窗口之后的算
子操作被折叠成一个算子操作。在Flink,我们称之为链接。
您可以通过使用Kafka控制台使用者检查Kafka主题来观察程序的输出:
binkafka-console-consumer.sh--zookeeperlocalhost:2181--topi
cwiki-result
您还可以查看应在http:localhost:8081上运行的Flink仪表板。您将获得群集资
源和正在运行的作业的概述:
如果单击正在运行的作业,您将获得一个视图,您可以在其中检查各个算子操作,例如,查看已处理数据元的数量:
DataStreamAPI教程
28这就结束了我们对Flink的小游览。如果您有任何疑问,请随时询问我们的邮件列
表。
DataStreamAPI教程
29Setup教程
Setup教程
30本地安装教程
译者:flink.sojb.cn
只需几个简单的步骤即可启动并运行Flink示例程序。
设置:下载并启动Flink
Flink可在Linux,MacOSX和Windows上运行。为了能够运行Flink,唯一的要求
是安装一个有效的Java8.x.Windows用户,请查看Windows上的Flink指南,该指
南介绍了如何在Windows上运行Flink以进行本地设置。
您可以通过发出以下命令来检查Java正确安装:
java-version
如果你有Java8,输出将如下所示:
javaversion1.8.0_111
Java(TM)SERuntimeEnvironment(build1.8.0_111-b14)
JavaHotSpot(TM)64-BitServerVM(build25.111-b14,mixedmode)
下载并编译
从我们的某个存储库克隆源代码,例如:
gitclonehttps:github.comapacheflink.git
cdflink
mvncleanpackage-DskipTeststhiswilltakeupto10minute
s
cdbuild-targetthisiswhereFlinkisinstall
edto
启动本地Flink群集
.binstart-cluster.shStartFlink
检查分派器的web前端在HTTP:localhost:8081,并确保一切都正常运行。Web
前端应报告单个可用的TaskManager实例。
本地安装教程
31您还可以通过检查 logs目录中的日志文件来验证系统是否正在运行:
taillogflink--standalonesession-.log
INFO...-Restendpointlisteningatlocalhost:8081
INFO...-http:localhost:8081wasgrantedleadership...
INFO...-Webfrontendlisteningathttp:localhost:8081.
INFO...-StartingRPCendpointforStandaloneResourceManagera
takka:flinkuserresourcemanager.
INFO...-StartingRPCendpointforStandaloneDispatcheratakk
a:flinkuserdispatcher.
INFO...-ResourceManagerakka.tcp:[[emailprotected]](cdn-c
gilemail-protection):6123userresourcemanagerwasgrantedlea
dership...
INFO...-StartingtheSlotManager.
INFO...-Dispatcherakka.tcp:[[emailprotected]](cdn-cgil
email-protection):6123userdispatcherwasgrantedleadership..
.
INFO...-Recoveringallpersistedjobs.
INFO...-RegisteringTaskManager...under...attheSlotMana
ger.
本地安装教程
32阅读代码
您可以在Scala和Java上的GitHub上找到此SocketWindowWordCount示例的完整源
代码。
Scala
Java
objectSocketWindowWordCount{
defmain(args:Array[String]):Unit={
theporttoconnectto
valport:Int=try{
ParameterTool.fromArgs(args).getInt(port)
}catch{
casee:Exception=>{
System.err.println(Noportspecified.Pleaseru
n'SocketWindowWordCount--port
return
}
}
gettheexecutionenvironment
valenv:StreamExecutionEnvironment=StreamExecutionEnv
ironment.getExecutionEnvironment
getinputdatabyconnectingtothesocket
valtext=env.socketTextStream(localhost,port,'\n')
parsethedata,groupit,windowit,andaggregateth
ecounts
valwindowCounts=text
.flatMap{w=>w.split(\\s)}
.map{w=>WordWithCount(w,1)}
.keyBy(word)
.timeWindow(Time.seconds(5),Time.seconds(1))
.sum(count)
printtheresultswithasinglethread,ratherthani
nparallel
windowCounts.print.setParallelism(1)
env.execute(SocketWindowWordCount)
}
Datatypeforwordswithcount
caseclassWordWithCount(word:String,count:Long)
}
本地安装教程
33publicclassSocketWindowWordCount{
publicstaticvoidmain(String[]args)throwsException{
theporttoconnectto
finalintport;
try{
finalParameterToolparams=ParameterTool.fromArgs(
args);
port=params.getInt(port);
}catch(Exceptione){
System.err.println(Noportspecified.Pleaserun'S
ocketWindowWordCount--port
return;
}
gettheexecutionenvironment
finalStreamExecutionEnvironmentenv=StreamExecutionEn
vironment.getExecutionEnvironment;
getinputdatabyconnectingtothesocket
DataStream
t,port,\n);
parsethedata,groupit,windowit,andaggregateth
ecounts
DataStream
.flatMap(newFlatMapFunction
@Override
publicvoidflatMap(Stringvalue,Collector
WithCount>out){
for(Stringword:value.split(\\s)){
out.collect(newWordWithCount(word,1L));
}
}
})
.keyBy(word)
.timeWindow(Time.seconds(5),Time.seconds(1))
.reduce(newReduceFunction
@Override
publicWordWithCountreduce(WordWithCounta,Wor
dWithCountb){
returnnewWordWithCount(a.word,a.count+b
.count);
}
});
printtheresultswithasinglethread,ratherthani
nparallel
windowCounts.print.setParallelism(1);
本地安装教程
34env.execute(SocketWindowWordCount);
}
Datatypeforwordswithcount
publicstaticclassWordWithCount{
publicStringword;
publiclongcount;
publicWordWithCount{}
publicWordWithCount(Stringword,longcount){
this.word=word;
this.count=count;
}
@Override
publicStringtoString{
returnword+:+count;
}
}
}
运行示例
现在,我们将运行此Flink应用程序。它将从套接字读取文本,并且每5秒打印一次
前5秒内每个不同单词的出现次数,即处理时间的翻滚窗口,只要文字漂浮在其
中。
首先,我们使用netcat来启动本地服务器
nc-l9000
提交Flink计划:
.binflinkrunexamplesstreamingSocketWindowWordCount.jar-
-port9000
Startingexecutionofprogram
程序连接到套接字并等待输入。您可以检查Web界面以验证作业是否按预期运行:
本地安装教程
35本地安装教程
36单词在5秒的时间窗口(处理时间,翻滚窗口)中计算并打印到 stdout。监
视TaskManager的输出文件并写入一些文本 nc(输入在点击后逐行发送到
Flink
nc-l9000
loremipsum
ipsumipsumipsum
bye
该 .out文件将在每个时间窗口结束时,只要打印算作字浮在,例如:
tail-flogflink--taskexecutor-.out
lorem:1
bye:1
ipsum:4
要停止Flink当你做类型:
.binstop-cluster.sh
下一步
查看更多示例以更好地了解Flink的编程API。完成后,请继续阅读流处理指南。
本地安装教程
37在Windows上运行Flink
译者:flink.sojb.cn
如果要在Windows计算机上本地运行Flink,则需要下载并解压缩二进制Flink分发。
之后,您可以使用Windows批处理文件( .bat),或使用Cygwin运行Flink
JobManager。
从Windows批处理文件开始
要从Windows命令行启动Flink,请打开命令窗口,导航到 binFlink目录并运
行 start-cluster.bat。
注意: binJavaRuntimeEnvironment的文件夹必须包含在Window
的 %PATH%变量中。按照本指南将Java添加到 %PATH%变量中。
cdflink
cdbin
start-cluster.bat
StartingalocalclusterwithoneJobManagerprocessandoneTas
kManagerprocess.
YoucanterminatetheprocessesviaCTRL-Cinthespawnedshell
windows.
Webinterfacebydefaultonhttp:localhost:8081.
之后,您需要打开第二个终端来运行作业 flink.bat。
从Cygwin和UnixScripts开始
使用Cygwin,您需要启动Cygwin终端,导航到您的Flink目录并运
行 start-cluster.sh脚本:
cdflink
binstart-cluster.sh
Startingcluster.
从Git安装Flink
如果您正在从git存储库安装Flink并且您正在使用Windowsgitshell,则Cygwin可能
会产生类似于以下的故障:
在Windows上运行Flink
38c:flinkbinstart-cluster.sh:line30:figure>\r':commandnot
found
发生此错误是因为在Windows中运行时,git会自动将UNIX行结尾转换为Windows
样式行结尾。问题是Cygwin只能处理UNIX样式的行结尾。解决方案是通过以下三
个步骤调整Cygwin设置以处理正确的行结尾:
1. 启动一个Cygwinshell。
2. 输入确定您的主目录
cd;pwd
ThiswillreturnapathundertheCygwinrootpath.
1. 使用NotePad,写字板或其他文本编辑器打开 .bash_profile主目录中的文
件并附加以下内容:(如果文件不存在,则必须创建它)
exportSHELLOPTS
set-oigncr
保存文件并打开一个新的bashshell。
在Windows上运行Flink
39例子
译者:flink.sojb.cn
捆绑的例子
Flink的来源包括Flink不同API的许多示例:
DataStream应用程序(JavaScala)
DataSet应用程序(JavaScala)
TableAPISQL查询(JavaScala)
这些说明解释了如何运行示例。
Web上的示例
还有一些在线发布的博客文章讨论了示例应用程序:
如何使用ApacheFlink构建有状态流应用程序提供了一个使用DataStreamAPI
实现的事件驱动应用程序和两个用于流分析的SQL查询。
使用ApacheFlink,Elasticsearch和Kibana构建实时仪表板应用程序
是elastic.co的博客文章,展示了如何使用ApacheFlink,Elasticsearch和
Kibana构建用于流数据分析的实时仪表板解决方案。
来自DataArtisans的Flink培训网站有很多例子。查看动手部分和练习。
例子
40批处理示例
译者:flink.sojb.cn
以下示例程序展示了Flink的不同应用程序,从简单的字数统计到图形算法。代码示
例说明了Flink的DataSetAPI的使用。
可以在Flink源存储库的flink-examples-batch模块中找到以下和更多示例的完整源代
码。
运行一个例子
为了运行Flink示例,我们假设您有一个正在运行的Flink实例。导航中的“快速入
门”和“设置”选项卡描述了启动Flink的各种方法。
最简单的方法是运行 .binstart-cluster.sh,默认情况下启动一个带有一个
JobManager和一个TaskManager的本地集群。
Flink的每个二进制版本都包含一个 examples目录,其中包含此页面上每个示例
的jar文件。
要运行WordCount示例,请发出以下命令:
.binflinkrun.examplesbatchWordCount.jar
其他示例可以以类似的方式启动。
请注意,通过使用内置数据,许多示例在不传递任何参数的情况下运行。要使用实
际数据运行WordCount,您必须将路径传递给数据:
.binflinkrun.examplesbatchWordCount.jar--inputpathto
sometextdata--outputpathtoresult
请注意,非本地文件系统需要模式前缀,例如 hdfs:。
字数
WordCount是大数据处理系统的“HelloWorld”。它计算文本集合中单词的频率。该
算法分两步进行:首先,文本将文本分成单个单词。其次,对单词进行分组和计
数。
Java
Scala
批处理示例
41ExecutionEnvironmentenv=ExecutionEnvironment.getExecutionEnvi
ronment;
DataSet
DataSet
splitupthelinesinpairs(2-tuples)containing:(w
ord,1)
text.flatMap(newTokenizer)
groupbythetuplefield0andsumuptuplefield
1
.groupBy(0)
.sum(1);
counts.writeAsCsv(outputPath,\n,);
User-definedfunctions
publicstaticclassTokenizerimplementsFlatMapFunction
@Override
publicvoidflatMap(Stringvalue,Collector
nteger>>out){
normalizeandsplittheline
String[]tokens=value.toLowerCase.split(\\W+);
emitthepairs
for(Stringtoken:tokens){
if(token.length>0){
out.collect(newTuple2
}
}
}
}
该字计数示例实现上述算法的输入参
数: --input
文本文件都可以。
批处理示例
42valenv=ExecutionEnvironment.getExecutionEnvironment
getinputdatavaltext=env.readTextFile(pathtofile)
valcounts=text.flatMap{_.toLowerCase.split(\\W+)filter{
_.nonEmpty}}
.map{(_,1)}
.groupBy(0)
.sum(1)
counts.writeAsCsv(outputPath,\n,)
该字计数示例实现上述算法的输入参
数: --input
文本文件都可以。
网页排名
PageRank算法计算链接定义的图形中页面的“重要性”,链接指向一个页面到另一个
页面。它是一种迭代图算法,这意味着它重复应用相同的计算。在每次迭代中,每
个页面在其所有邻居上分配其当前等级,并将其新等级计算为从其邻居接收的等级
的纳税总和。PageRank算法由Google搜索引擎推广,该搜索引擎利用网页的重要
性对搜索查询的结果进行排名。
在这个简单的例子中,PageRank通过批量迭代和固定数量的迭代来实现。
Java
Scala
ExecutionEnvironmentenv=ExecutionEnvironment.getExecutionEnvi
ronment;
readthepagesandinitialranksbyparsingaCSVfile
DataSet
agesInputPath)
.types(Long.class,Double.class)
thelinksareencodedasanadjacencylist:(page-id,Array(n
eighbor-ids))
DataSet
v);
setiterativedataset
IterativeDataSet
s.iterate(maxIterations);
DataSet
joinpageswithoutgoingedgesanddistributerank
批处理示例
43.join(pageLinkLists).where(0).equalTo(0).flatMap(newJoi
nVertexWithEdgesMatch)
collectandsumranks
.groupBy(0).sum(1)
applydampeningfactor
.map(newDampener(DAMPENING_FACTOR,numPages));
DataSet
th(
newRanks,newRanks.join(iteration).where(0).equalTo(0)
terminationcondition
.filter(newEpsilonFilter));
finalPageRanks.writeAsCsv(outputPath,\n,);
User-definedfunctions
publicstaticfinalclassJoinVertexWithEdgesMatch
implementsFlatJoinFunction
ble>,Tuple2
>{
@Override
publicvoidjoin(
ng[]>adj,Collector
Long[]neighbors=adj.f1;
doublerank=page.f1;
doublerankToDistribute=rank((double)neigbors.leng
th);
for(inti=0;i
out.collect(newTuple2
ankToDistribute));
}
}
}
publicstaticfinalclassDampenerimplementsMapFunction
privatefinaldoubledampening,randomJump;
publicDampener(doubledampening,doublenumVertices){
this.dampening=dampening;
this.randomJump=(1-dampening)numVertices;
}
@Override
publicTuple2
{
value.f1=(value.f1dampening)+randomJump;
批处理示例
44returnvalue;
}
}
publicstaticfinalclassEpsilonFilter
implementsFilterFunction
uble>,Tuple2
@Override
publicbooleanfilter(Tuple2
ng,Double>>value){
returnMath.abs(value.f0.f1-value.f1.f1)>EPSILON;
}
}
所述的PageRank程序实现上述实施例。它需要运行以下参
数: --pages
User-definedtypescaseclassLink(sourceId:Long,targetId:
Long)
caseclassPage(pageId:Long,rank:Double)
caseclassAdjacencyList(sourceId:Long,targetIds:Array[Long])
setupexecutionenvironmentvalenv=ExecutionEnvironment.g
etExecutionEnvironment
readthepagesandinitialranksbyparsingaCSVfilevalpa
ges=env.readCsvFile[Page](pagesInputPath)
thelinksareencodedasanadjacencylist:(page-id,Array(n
eighbor-ids))vallinks=env.readCsvFile[Link](linksInputPath)
assigninitialrankstopagesvalpagesWithRanks=pages.map(
p=>Page(p,1.0numPages))
buildadjacencylistfromlinkinputvaladjacencyLists=lin
ks
initializelists
.map(e=>AdjacencyList(e.sourceId,Array(e.targetId)))
concatenatelists
.groupBy(sourceId).reduce{
(l1,l2)=>AdjacencyList(l1.sourceId,l1.targetIds++l2.targ
etIds)
}
startiterationvalfinalRanks=pagesWithRanks.iterateWithTe
rmination(maxIterations){
currentRanks=>
valnewRanks=currentRanks
distributerankstotargetpages
批处理示例
45.join(adjacencyLists).where(pageId).equalTo(sourceId)
{
(page,adjacent,out:Collector[Page])=>
for(targetId<-adjacent.targetIds){
out.collect(Page(targetId,page.rankadjacent.target
Ids.length))
}
}
collectranksandsumthemup
.groupBy(pageId).aggregate(SUM,rank)
applydampeningfactor
.map{p=>
Page(p.pageId,(p.rankDAMPENING_FACTOR)+((1-DAMPE
NING_FACTOR)numPages))
}
terminateifnorankupdatewassignificant
valtermination=currentRanks.join(newRanks).where(pageId).equalTo(pageId){
(current,next,out:Collector[Int])=>
checkforsignificantupdate
if(math.abs(current.rank-next.rank)>EPSILON)out.co
llect(1)
}
(newRanks,termination)
}
valresult=finalRanks
emitresultresult.writeAsCsv(outputPath,\n,)
hePageRankprogramimplementstheaboveexample.Itrequiresthefollowing
parameterstorun:--pages
.
输入文件是纯文本文件,必须格式如下:
页面表示为由新行字符分隔的(长)ID。
例如, 1\n2\n12\n42\n63\n给出五个页面ID为1,2,12,42和63的页
面。
链接表示为由空格字符分隔的页面ID对。链接由换行符分隔:
例如, 12\n212\n112\n4263\n给出四个(定向)链接(1)-
>(2),(2)->(12),(1)->(12)和(42)->(63)。
对于这个简单的实现,要求每个页面至少有一个传入链接和一个传出链接(页面可
以指向自身)。
连接组件
批处理示例
46连通分量算法通过为同一连接部分中的所有顶点分配相同的组件ID来识别较大图形
的部分。与PageRank类似,ConnectedComponents是一种迭代算法。在每个步
骤中,每个顶点将其当前组件ID传播到其所有邻居。如果顶点小于其自己的组件
ID,则顶点接受来自邻居的组件ID。
此实现使用增量迭代:未更改其组件ID的顶点不参与下一步。这会产生更好的性
能,因为后面的迭代通常只处理一些异常值顶点。
Java
Scala
readvertexandedgedata
DataSet
DataSet
newUndirectEdge);
assigntheinitialcomponentIDs(equaltothevertexID)
DataSet
(newDuplicateValue
openadeltaiteration
DeltaIteration
=
verticesWithInitialId.iterateDelta(verticesWithInitialId
,maxIterations,0);
applythesteplogic:
DataSet
joinwiththeedges
.join(edges).where(0).equalTo(0).with(newNeighborWithCo
mponentIDJoin)
selecttheminimumneighborcomponentID
.groupBy(0).aggregate(Aggregations.MIN,1)
updateifthecomponentIDofthecandidateissmalle
r
.join(iteration.getSolutionSet).where(0).equalTo(0)
.flatMap(newComponentIdFilter);
closethedeltaiteration(deltaandnewworksetareidentica
l)
DataSet
,changes);
emitresult
result.writeAsCsv(outputPath,\n,);
User-definedfunctions
publicstaticfinalclassDuplicateValue
on
@Override
批处理示例
47publicTuple2
returnnewTuple2
}
}
publicstaticfinalclassUndirectEdge
implementsFlatMapFunction
>,Tuple2
Tuple2
@Override
publicvoidflatMap(Tuple2
2
invertedEdge.f0=edge.f1;
invertedEdge.f1=edge.f0;
out.collect(edge);
out.collect(invertedEdge);
}
}
publicstaticfinalclassNeighborWithComponentIDJoin
implementsJoinFunction
e2
@Override
publicTuple2
Component,Tuple2
returnnewTuple2
nt.f1);
}
}
publicstaticfinalclassComponentIdFilter
implementsFlatMapFunction
g,Long>,Tuple2
{
@Override
publicvoidflatMap(Tuple2
if(value.f0.f1
out.collect(value.f0);
}
}
}
该ConnectedComponents程序实现上述实施例。它需要运行以下参
数: --vertices
批处理示例
48setupexecutionenvironmentvalenv=ExecutionEnvironment.g
etExecutionEnvironment
readvertexandedgedata
assigntheinitialcomponents(equaltothevertexid)valve
rtices=getVerticesDataSet(env).map{id=>(id,id)}
undirectededgesbyemittingforeachinputedgetheinputed
gesitselfandaninverted
versionvaledges=getEdgesDataSet(env).flatMap{edge=>Se
q(edge,(edge._2,edge._1))}
openadeltaiterationvalverticesWithComponents=vertices.
iterateDelta(vertices,maxIterations,Array(0)){
(s,ws)=>
applythesteplogic:joinwiththeedges
valallNeighbors=ws.join(edges).where(0).equalTo(0){(ver
tex,edge)=>
(edge._2,vertex._2)
}
selecttheminimumneighbor
valminNeighbors=allNeighbors.groupBy(0).min(1)
updateifthecomponentofthecandidateissmaller
valupdatedComponents=minNeighbors.join(s).where(0).equalT
o(0){
(newVertex,oldVertex,out:Collector[(Long,Long)])=>
if(newVertex._2
}
deltaandnewworksetareidentical
(updatedComponents,updatedComponents)
}
verticesWithComponents.writeAsCsv(outputPath,\n,)
这个PageRank程序实现了上面的例子。它需要运行以下参
数: --pages
输入文件是纯文本文件,必须格式如下:
顶点表示为ID并用换行符分隔。
例如, 1\n2\n12\n42\n63\n给出五个顶点(1),(2),(12),(42)和(63)。
边缘表示为由空格字符分隔的顶点ID的对。边线由换行符分隔:
例如, 12\n212\n112\n4263\n给出四个(无向)链路(1)-
(2),(2)-(12),(1)-(12)和(42)-(63)。
批处理示例
49批处理示例
50应用开发
应用开发
51项目构建设置
项目构建设置
52Java项目模板
译者:flink.sojb.cn
只需几个简单的步骤即可开始使用FlinkJava程序。
要求
唯一的要求是使用Maven3.0.4(或更高版本)和Java8.x安装。
创建项目
使用以下命令之一创建项目:
使用Maven原型
运行快速入门脚本
mvnarchetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeCatalog=https:repository.apache.orgcontentr
epositoriessnapshots -DarchetypeVersion=1.7-SNAPSHOT
这允许您命名新创建的项目。它将以交互方式询问您groupId,artifactId和包名称。
curlhttps:flink.apache.orgqquickstart-SNAPSHOT.sh|bash
-s1.7-SNAPSHOT
注意:对于Maven3.0或更高版本,不再可以通过命令行指定存储库(-
DarchetypeCatalog)。如果要使用SNAPSHOT存储库,则需要向settings.xml添加
存储库条目。有关此更改的详细信息,请参阅Maven官方文档
检查项目
您的工作目录中将有一个新目录。如果您使用了curl方法,则会调用该目
录 quickstart。否则,它的名称为 artifactId:
Java项目模板
53treequickstart
quickstart
├──pom.xml
└──src
└──main
├──java
│└──org
│└──myorg
│└──quickstart
│├──BatchJob.java
│└──StreamingJob.java
└──resources
└──log4j.properties
示例项目是一个Maven项目,它包含两个类:StreamingJob和BatchJob
是DataStream和DataSet程序的基本框架程序。的主要方法是程序的入口点,无论
是对在-IDE测试执行并作适当的部署。
我们建议您将此项目导入IDE以进行开发和测试。IntelliJIDEA支持开箱即用的
Maven项目。如果您使用Eclipse,则m2e插件允许导入Maven项目。某些Eclipse
包默认包含该插件,其他包需要您手动安装它。
MacOSX用户注意事项:对于Flink,Java默认JVM堆可能太小。你必须手动增加
它。在Eclipse中,选择 RunConfigurations->Arguments并写
入 VMArguments框: -Xmx800m。在IntelliJIDEA中,推荐的方法是
从 Help|EditCustomVMOptions菜单中更改JVM选项。有关详细信息,请
参阅此文
构建项目
如果要构建打包项目,请转到项目目录并运行' mvncleanpackage'命令。您将
找到一个包含您的应用程序的JAR文件,以及您可能已作为依赖项添加到应用程序
的连接器和库: target
注意:如果您使用与StreamingJob不同的类作为应用程序的主类入口点,我们建
议您相应地更改文件中的 mainClass设置 pom.xml。这样,Flink可以从JAR文
件运行时间应用程序,而无需另外指定主类。
下一步
写你的申请!
如果您正在编写流处理应用程序并且正在寻找灵感来写什么,请查看流处理应用程
序教程。
如果您正在编写批处理应用程序,并且正在寻找要编写的内容,请查看批处理应用
程序示例。
Java项目模板
54有关API的完整概述,请查看DataStreamAPI和DataSetAPI部分。
在这里,您可以了解如何在本地群集上的IDE外部运行应用程序。
如果您有任何问题,请在我们的邮件列表中查询。我们很乐意提供帮助。
Java项目模板
55Scala的项目模板
译者:flink.sojb.cn
构建工具
Flink项目可以使用不同的构建工具构建。为了快速入门,Flink为以下构建工具提供
了项目模板:
SBT
Maven
这些模板可帮助您设置项目结构并创建初始构建文件。
SBT
创建项目
您可以通过以下两种方法之一构建新项目:
使用sbt模板
运行快速入门脚本
sbtnewtillrohrmannflink-project.g8
这将提示您输入几个参数(项目名称,flink版本...),然后从flink-project模板创建
一个Flink项目。您需要sbt>=0.13.13才能执行此命令。如有必要,您可以按照此
安装指南获取。
bash<(curlhttps:flink.apache.orgqsbt-quickstart.sh)
ThiswillcreateaFlinkprojectinthespecifiedprojectdirectory.
构建项目
要构建项目,您只需发出 sbtcleanassembly命令即可。这将在target
scala_your-major-scala-version目录中创建fat-jaryour-project-name-
assembly-0.1-SNAPSHOT.jar。
运行项目
Scala的项目模板
56要运行项目,您必须发出 sbtrun命令。
默认情况下,这将在运行的同一JVM中运行您的作业 sbt。要在不同的JVM中运
行您的作业,请添加以下行 build.sbt
forkinrun:=true
的IntelliJ
我们建议您使用IntelliJ进行Flink作业开发。要开始,您必须将新创建的项目导入
IntelliJ。您可以通
过 File->New->ProjectfromExistingSources...然后选择项目
目录来执行此算子操作。然后,IntelliJ将自动检测 build.sbt文件并设置所有内
容。
为了运行Flink作业,建议选择 mainRunner模块作为运行调试配置的类路径。这
将确保所有设置为提供的依赖项在执行时可用。您可以配置运行调试配置通
过 Run->EditConfigurations...,然后选择 mainRunner从模块的使
用类路径的Dropbox。
Eclipse
要将新创建的项目导入Eclipse,首先必须为其创建Eclipse项目文件。这些项目文
件可以通过sbteclipse插件创建。将以下行添加到您
的 PROJECT_DIRprojectplugins.sbt文件中:
addSbtPlugin(com.typesafe.sbteclipse%sbteclipse-plugin%
4.0.0)
在 sbt使用以下命令创建Eclipse项目文件
>eclipse
现在,您可以通过项目将项目导入Eclipse
File->Import...->ExistingProjectsintoWorkspace,然后
选择项目目录。
Maven
要求
唯一的要求是使用Maven3.0.4(或更高版本)和Java8.x安装。
Scala的项目模板
57创建项目
使用以下命令之一创建项目:
使用Maven原型
运行快速入门脚本
mvnarchetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeCatalog=https:repository.apache.orgcontentr
epositoriessnapshots -DarchetypeVersion=1.7-SNAPSHOT
这允许您命名新创建的项目。它将以交互方式询问您groupId,artifactId和包名称。
curlhttps:flink.apache.orgqquickstart-scala-SNAPSHOT.sh
|bash-s1.7-SNAPSHOT
注意:对于Maven3.0或更高版本,不再可以通过命令行指定存储库(-
DarchetypeCatalog)。如果要使用SNAPSHOT存储库,则需要向settings.xml添加
存储库条目。有关此更改的详细信息,请参阅Maven官方文档
检查项目
您的工作目录中将有一个新目录。如果您使用了curl方法,则会调用该目
录 quickstart。否则,它的名称为 artifactId:
treequickstart
quickstart
├──pom.xml
└──src
└──main
├──resources
│└──log4j.properties
└──scala
└──org
└──myorg
└──quickstart
├──BatchJob.scala
└──StreamingJob.scala
示例项目是一个Maven项目,它包含两个类:StreamingJob和BatchJob
是DataStream和DataSet程序的基本框架程序。的主要方法是程序的入口点,无论
是对在-IDE测试执行并作适当的部署。
Scala的项目模板
58我们建议您将此项目导入IDE。
IntelliJIDEA支持Maven开箱即用,并为Scala开发提供插件。根据我们的经验,IntelliJ为开发Flink应用程序提供了最佳体验。
对于Eclipse,您需要以下插件,您可以从提供的EclipseUpdateSites安装这些插
件:
Eclipse4.x
ScalaIDE
m2eclipse的-Scala
构建HelperMaven插件
Eclipse3.8
用于Scala2.11的ScalaIDE或用于Scala2.10的ScalaIDE
m2eclipse的-Scala
构建HelperMaven插件
构建项目
如果要构建打包项目,请转到项目目录并运行' mvncleanpackage'命令。您将
找到一个包含您的应用程序的JAR文件,以及您可能已作为依赖项添加到应用程序
的连接器和库: target
注意:如果您使用与StreamingJob不同的类作为应用程序的主类入口点,我们建
议您相应地更改文件中的 mainClass设置 pom.xml。这样,Flink可以从JAR文
件运行时间应用程序,而无需另外指定主类。
下一步
写你的申请!
如果您正在编写流处理应用程序并且正在寻找灵感来写什么,请查看流处理应用程
序教程
如果您正在编写批处理应用程序,并且正在寻找要编写的内容,请查看批处理应用
程序示例
有关API的完整概述,请查看DataStreamAPI和DataSetAPI部分。
在这里,您可以了解如何在本地群集上的IDE外部运行应用程序。
如果您有任何问题,请在我们的邮件列表中查询。我们很乐意提供帮助。
Scala的项目模板
59配置依赖关系,连接器,库
译者:flink.sojb.cn
每个Flink应用程序都依赖于一组Flink库。至少,应用程序依赖于FlinkAPI。许多应
用程序还依赖于某些连接器库(如Kafka,Cassandra等)。运行Flink应用程序时
(在分布式部署中或在IDE中进行测试),Flink运行时库也必须可用。
Flink核心和应用程序依赖项
与大多数运行用户定义应用程序的系统一样,Flink中有两大类依赖项和库:
Flink核心依赖关系:Flink本身由运行系统所需的一组类和依赖项组成,例如
协调,网络,检查点,故障转移,API,算子操作(如窗口),资源管理等。
这些类和依赖项构成了Flink运行时的核心,在启动Flink应用程序时必须存在。
这些核心类和依赖项打包在 flink-distjar中。它们是Flink lib文件夹的
一部分,是Flink基本容器镜像的一部分。想象成类似于Java核心库(这些依赖
关系 rt.jar, charsets.jar等等),其中包含像类 String和 List。
Flink核心依赖项不包含任何连接器或库(CEP,SQL,ML等),以避免默认
情况下在类路径中具有过多的依赖项和类。实际上,我们尝试尽可能保持核心
依赖关系,以保持默认类路径较小并避免依赖性冲突。
所述用户应用程序的依赖关系是所有的连接器,格式或库,一个特定的用户应
用需求。
用户应用程序通常打包到应用程序jar中,该应用程序jar包含应用程序代码以及
所需的连接器和库依赖项。
用户应用程序依赖项显式不包括FlinkDataSetDataStreamAPI和运行时依赖
项,因为它们已经是Flink核心依赖项的一部分。
设置项目:基本依赖项
每个Flink应用程序都需要最低限度的API依赖关系来进行开发。对于Maven,您可
以使用Java项目模板或Scala项目模板来创建具有这些初始依赖项的程序框架。
手动设置项目时,需要为JavaScalaAPI添加以下依赖项(此处以Maven语法显
示,但相同的依赖项也适用于其他构建工具(Gradle,SBT等)。
Java
Scala
配置依赖关系,连接器,库
60
重要提示:请注意,所有这些依赖项都将其范围设置为提供。这意味着需要对它们
进行编译,但不应将它们打包到项目生成的应用程序jar文件中-这些依赖项是Flink
CoreDependencies,它们已在任何设置中提供。
强烈建议将依赖关系保持在提供的范围内。如果它们未设置为提供,则最好的情况
是生成的JAR变得过大,因为它还包含所有Flink核心依赖项。最糟糕的情况是添加
到应用程序的jar文件的Flink核心依赖项与您自己的一些依赖版本冲突(通常通过反
向类加载来避免)。
关于IntelliJ的注意事项:要使应用程序在IntelliJIDEA中运行,需要在范围编译中
声明Flink依赖项,而不是提供。否则,IntelliJ不会将它们添加到类路径中,并且in-
IDE执行将失败并带有 NoClassDefFountError。为了避免必须将依赖范围声明
为compile(不推荐使用,请参见上文),上面链接的Java和Scala项目模板使用了
一个技巧:它们添加了一个配置文件,该应用程序在IntelliJ中运行时有选择地激活
在不影响JAR文件打包的情况下,将依赖关系提升到范围编译。
添加连接器和库依赖项
大多数应用程序需要运行特定的连接器或库,例如连接到Kafka,Cassandra等的连
接器。这些连接器不是Flink的核心依赖项的一部分,因此必须作为依赖项添加到应
用程序中
配置依赖关系,连接器,库
61下面是将Kafka0.10的连接器添加为依赖项(Maven语法)的示例:
我们建议将应用程序代码及其所有必需的依赖项打包到一个jar-with-dependencies
中,我们将其称为应用程序jar。应用程序jar可以提交给已经运行的Flink集群,也可
以添加到Flink应用程序容器镜像中。
从Java项目模板或Scala项目模板创建的项目配置为在运行时自动将应用程序依赖
项包含到应用程序jar中 mvncleanpackage。对于未从这些模板设置的项目,我
们建议添加MavenShade插件(如下面的附录中所列)以构建具有所有必需依赖项
的应用程序jar。
重要:对于Maven(和其他构建工具)将依赖项正确打包到应用程序jar中,必须在
范围编译中指定这些应用程序依赖项(与核心依赖项不同,核心依赖项必须在提供
的作用域中指定)。
Scala版本
Scala版本(2.10,2.11,2.12等)彼此不是二进制兼容的。因此,FalaforScala2.11
不能与使用Scala2.12的应用程序一起使用。
例如,所有(传递上)依赖于Scala的Flink依赖项都以它们为其构建的Scala版本为
后缀 flink-streaming-scala_2.11。
只使用Java开发人员可以选择任何Scala版本,Scala开发人员需要选择与其应用程
序的Scala版本匹配的Scala版本。
有关如何为特定Scala版本构建Flink的详细信息,请参阅构建指南。
注意:由于Scala2.12中的重大更改,Flink1.5目前仅针对Scala2.11构建。我们的
目标是在下一版本中添加对Scala2.12的支持。
Hadoop依赖项
一般规则:永远不必将Hadoop依赖项直接添加到您的应用程序中。(唯一的例外
是当使用现有的Hadoop输入输出格式与Flink的Hadoop兼容打包器时)
如果要将Flink与Hadoop一起使用,则需要具有包含Hadoop依赖关系的Flink设置,而不是将Hadoop添加为应用程序依赖关系。有关详细信息,请参阅Hadoop设置指
南。
该设计有两个主要原因:
配置依赖关系,连接器,库
62一些Hadoop交互发生在Flink的核心,可能在用户应用程序启动之前,例如为
检查点设置HDFS,通过Hadoop的Kerberos令牌进行身份验证或在YARN上部
署。
Flink的反向类加载方法隐藏了核心依赖关系中的许多传递依赖关系。这不仅适
用于Flink自己的核心依赖项,也适用于Hadoop在设置中存在的依赖项。这
样,应用程序可以使用相同依赖项的不同版本,而不会遇到依赖冲突(并且相
信我们,这是一个大问题,因为Hadoops依赖树很大。)
如果在IDE内部的测试或开发过程中需要Hadoop依赖关系(例如,用于HDFS访
问),请配置这些依赖关系,类似于要测试或提供的依赖关系的范围。
附录:用于构建具有依赖关系的Jar的模板
要构建包含声明的连接器和库所需的所有依赖项的应用程序JAR,可以使用以下
shade插件定义:
配置依赖关系,连接器,库
63
s:jsr305
ude>
lude>
lude>
che.maven.plugins.shade.resource.ManifestResourceTransformer>
z
配置依赖关系,连接器,库
64配置依赖关系,连接器,库
65基本API概念
译者:flink.sojb.cn
Flink程序是实现分布式集合转换的常规程序(例如,Filter,映射,更新状态,Join,分组,定义窗口,聚合)。集合最初是从源创建的(例如,通过读取文件,kafka主题或从本地的内存中集合)。结果通过接收器返回,接收器可以例如将数据
写入(分布式)文件或标准输出(例如,命令行终端)。Flink程序可以在各种环境
中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多
计算机的集群上执行。
根据数据源的类型(即有界或无界源),您可以编写批处理程序或流程序,其中
DataSetAPI用于批处理,DataStreamAPI用于流式处理。本指南将介绍两种API共
有的基本概念,但请参阅我们的流处理指南和批处理指南,了解有关使用每个API
编写程序的具体信息。
注:当显示的API时,如何使用,我们将用实际的例子
StreamingExecutionEnvironment和 DataStreamAPI。 DataSetAPI中的概
念完全相同,只需替换为 ExecutionEnvironment和 DataSet。
DataSet和DataStream
Flink具有特殊类 DataSet并 DataStream在程序中表示数据。您可以将它们视为
可以包含重复项的不可变数据集合。在 DataSet数据有限的情况下,对于一
个 DataStream数据元的数量可以是无界的。
这些集合在某些关键方面与常规Java集合不同。首先,它们是不可变的,这意味着
一旦创建它们就无法添加或删除数据元。你也不能简单地检查里面的数据元。
集合最初通过在Flink程序添加源创建和新的集合从这些通过将它们使用API方法如
衍生 map, filter等等。
Flink计划的剖析
Flink程序看起来像是转换数据集合的常规程序。每个程序包含相同的基本部分:
1. 获得一个 executionenvironment,2. 加载创建初始数据,3. 指定此数据的转换,4. 指定放置计算结果的位置,5. 触发程序执行
6. Java
7. Scala
基础API概念
66我们现在将概述每个步骤,请参阅相应部分以获取更多详细信息。请注意,Java
DataSetAPI的所有核心类都可以在org.apache.flink.api.java包中找到,而Java
DataStreamAPI的类可以在org.apache.flink.streaming.api中找到。
这 StreamExecutionEnvironment是所有Flink计划的基础。您可以使用以下静态
方法获取一个 StreamExecutionEnvironment:
getExecutionEnvironment
createLocalEnvironment
createRemoteEnvironment(Stringhost,intport,String...jarFile
s)
通常,您只需要使用 getExecutionEnvironment,因为这将根据上下文做正
确的事情:如果您在IDE中执行程序或作为常规Java程序,它将创建一个本地环
境,将在本地计算机上执行您的程序。如果您从程序中创建了一个JAR文件,并通
过命令行调用它,则Flink集群管理器将执行您的main方法
并 getExecutionEnvironment返回一个运行环境,以便在集群上执行您的程
序。
对于指定数据源,运行环境有几种方法可以使用各种方法从文件中读取:您可以逐
行读取它们,CSV文件或使用完全自定义数据输入格式。要将文本文件作为一系列
行读取,您可以使用:
finalStreamExecutionEnvironmentenv=StreamExecutionEnvironmen
t.getExecutionEnvironment;
DataStream
);
这将为您提供一个DataStream,然后您可以在其上应用转换来创建新的派生
DataStream。
您可以通过使用转换函数调用DataStream上的方法来应用转换。例如,Map转换如
下所示:
DataStream
DataStream
nteger>{
@Override
publicIntegermap(Stringvalue){
returnInteger.parseInt(value);
}
});
基础API概念
67这将通过将原始集合中的每个String转换为Integer来创建新的DataStream。
一旦有了包含最终结果的DataStream,就可以通过创建接收器将其写入外部系统。
这些只是创建接收器的一些示例方法:
writeAsText(Stringpath)
我们现在将概述每个步骤,请参阅相应部分以获取更多详细信息。请注意,Scala
DataSetAPI的所有核心类都可以在org.apache.flink.api.scala包中找到,而Scala
DataStreamAPI的类可以在org.apache.flink.streaming.api.scala中找到。
这 StreamExecutionEnvironment是所有Flink计划的基础。您可以使用以下静态
方法获取一个 StreamExecutionEnvironment:
getExecutionEnvironment
createLocalEnvironment
createRemoteEnvironment(host:String,port:Int,jarFiles:Strin
g)
通常,您只需要使用 getExecutionEnvironment,因为这将根据上下文做正
确的事情:如果您在IDE中执行程序或作为常规Java程序,它将创建一个本地环
境,将在本地计算机上执行您的程序。如果您从程序中创建了一个JAR文件,并通
过命令行调用它,则Flink集群管理器将执行您的main方法
并 getExecutionEnvironment返回一个运行环境,以便在集群上执行您的程
序。
对于指定数据源,运行环境有几种方法可以使用各种方法从文件中读取:您可以逐
行读取它们,CSV文件或使用完全自定义数据输入格式。要将文本文件作为一系列
行读取,您可以使用:
valenv=StreamExecutionEnvironment.getExecutionEnvironment
valtext:DataStream[String]=env.readTextFile(file:pathto
file)
这将为您提供一个DataStream,然后您可以在其上应用转换来创建新的派生
DataStream。
您可以通过使用转换函数调用DataSet上的方法来应用转换。例如,Map转换如下
所示:
基础API概念
68valinput:DataSet[String]=...
valmapped=input.map{x=>x.toInt}
这将通过将原始集合中的每个String转换为Integer来创建新的DataStream。
一旦有了包含最终结果的DataStream,就可以通过创建接收器将其写入外部系统。
这些只是创建接收器的一些示例方法:
writeAsText(path:String)
一旦您指定的完整程序,你需要触发执行程序调用
execute上 StreamExecutionEnvironment。根据执行的类
型, ExecutionEnvironment将在本地计算机上触发执行或提交程序以在群集上
执行。
该 execute方法返回一个 JobExecutionResult,包含执行时间和累加器结
果。
有关流数据源和接收器的信息,请参阅流指南,以及有关DataStream上支持的转换
的更深入信息。
有关批处理数据源和接收器的信息,请查看批处理指南,以及有关DataSet支持的
转换的更深入信息。
懒惰的评价
所有Flink程序都是懒惰地执行:当执行程序的main方法时,数据加载和转换不会直
接发生。而是创建每个算子操作并将其添加到程序的计划中。当 execute运行
环境上的调用显式触发执行时,实际执行算子操作。程序是在本地执行还是在集群
上执行取决于运行环境的类型
懒惰的评估使您可以构建Flink作为一个整体计划单元执行的复杂程序。
指定Keys
某些转换(join,coGroup,keyBy,groupBy)要求在数据元集合上定义键。其他
转换(Reduce,GroupReduce,Aggregate,Windows)允许数据在应用之前在
Keys上分组。
DataSet被分组为
基础API概念
69DataSet<...>input=[...]
DataSet<...>reduced=input
.groupBy(definekeyhere)
.reduceGroup(dosomething);
虽然可以使用DataStream指定Keys
DataStream<...>input=[...]
DataStream<...>windowed=input
.keyBy(definekeyhere)
.window(windowspecification);
Flink的数据模型不基于键值对。因此,您无需将数据集类型物理打包到键和值中。
键是“虚拟的”:它们被定义为实际数据上的函数,以指导分组算子。
注意:在下面的讨论中,我们将使用 DataStreamAPI和 keyBy。对于DataSet
API,您只需要替换为 DataSet和 groupBy。
定义元组的键
最简单的情况是在元组的一个或多个字段上对元组进行分组:
Java
Scala
DataStream
KeyedStream
By(0)
valinput:DataStream[(Int,String,Long)]=[...]valkeyed
=input.keyBy(0)
元组在第一个字段(整数类型)上分组。
Java
Scala
DataStream
KeyedStream
By(0,1)
基础API概念
70valinput:DataSet[(Int,String,Long)]=[...]valgrouped=
input.groupBy(0,1)
在这里,我们将元组分组在由第一个和第二个字段组成的复合键上。
关于嵌套元组的注释:如果你有一个带有嵌套元组的DataStream,例如:
DataStream
指定 keyBy(0)将导致系统使用full Tuple2作为键(以Integer和Float为键)。
如果要“导航”到嵌套中 Tuple2,则必须使用下面解释的字段表达式键。
使用FieldExpressions定义键
您可以使用基于字符串的字段表达式来引用嵌套字段,并定义用于分组,排序,连
接或coGrouping的键。
字段表达式可以非常轻松地选择(嵌套)复合类型中的字段,例如Tuple和POJO类
型。
Java
Scala
在下面的示例中,我们有一个 WCPOJO,其中包含两个字段“word”和“count”。要
按字段分组 word,我们只需将其名称传递给 keyBy函数即可。
someordinaryPOJO(PlainoldJavaObject)
publicclassWC{
publicStringword;
publicintcount;
}
DataStream





