当前位置: 首页 > 新闻 > 信息荟萃
编号:5488
Flink基础教程豆瓣.pdf
http://www.100md.com 2020年11月12日
第1页
第9页
第17页
第23页
第47页
第295页

    参见附件(11499KB,1169页)。

     Flink基础教程,Flink是众多大数据处理框架中一颗冉冉升起的新星。它以同一种技术支持流处理和批处理,并能同时满足高吞吐、低延迟和容错的需求。本书由Flink项目核心成员执笔,系统阐释Flink的适用场景、设计理念、功能、用途和性能优势。

    编辑推荐

    作为新一代的开源流处理器,Flink是众多大数据处理框架中一颗冉冉升起的新星。它以同一种技术支持流处理和批处理,并能同时满足高吞吐、低延迟和容错的需求。本书由Flink项目核心成员执笔,系统阐释Flink的适用场景、设计理念、功能、用途和性能优势。

    - Flink的适用场景

    - 流处理架构相较于批处理架构的优势

    - Flink中的时间概念

    - Flink的检查点机制

    - Flink的性能优势

    内容简介

    近年来,流处理变得越来越流行。作为高度创新的开源流处理器,Flink拥有诸多优势,包括容错性、高吞吐、低延迟,以及同时支持流处理和批处理的能力。本书分为6章,侧重于介绍Flink的核心设计理念、功能和用途,内容涉及事件时间和处理时间、窗口和水印机制、检查点机制、性能测评,以及Flink如何实现批处理。

    本书面向有兴趣学习如何分析大规模流数据的读者。

    作者简介

    作者介绍

    埃伦·弗里德曼(Ellen Friedman)

    解决方案咨询师,知名大数据相关技术布道师,在流处理架构和大数据处理框架等方面有多部著作。

    科斯塔斯·宙马斯(Kostas Tzoumas)

    Flink项目核心成员,data Artisans公司联合创始人兼首席执行官,在流处理和数据科学领域经验丰富。

    译者介绍

    王绍翾

    阿里巴巴资 深技术专家,Apache Flink Committer,淘宝花名“大沙”。毕业于北京大学信息科学技术学院,后取得加州大学圣地亚哥分校计算机工程博士学位。目前就职于阿里巴巴计算平台事业部,负责Flink SQL引擎及机器学习的相关开发。加入阿里巴巴之前,在Facebook开发分布式图存储系统TAO。曾多次拜访由Flink创始团队创办的公司data Artisans,并与其首 席执行官科斯塔斯·宙马斯(本书作者之一)以及首 席技术官斯蒂芬·尤恩有着广泛的合作。

    目录

    前言 ix

    第 1章 为何选择Flink 1

    1.1 流处理欠佳的后果 2

    1.1.1 零售业和市场营销 2

    1.1.2 物联网 3

    1.1.3 电信业 5

    1.1.4 银行和金融业 5

    1.2 连续事件处理的目标 6

    1.3 流处理技术的演变 6

    1.4 初探Flink 9

    1.5 生产环境中的Flink 12

    1.5.1 布衣格电信 13

    1.5.2 其他案例 14

    1.6 Flink的适用场景 15

    第 2章 流处理架构 17

    2.1 传统架构与流处理架构 17

    2.2 消息传输层和流处理层 18

    2.3 消息传输层的理想功能 19

    2.3.1 兼具高性能和持久性 20

    2.3.2 将生产者和消费者解耦 20

    2.4 支持微服务架构的流数据 21

    2.4.1 数据流作为中心数据源 22

    2.4.2 欺诈检测:流处理架构用例 22

    2.4.3 给开发人员带来的灵活性 24

    2.5 不限于实时应用程序 24

    2.6 流的跨地域复制 26

    第3章 Flink 的用途 29

    3.1 不同类型的正确性 29

    3.1.1 符合产生数据的自然规律 29

    3.1.2 事件时间 31

    3.1.3 发生故障后仍保持准确 32

    3.1.4 及时给出所需结果 33

    3.1.5 使开发和运维更轻松 33

    3.2 分阶段采用Flink 34

    第4章 对时间的处理 35

    4.1 采用批处理架构和Lambda 架构计数 35

    4.2 采用流处理架构计数 38

    4.3 时间概念 40

    4.4 窗口 41

    4.4.1 时间窗口 41

    4.4.2 计数窗口 43

    4.4.3 会话窗口 43

    4.4.4 触发器 44

    4.4.5 窗口的实现 44

    4.5 时空穿梭 44

    4.6 水印 45

    4.7 真实案例:爱立信公司的Kappa 架构 47

    第5章 有状态的计算 49

    5.1 一致性 50

    5.2 检查点:保证exactly-once 51

    5.3 保存点:状态版本控制 59

    5.4 端到端的一致性和作为数据库的流处理器 62

    5.5 Flink 的性能 65

    5.5.1 Yahoo! Streaming Benchmark 65

    5.5.2 变化1:使用Flink 状态 66

    5.5.3 变化2:改进数据生成器并增加吞吐量 67

    5.5.4 变化3:消除网络瓶颈 68

    5.5.5 变化4:使用MapR Streams 69

    5.5.6 变化5:增加key 基数 69

    5.6 结论 71

    第6章 批处理:一种特殊的流处理 73

    6.1 批处理技术 75

    6.2 案例研究:Flink 作为批处理器 76

    附录 其他资源 79

    关于作者 84

    Flink基础教程豆瓣截图

    1.1

    1.2

    1.2.1

    1.2.2

    1.3

    1.3.1

    1.3.1.1

    1.3.2

    1.3.2.1

    1.3.2.2

    1.4

    1.4.1

    1.5

    1.5.1

    1.5.1.1

    1.5.1.2

    1.5.1.3

    1.5.2

    1.5.2.1

    1.5.2.2

    1.5.3

    1.5.3.1

    1.5.3.1.1

    1.5.3.1.2

    1.5.3.2

    1.5.3.2.1

    1.5.3.2.2

    1.5.3.2.3

    1.5.3.2.4

    1.5.3.2.5

    1.5.3.2.6

    1.5.3.3

    1.5.3.3.1

    1.5.3.3.2

    1.5.3.3.3

    目錄

    ApacheFlink文档

    概念

    数据流编程模型

    分布式运行时环境

    教程

    API教程

    DataStreamAPI教程

    Setup教程

    本地安装教程

    在Windows上运行Flink

    例子

    批处理示例

    应用开发

    项目构建设置

    Java项目模板

    Scala的项目模板

    配置依赖关系,连接器,库

    基础API概念

    ScalaAPI扩展

    JavaLambda表达式

    FlinkDataStreamAPI编程指南

    活动时间

    生成时间戳水印

    预定义的时间戳提取器水印发射器

    状态与容错

    状态运行

    广播状态模式

    检查点

    可查询状态Beta

    状态后台

    管理状态的自定义序列化

    算子

    视窗

    Join

    过程函数(低级算子操作)

    11.5.3.3.4

    1.5.3.4

    1.5.3.4.1

    1.5.3.4.2

    1.5.3.4.3

    1.5.3.4.4

    1.5.3.4.5

    1.5.3.4.6

    1.5.3.4.7

    1.5.3.4.8

    1.5.3.4.9

    1.5.3.4.10

    1.5.3.5

    1.5.3.6

    1.5.3.7

    1.5.3.8

    1.5.4

    1.5.4.1

    1.5.4.2

    1.5.4.3

    1.5.4.4

    1.5.4.5

    1.5.4.6

    1.5.4.7

    1.5.4.8

    1.5.4.9

    1.5.5

    1.5.5.1

    1.5.5.2

    1.5.5.3

    1.5.5.4

    1.5.5.5

    1.5.5.6

    1.5.5.7

    1.5.5.8

    1.5.5.9

    1.5.6

    1.5.6.1

    1.5.7

    外部数据访问的异步IO.

    流连接器

    数据源和接收器的容错保证

    ApacheKafka连接器

    ApacheCassandra连接器

    亚马逊AWSKinesisStreams连接器

    Elasticsearch连接器

    HDFS连接器

    流文件接收器

    RabbitMQ连接器

    ApacheNiFi连接器

    Twitter连接器

    旁路输出

    Python编程指南(流)Beta

    测试

    实验特点

    FlinkDataSetAPI编程指南

    数据集转换

    容错

    在数据集中压缩数据元

    迭代

    Python编程指南Beta

    连接器

    Hadoop兼容性测试版

    本地执行

    群集执行

    TableAPI和SQL

    概念和通用API

    流处理概念

    连接到外部系统

    TableAPI

    SQL

    内置函数

    用户定义的源和接收器

    用户定义的函数

    SQL客户端测试版

    数据类型和序列化

    为Flink程序注册自定义序列化程序

    管理执行

    21.5.7.1

    1.5.7.2

    1.5.7.3

    1.5.7.4

    1.5.7.5

    1.5.8

    1.5.8.1

    1.5.8.2

    1.5.8.3

    1.5.8.4

    1.5.8.4.1

    1.5.8.4.2

    1.5.8.4.3

    1.5.8.4.4

    1.5.8.4.5

    1.5.8.5

    1.5.8.5.1

    1.5.8.5.2

    1.5.8.5.3

    1.5.8.5.4

    1.5.8.5.5

    1.5.8.5.6

    1.5.8.5.7

    1.5.8.5.8

    1.5.8.5.9

    1.5.8.5.10

    1.5.8.5.11

    1.5.8.5.12

    1.5.8.5.13

    1.5.9

    1.5.10

    1.6

    1.6.1

    1.6.1.1

    1.6.1.2

    1.6.1.3

    1.6.1.4

    1.6.1.5

    1.6.1.6

    执行配置

    程序打包和分布式执行

    并行执行

    执行计划

    重启策略

    类库

    FlinkCEP-Flink的复杂事件处理

    风暴兼容性Beta

    Gelly:FlinkGraphAPI

    图API

    迭代图处理

    类库方法

    图算法

    图形生成器

    二分图

    FlinkML-Flink的机器学习

    快速入门指南

    如何贡献

    交叉验证

    DistanceMetrics

    k-NearestNeighbors关联

    MinMaxScaler

    MultipleLinearRegression

    在管道的引擎盖下看

    PolynomialFeatures

    随机异常值选择

    StandardScaler

    AlternatingLeastSquares

    SVMusingCoCoA

    最佳实践

    API迁移指南

    部署和运营

    集群和部署

    独立群集

    YARN设置

    Mesos设置

    Kubernetes设置

    Docker设置

    亚马逊网络服务(AWS)

    31.6.1.7

    1.6.1.8

    1.6.1.9

    1.6.2

    1.6.3

    1.6.3.1

    1.6.3.2

    1.6.3.3

    1.6.3.4

    1.6.4

    1.6.5

    1.6.6

    1.6.7

    1.6.8

    1.6.9

    1.6.10

    1.6.11

    1.7

    1.7.1

    1.7.2

    1.7.3

    1.7.4

    1.7.5

    1.7.6

    1.7.7

    1.7.8

    1.7.9

    1.8

    1.8.1

    1.8.2

    1.9

    1.9.1

    1.9.2

    1.9.3

    1.9.4

    1.9.5

    GoogleComputeEngine设置

    MapR设置

    Hadoop集成

    JobManager高可用性(HA)

    状态和容错

    检查点

    保存点

    状态后台

    调整检查点和大状态

    配置

    生产准备清单

    命令行界面

    ScalaREPL

    Kerberos身份验证设置和配置

    SSL设置

    文件系统

    升级应用程序和Flink版本

    调试和监控

    度量

    如何使用日志记录

    历史服务器

    监控检查点

    监测背压

    监控RESTAPI

    调试Windows和事件时间

    调试类加载

    应用程序分析

    FlinkDevelopment

    将Flink导入IDE

    从Source建立Flink

    内幕

    组件堆栈

    数据流容错

    工作和调度

    任务生命周期

    文件系统

    45ApacheFlink文档

    译者:flink.sojb.cn

    在线阅读

    PDF格式

    EPUB格式

    MOBI格式

    代码仓库

    本文档适用于ApacheFlink1.7-SNAPSHOT版。这些页面的建立时间为:

    090818,中部标准时间07:53:00。

    ApacheFlink是一个用于分布式流和批处理数据处理的开源平台。Flink的核心是流

    数据流引擎,为数据流上的分布式计算提供数据分发,通信和容错。Flink在流引擎

    之上构建批处理,覆盖本机迭代支持,托管内存和程序优化。

    第一步

    概念:从Flink的数据流编程模型和分布式运行时环境的基本概念开始。这将有

    助于您了解文档的其他部分,包括设置和编程指南。我们建议您先阅读这些部

    分。

    教程:

    实现并运行DataStream应用程序

    设置本地Flink群集

    编程指南:您可以阅读我们关于基本API概念和DataStreamAPI或DataSetAPI

    的指南,以了解如何编写您的第一个Flink程序。

    部署

    在将Flink工作投入生产之前,请阅读生产准备清单。

    发行说明

    发行说明涵盖了Flink版本之间的重要更改。如果您计划将Flink设置升级到更高版

    本,请仔细阅读这些说明。

    Flink1.6发行说明。

    Flink1.5发行说明。

    外部资源

    ApacheFlink文档

    6FlinkForward:FlinkForward网站和YouTube上提供了以往会议的讲座。使

    用ApacheFlink进行强大的流处理是一个很好的起点。

    培训:数据工匠的培训材料包括幻灯片,练习和示例解决方案。

    博客:ApacheFlink和数据工匠博客发布了有关Flink的频繁,深入的技术文

    章。

    ApacheFlink文档

    7概念

    概念

    8数据流编程模型

    译者:flink.sojb.cn

    抽象层次

    Flink提供不同级别的抽象来开发流批处理应用程序。

    最低级抽象只提供有状态流。它通过ProcessFunction嵌入到DataStreamAPI

    中。它允许用户自由处理来自一个或多个流的事件,并使用一致的容错状态。

    此外,用户可以注册事件时间和处理时间回调,允许程序实现复杂的计算。

    实际上,大多数应用程序不需要上述低级抽象,而是针对CoreAPI编程,如

    DataStreamAPI(有界无界流)和DataSetAPI(有界数据集)。这些流畅的

    API提供了用于数据处理的通用构建块,例如各种形式的用户指定的转换,连

    接,聚合,窗口,状态等。在这些API中处理的数据类型在相应的编程语言中

    表示为类。

    低级ProcessFunction与DataStreamAPI集成,因此只能对某些算子操作进行

    低级抽象。该数据集API提供的有限数据集的其他原语,如循环迭代。

    该TableAPI是为中心的声明性DSL表,其可被动态地改变的表(表示流

    时)。该TableAPI遵循(扩展)关系模型:表有一个模式连接(类似于在关

    系数据库中的表)和API提供可比的算子操作,如选择,项目,连接,分组依

    据,聚合等TableAPI程序以声明方式定义应该执行的逻辑算子操作,而不是

    准确指定算子操作代码的外观。虽然TableAPI可以通过各种类型的用户定义

    函数进行扩展,但它的表现力不如CoreAPI,但使用更简洁(编写的代码更

    少)。此外,TableAPI程序还会通过优化程序,在执行之前应用优化规则。

    可以在表和DataStreamDataSet之间无缝转换,允许程序混合TableAPI以及

    DataStream和DataSetAPI。

    数据流编程模型

    9Flink提供的最高级抽象是SQL。这种抽象在语义和表达方面类似于Table

    API,但是将程序表示为SQL查询表达式。在SQL抽象与TableAPI紧密地相互

    作用,和SQL查询可以通过定义表来执行TableAPI。

    程序和数据流

    Flink程序的基本构建块是流和转换。(请注意,Flink的DataSetAPI中使用的

    DataSet也是内部流-稍后会详细介绍。)从概念上讲,流是(可能永无止境的)

    数据记录流,而转换是将一个或多个流作为一个或多个流的算子操作。输入,并产

    生一个或多个输出流。

    执行时,Flink程序映射到流数据流,由流和转换算子组成。每个数据流都以一个或

    多个源开头,并以一个或多个接收器结束。数据流类似于任意有向无环图

    (DAG)。尽管通过迭代结构允许特殊形式的循环,但为了简单起见,我们将在

    大多数情况下对此进行掩饰。

    通常,程序中的转换与数据流中的算子之间存在一对一的对应关系。但是,有时一

    个转换可能包含多个转换算子。

    源流和接收器记录在流连接器和批处理连接器文档中。DataStream算子和DataSet

    转换中记录了转换。

    数据流编程模型

    10并行数据流

    Flink中的程序本质上是并行和分布式的。在执行期间,流具有一个或多个流分区,并且每个算子具有一个或多个算子子任务。算子子任务彼此独立,并且可以在不

    同的线程中执行,并且可能在不同的机器或容器上执行。

    算子子任务的数量是该特定算子的并行度。流的并行性始终是其生成算子的并行

    性。同一程序的不同算子可能具有不同的并行级别。

    流可以以一对一(或转发)模式或以重新分发模式在两个算子之间传输数据:

    一对一流(例如,在上图中的Source和map算子之间)保存数据元的分区

    和排序。这意味着map算子的subtask[1]将以与Source算子的subtask

    [1]生成的顺序相同的顺序看到相同的数据元。

    重新分配流(在上面的map和keyBywindow之间,以及keyBywindow

    和Sink之间)重新分配流。每个算子子任务将数据发送到不同的目标子任务,具体取决于所选的转换。实例是keyBy(其通过散列Keys重新分区),广播,或Rebalance(其重新分区随机地)。在重新分配交换中,数据元之间的排序仅保存在每对发送和接收子任务中(例如,map的子任

    务[1]和子任务[2]keyBywindow)。因此,在此示例中,保存了每个Keys内

    的排序,但并行性确实引入了关于不同Keys的聚合结果到达接收器的顺序的非

    确定性。

    有关配置和控制并行性的详细信息,请参阅并行执行的文档。

    数据流编程模型

    11窗口

    聚合事件(例如,计数,总和)在流上的工作方式与批处理方式不同。例如,不可

    能计算流中的所有数据元,因为流通常是无限的(无界)。相反,流上的聚合(计

    数,总和等)由窗口限定,例如“在最后5分钟内计数”或“最后100个数据元的总

    和”。

    Windows可以是时间驱动的(例如:每30秒)或数据驱动(例如:每100个数据

    元)。一个典型地区分不同类型的窗口,例如翻滚窗口(没有重叠),滑动窗口

    (具有重叠)和会话窗口(由不活动的间隙打断)。

    更多窗口示例可以在此博客文章中找到。更多详细信息在窗口文档中。

    时间

    当在流程序中引用时间(例如定义窗口)时,可以参考不同的时间概念:

    事件时间是创建事件的时间。它通常由事件中的时间戳描述,例如由生产传感

    器或生产服务附加。Flink通过时间戳分配器访问事件时间戳。

    摄取时间是事件在源算子处输入Flink数据流的时间。

    处理时间是执行基于时间的算子操作的每个算子的本地时间。

    有关如何处理时间的更多详细信息,请参阅事件时间文档。

    有状态的算子操作

    数据流编程模型

    12虽然数据流中的许多算子操作只是一次查看一个单独的事件(例如事件解析器),但某些算子操作会记住多个事件(例如窗口算子)的信息。这些算子操作称为有

    状态。

    状态算子操作的状态保持在可以被认为是嵌入式键值存储的状态中。状态被分区

    并严格地与有状态算子读取的流一起分发。因此,只有在keyBy函数之后才能

    在被Key化的数据流上访问键值状态,并且限制为与当前事件的键相关联的值。对

    齐流和状态的Keys可确保所有状态更新都是本地算子操作,从而保证一致性而无

    需事务开销。此对齐还允许Flink重新分配状态并透明地调整流分区。

    有关更多信息,请参阅有关状态的文档。

    容错检查点

    Flink使用流重放和检查点的组合实现容错。检查点与每个输入流中的特定点以及每

    个算子的对应状态相关。通过恢复算子的状态并从检查点重放事件,可以从检查点

    恢复流数据流,同时保持一致性(恰好一次处理语义)。

    检查点间隔是在执行期间用恢复时间(需要重放的事件的数量)来折衷容错开销的

    手段。

    容错内部的描述提供了有关Flink如何管理检查点和相关主题的更多信息。有关启用

    和配置检查点的详细信息,请参阅检查点API文档。

    流处理批处理

    Flink执行批处理程序作为流程序的特殊情况,其中流是有界的(有限数量的数据

    元)。一个数据集在内部视为数据流。因此,上述概念以相同的方式应用于批处理

    程序,并且它们适用于流程序,除了少数例外:

    数据流编程模型

    13批处理程序的容错不使用检查点。通过完全重放流来恢复。这是可能的,因为

    输入有限。这会使成本更多地用于恢复,但使常规处理更便宜,因为它避免了

    检查点。

    DataSetAPI中的有状态算子操作使用简化的内存核外数据结构,而不是键值

    索引。

    DataSetAPI引入了特殊的同步(超级步骤)迭代,这些迭代只能在有界流上

    进行。有关详细信息,请查看迭代文档。

    下一步

    继续使用Flink的DistributedRuntime中的基本概念。

    数据流编程模型

    14分布式运行时环境

    译者:flink.sojb.cn

    任务和算子链

    对于分布式执行,Flink链算子子任务一起放入任务。每个任务由一个线程执行。将

    算子链接到任务中是一项有用的优化:它可以Reduce线程到线程切换和缓冲的开

    销,并在降低延迟的同时提高整体吞吐量。可以配置链接行为;有关详细信息,请

    参阅链接文档。

    下图中的示例数据流由五个子任务执行,因此具有五个并行线程。

    TaskManager,JobManager,客户端

    Flink运行时包含两种类型的进程:

    该JobManagers(也称为Masters)协调分布式执行。他们安排任务,协调检

    查点,协调故障恢复等。

    总是至少有一个JobManager。高可用性设置将具有多个JobManagers,其中

    一个始终是Leader,其他人处于待机状态。

    该TaskManagers(也叫工人)执行任务(或者更具体地说,子任务)的数据

    流,以及缓冲器和交换数据流。

    分布式运行时环境

    15必须始终至少有一个TaskManager。

    JobManagers和TaskManagers可以通过多种方式启动:作为独立集群直接在计算

    机上,在容器中,或由YARN或Mesos等资源框架管理。TaskManagers连接到

    JobManagers,宣布自己可用,并被分配工作。

    该客户端是不运行时和程序执行的一部分,而是被用来准备和发送的数据流的

    JobManager。之后,客户端可以断开连接或保持连接以接收进度报告。客户端既

    可以作为触发执行的JavaScala程序的一部分运行,也可以在命令行进程中运

    行 .binflinkrun...。

    任务槽和资源

    每个worker(TaskManager)都是一个JVM进程,可以在不同的线程中执行一个或

    多个子任务。为了控制工人接受的任务数量,工人有所谓的任务槽(至少一个)。

    每个任务槽代表TaskManager的固定资源子集。例如,具有三个插槽的

    TaskManager将其13的托管内存专用于每个插槽。切换资源意味着子任务不会与来

    自其他作业的子任务竞争托管内存,而是具有一定数量的保存托管内存。请注意,此处不会发生CPU隔离;当前插槽只分离任务的托管内存。

    通过调整任务槽的数量,用户可以定义子任务如何相互隔离。每个TaskManager有

    一个插槽意味着每个任务组在一个单独的JVM中运行(例如,可以在一个单独的容

    器中启动)。拥有多个插槽意味着更多子任务共享同一个JVM。同一JVM中的任务

    分布式运行时环境

    16共享TCP连接(通过多路复用)和心跳消息。它们还可以共享数据集和数据结构,从而Reduce每任务开销。

    默认情况下,Flink允许子任务共享插槽,即使它们是不同任务的子任务,只要它们

    来自同一个作业。结果是一个槽可以保存作业的整个管道。允许此插槽共享有两个

    主要好处:

    Flink集群需要与作业中使用的最高并行度一样多的任务槽。无需计算程序总共

    包含多少任务(具有不同的并行性)。

    更容易获得更好的资源利用率。如果没有插槽共享,非密集源map子任

    务将阻止与资源密集型窗口子任务一样多的资源。通过插槽共享,将示例中的

    基本并行性从2增加到6可以充分利用时隙资源,同时确保繁重的子任务在

    TaskManagers之间公平分配。

    API还包括可用于防止不期望的时隙共享的资源组机制。

    根据经验,一个很好的默认任务槽数就是CPU核心数。使用超线程,每个插槽然后

    需要2个或更多硬件线程上下文。

    状态后台

    分布式运行时环境

    17存储键值索引的确切数据结构取决于所选的状态后台。一个状态后台将数据存储在

    内存中的哈希映射中,另一个状态后台使用RocksDB作为键值存储。除了定义保

    存状态的数据结构之外,状态后台还实现逻辑以获取键值状态的时间点

    SNAPSHOT,并将该SNAPSHOT存储为检查点的一部分。

    保存点

    用DataStreamAPI编写的程序可以从保存点恢复执行。保存点允许更新程序和

    Flink群集,而不会丢失任何状态。

    保存点是手动触发的检查点,它会获取程序的SNAPSHOT并将其写入状态后台。

    他们依靠常规的检查点机制。在执行期间,程序会定期在工作节点上创建

    SNAPSHOT并生成检查点。对于恢复,仅需要最后完成的检查点,并且一旦新的

    检查点完成,就可以安全地丢弃旧的检查点。

    保存点与这些定期检查点类似,不同之处在于它们由用户触发,并且在较新的检查

    点完成时不会自动过期。可以从命令行或通过RESTAPI取消作业时创建保存点。

    分布式运行时环境

    18教程

    教程

    19API教程

    API教程

    20DataStreamAPI教程

    译者:flink.sojb.cn

    在本指南中,我们将从头开始,从设置Flink项目到在Flink集群上运行流分析程序。

    Wikipedia提供了一个IRC频道,其中记录了对Wiki的所有编辑。我们将在Flink中读

    取此通道,并计算每个用户在给定时间窗口内编辑的字节数。这很容易使用Flink在

    几分钟内实现,但它将为您提供一个良好的基础,从而开始自己构建更复杂的分析

    程序。

    设置Maven项目

    我们将使用FlinkMavenArchetype来创建我们的项目结构。有关此内容的更多详细

    信息,请参阅JavaAPI快速入门。出于我们的目的,运行命令是这样的:

    mvnarchetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeCatalog=https:repository.apache.orgcontentrep

    ositoriessnapshots -DarchetypeVersion=1.7-SNAPSHOT -DgroupId=wiki-edits -DartifactId=wiki-edits -Dversion=0.1 -Dpackage=wikiedits -DinteractiveMode=false

    注意:对于Maven3.0或更高版本,不再可以通过命令行指定存储库(-

    DarchetypeCatalog)。如果要使用SNAPSHOT存储库,则需要向settings.xml添加

    存储库条目。有关此更改的详细信息,请参阅Maven官方文档

    您可以编辑 groupId, artifactId而 package如果你喜欢。使用上面的参

    数,Maven将创建一个如下所示的项目结构:

    DataStreamAPI教程

    21treewiki-edits

    wiki-edits

    ├──pom.xml

    └──src

    └──main

    ├──java

    │└──wikiedits

    │├──BatchJob.java

    │├──SocketTextStreamWordCount.java

    │├──StreamingJob.java

    │└──WordCount.java

    └──resources

    └──log4j.properties

    我们的 pom.xml文件已经在根目录中添加了Flink依赖项,并且有几个示例Flink程

    序 srcmainjava。我们可以删除示例程序,因为我们将从头开始:

    rmwiki-editssrcmainjavawikiedits.java

    作为最后一步,我们需要将FlinkWikipedia连接器添加为依赖关系,以便我们可以

    在我们的程序中使用它。编辑它的 dependencies部分 pom.xml,使它看起来像

    这样:

    

    

    org.apache.flink

    flink-java

    {flink.version}

    

    org.apache.flink

    flink-streaming-java_2.11

    {flink.version}

    

    org.apache.flink

    flink-clients_2.11

    {flink.version}

    

    org.apache.flink

    flink-connector-wikiedits_2.11

    {flink.version}

    DataStreamAPI教程

    22注意 flink-connector-wikiedits_2.11添加的依赖项。(此示例和Wikipedia连

    接器的灵感来自ApacheSamza的HelloSamza示例。)

    编写Flink程序

    这是编码时间。启动您喜欢的IDE并导入Maven项目或打开文本编辑器并创建文

    件 srcmainjavawikieditsWikipediaAnalysis.java:

    packagewikiedits;

    publicclassWikipediaAnalysis{

    publicstaticvoidmain(String[]args)throwsException{

    }

    }

    该计划现在非常基础,但我们会尽力填写。请注意,我不会在此处提供import语

    句,因为IDE可以自动添加它们。在本节结束时,如果您只想跳过并在编辑器中输

    入,我将使用import语句显示完整的代码。

    Flink程序的第一步是创建一个 StreamExecutionEnvironment(或

    者 ExecutionEnvironment如果您正在编写批处理作业)。这可用于设置执行参

    数并创建从外部系统读取的源。所以让我们继续把它添加到main方法:

    StreamExecutionEnvironmentsee=StreamExecutionEnvironment.getE

    xecutionEnvironment;

    接下来,我们将创建一个从WikipediaIRC日志中读取的源:

    DataStreamedits=see.addSource(newWikiped

    iaEditsSource);

    这创建了一个我们可以进一步处理 DataStream的 WikipediaEditEvent数据

    元。出于本示例的目的,我们感兴趣的是确定每个用户在特定时间窗口中添加或删

    除的字节数,比如说五秒。为此,我们首先必须指定我们要在用户名上键入流,也

    就是说此流上的算子操作应考虑用户名。在我们的例子中,窗口中编辑的字节的总

    和应该是每个唯一的用户。对于键入流,我们必须提供一个 KeySelector,如下

    所示:

    DataStreamAPI教程

    23KeyedStreamkeyedEdits=edits

    .keyBy(newKeySelector{

    @Override

    publicStringgetKey(WikipediaEditEventevent){

    returnevent.getUser;

    }

    });

    这为我们提供了一个 WikipediaEditEvent具有 StringKeys的用户名。我们现

    在可以指定我们希望在此流上加上窗口,并根据这些窗口中的数据元计算结果。窗

    口指定要在其上执行计算的Stream片。在无限的数据元流上计算聚合时需要

    Windows。在我们的例子中,我们将说我们想要每五秒聚合一次编辑的字节总和:

    DataStream>result=keyedEdits

    .timeWindow(Time.seconds(5))

    .fold(newTuple2<>(,0L),newFoldFunction
    ent,Tuple2>{

    @Override

    publicTuple2fold(Tuple2ac

    c,WikipediaEditEventevent){

    acc.f0=event.getUser;

    acc.f1+=event.getByteDiff;

    returnacc;

    }

    });

    第一个调用, .timeWindow指定我们想要有五秒钟的翻滚(非重叠)窗口。第

    二个调用为每个唯一键指定每个窗口切片的折叠变换。在我们的例子中,我们从一

    个初始值开始, (,0L)并在其中为用户添加该时间窗口中每个编辑的字节差

    异。生成的Stream现在包含 Tuple2每五秒钟发出一次的

    用户。

    剩下要做的就是将流打印到控制台并开始执行:

    result.print;

    see.execute;

    最后一次调用是启动实际Flink作业所必需的。所有算子操作(例如创建源,转换和

    接收器)仅构建内部算子操作的图形。只有在 execute被调用时才会在集群

    上抛出或在本地计算机上执行此算子操作图。

    到目前为止完整的代码是这样的:

    packagewikiedits;

    DataStreamAPI教程

    24importorg.apache.flink.api.common.functions.FoldFunction;

    importorg.apache.flink.api.java.functions.KeySelector;

    importorg.apache.flink.api.java.tuple.Tuple2;

    importorg.apache.flink.streaming.api.datastream.DataStream;

    importorg.apache.flink.streaming.api.datastream.KeyedStream;

    importorg.apache.flink.streaming.api.environment.StreamExecutio

    nEnvironment;

    importorg.apache.flink.streaming.api.windowing.time.Time;

    importorg.apache.flink.streaming.connectors.wikiedits.Wikipedia

    EditEvent;

    importorg.apache.flink.streaming.connectors.wikiedits.Wikipedia

    EditsSource;

    publicclassWikipediaAnalysis{

    publicstaticvoidmain(String[]args)throwsException{

    StreamExecutionEnvironmentsee=StreamExecutionEnvironment.

    getExecutionEnvironment;

    DataStreamedits=see.addSource(newWik

    ipediaEditsSource);

    KeyedStreamkeyedEdits=edits

    .keyBy(newKeySelector{

    @Override

    publicStringgetKey(WikipediaEditEventevent){

    returnevent.getUser;

    }

    });

    DataStream>result=keyedEdits

    .timeWindow(Time.seconds(5))

    .fold(newTuple2<>(,0L),newFoldFunction
    Event,Tuple2>{

    @Override

    publicTuple2fold(Tuple2ac

    c,WikipediaEditEventevent){

    acc.f0=event.getUser;

    acc.f1+=event.getByteDiff;

    returnacc;

    }

    });

    result.print;

    see.execute;

    }

    }

    您可以使用Maven在IDE或命令行上运行此示例:

    DataStreamAPI教程

    25mvncleanpackage

    mvnexec:java-Dexec.mainClass=wikiedits.WikipediaAnalysis

    第一个命令构建我们的项目,第二个命令执行我们的主类。输出应该类似于:

    1>(Fenixdown,114)

    6>(AnomieBOT,155)

    8>(BD2412bot,-3690)

    7>(IgnorantArmies,49)

    3>(Ckh3111,69)

    5>(Slade360,0)

    7>(Narutolovehinata5,2195)

    6>(Vuyisa2001,79)

    4>(MsSarahWelch,269)

    4>(KasparBot,-245)

    每行前面的数字告诉您输出生成的打印接收器的哪个并行实例。

    这应该让您开始编写自己的Flink程序。要了解更多信息,您可以查看我们的基本概

    念指南和DataStreamAPI。如果您想了解如何在自己的机器上设置Flink群集并将

    结果写入Kafka,请坚持参加奖励练习。

    奖金练习:在群集上运行并写入Kafka

    请按照我们的本地安装教程在您的机器上设置Flink分发,并在继续算子操作之前

    参考Kafka快速入门以设置Kafka安装。

    作为第一步,我们必须添加FlinkKafka连接器作为依赖关系,以便我们可以使用

    Kafka接收器。将其添加到 pom.xml依赖项部分中的文件:

    

    org.apache.flink

    flink-connector-kafka-0.8_2.11

    {flink.version}

    接下来,我们需要修改我们的程序。我们将移除 print水槽,而是使用Kafka水

    槽。新代码如下所示:

    DataStreamAPI教程

    26result

    .map(newMapFunction,String>{

    @Override

    publicStringmap(Tuple2tuple){

    returntuple.toString;

    }

    })

    .addSink(newFlinkKafkaProducer08<>(localhost:9092,wiki-

    result,newSimpleStringSchema));

    还需要导入相关的类:

    importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaPro

    ducer08;

    importorg.apache.flink.api.common.serialization.SimpleStringSch

    ema;

    importorg.apache.flink.api.common.functions.MapFunction;

    注意我们是如何第一个转换的流 Tuple2来流 String使

    用MapFunction。我们这样做是因为将简单字符串写入Kafka更容易。然后,我们创

    建了一个Kafka水槽。您可能必须使主机名和端口适应您的设

    置。 wiki-result是运行我们的程序之前我们将要创建的Kafka流的名称。使

    用Maven构建项目因为我们需要jar文件在集群上运行:

    mvncleanpackage

    生成的jar文件将位于 target子文件夹中: targetwiki-edits-0.1.jar。我

    们稍后会用到它。

    现在我们准备启动Flink集群并运行写入Kafka的程序。转到安装Flink的位置并启动

    本地群集:

    cdmyflinkdirectory

    binstart-cluster.sh

    我们还必须创建Kafka主题,以便我们的程序可以写入它:

    cdmykafkadirectory

    binkafka-topics.sh--create--zookeeperlocalhost:2181--topi

    cwiki-results

    现在我们准备在本地Flink集群上运行我们的jar文件:

    DataStreamAPI教程

    27cdmyflinkdirectory

    binflinkrun-cwikiedits.WikipediaAnalysispathtowikiedits

    -0.1.jar

    如果一切按计划进行,那么该命令的输出应该与此类似:

    0308201615:09:27JobexecutionswitchedtostatusRUNNING.

    0308201615:09:27Source:CustomSource(11)switchedtoSCHED

    ULED

    0308201615:09:27Source:CustomSource(11)switchedtoDEPLO

    YING

    0308201615:09:27TriggerWindow(TumblingProcessingTimeWindows(

    5000),FoldingStateDescriptor{name=window-contents,defaultValue

    =(,0),serializer=null},ProcessingTimeTrigger,WindowedStream

    .fold(WindowedStream.java:207))->Map->Sink:Unnamed(11)swi

    tchedtoSCHEDULED

    0308201615:09:27TriggerWindow(TumblingProcessingTimeWindows(

    5000),FoldingStateDescriptor{name=window-contents,defaultValue

    =(,0),serializer=null},ProcessingTimeTrigger,WindowedStream

    .fold(WindowedStream.java:207))->Map->Sink:Unnamed(11)swi

    tchedtoDEPLOYING

    0308201615:09:27TriggerWindow(TumblingProcessingTimeWindows(

    5000),FoldingStateDescriptor{name=window-contents,defaultValue

    =(,0),serializer=null},ProcessingTimeTrigger,WindowedStream

    .fold(WindowedStream.java:207))->Map->Sink:Unnamed(11)swi

    tchedtoRUNNING

    0308201615:09:27Source:CustomSource(11)switchedtoRUNNI

    NG

    您可以看到各个算子如何开始运行。只有两个,因为出于性能原因,窗口之后的算

    子操作被折叠成一个算子操作。在Flink,我们称之为链接。

    您可以通过使用Kafka控制台使用者检查Kafka主题来观察程序的输出:

    binkafka-console-consumer.sh--zookeeperlocalhost:2181--topi

    cwiki-result

    您还可以查看应在http:localhost:8081上运行的Flink仪表板。您将获得群集资

    源和正在运行的作业的概述:

    如果单击正在运行的作业,您将获得一个视图,您可以在其中检查各个算子操作,例如,查看已处理数据元的数量:

    DataStreamAPI教程

    28这就结束了我们对Flink的小游览。如果您有任何疑问,请随时询问我们的邮件列

    表。

    DataStreamAPI教程

    29Setup教程

    Setup教程

    30本地安装教程

    译者:flink.sojb.cn

    只需几个简单的步骤即可启动并运行Flink示例程序。

    设置:下载并启动Flink

    Flink可在Linux,MacOSX和Windows上运行。为了能够运行Flink,唯一的要求

    是安装一个有效的Java8.x.Windows用户,请查看Windows上的Flink指南,该指

    南介绍了如何在Windows上运行Flink以进行本地设置。

    您可以通过发出以下命令来检查Java正确安装:

    java-version

    如果你有Java8,输出将如下所示:

    javaversion1.8.0_111

    Java(TM)SERuntimeEnvironment(build1.8.0_111-b14)

    JavaHotSpot(TM)64-BitServerVM(build25.111-b14,mixedmode)

    下载并编译

    从我们的某个存储库克隆源代码,例如:

    gitclonehttps:github.comapacheflink.git

    cdflink

    mvncleanpackage-DskipTeststhiswilltakeupto10minute

    s

    cdbuild-targetthisiswhereFlinkisinstall

    edto

    启动本地Flink群集

    .binstart-cluster.shStartFlink

    检查分派器的web前端在HTTP:localhost:8081,并确保一切都正常运行。Web

    前端应报告单个可用的TaskManager实例。

    本地安装教程

    31您还可以通过检查 logs目录中的日志文件来验证系统是否正在运行:

    taillogflink--standalonesession-.log

    INFO...-Restendpointlisteningatlocalhost:8081

    INFO...-http:localhost:8081wasgrantedleadership...

    INFO...-Webfrontendlisteningathttp:localhost:8081.

    INFO...-StartingRPCendpointforStandaloneResourceManagera

    takka:flinkuserresourcemanager.

    INFO...-StartingRPCendpointforStandaloneDispatcheratakk

    a:flinkuserdispatcher.

    INFO...-ResourceManagerakka.tcp:[[emailprotected]](cdn-c

    gilemail-protection):6123userresourcemanagerwasgrantedlea

    dership...

    INFO...-StartingtheSlotManager.

    INFO...-Dispatcherakka.tcp:[[emailprotected]](cdn-cgil

    email-protection):6123userdispatcherwasgrantedleadership..

    .

    INFO...-Recoveringallpersistedjobs.

    INFO...-RegisteringTaskManager...under...attheSlotMana

    ger.

    本地安装教程

    32阅读代码

    您可以在Scala和Java上的GitHub上找到此SocketWindowWordCount示例的完整源

    代码。

    Scala

    Java

    objectSocketWindowWordCount{

    defmain(args:Array[String]):Unit={

    theporttoconnectto

    valport:Int=try{

    ParameterTool.fromArgs(args).getInt(port)

    }catch{

    casee:Exception=>{

    System.err.println(Noportspecified.Pleaseru

    n'SocketWindowWordCount--port')

    return

    }

    }

    gettheexecutionenvironment

    valenv:StreamExecutionEnvironment=StreamExecutionEnv

    ironment.getExecutionEnvironment

    getinputdatabyconnectingtothesocket

    valtext=env.socketTextStream(localhost,port,'\n')

    parsethedata,groupit,windowit,andaggregateth

    ecounts

    valwindowCounts=text

    .flatMap{w=>w.split(\\s)}

    .map{w=>WordWithCount(w,1)}

    .keyBy(word)

    .timeWindow(Time.seconds(5),Time.seconds(1))

    .sum(count)

    printtheresultswithasinglethread,ratherthani

    nparallel

    windowCounts.print.setParallelism(1)

    env.execute(SocketWindowWordCount)

    }

    Datatypeforwordswithcount

    caseclassWordWithCount(word:String,count:Long)

    }

    本地安装教程

    33publicclassSocketWindowWordCount{

    publicstaticvoidmain(String[]args)throwsException{

    theporttoconnectto

    finalintport;

    try{

    finalParameterToolparams=ParameterTool.fromArgs(

    args);

    port=params.getInt(port);

    }catch(Exceptione){

    System.err.println(Noportspecified.Pleaserun'S

    ocketWindowWordCount--port');

    return;

    }

    gettheexecutionenvironment

    finalStreamExecutionEnvironmentenv=StreamExecutionEn

    vironment.getExecutionEnvironment;

    getinputdatabyconnectingtothesocket

    DataStreamtext=env.socketTextStream(localhos

    t,port,\n);

    parsethedata,groupit,windowit,andaggregateth

    ecounts

    DataStreamwindowCounts=text

    .flatMap(newFlatMapFunction{

    @Override

    publicvoidflatMap(Stringvalue,Collector
    WithCount>out){

    for(Stringword:value.split(\\s)){

    out.collect(newWordWithCount(word,1L));

    }

    }

    })

    .keyBy(word)

    .timeWindow(Time.seconds(5),Time.seconds(1))

    .reduce(newReduceFunction{

    @Override

    publicWordWithCountreduce(WordWithCounta,Wor

    dWithCountb){

    returnnewWordWithCount(a.word,a.count+b

    .count);

    }

    });

    printtheresultswithasinglethread,ratherthani

    nparallel

    windowCounts.print.setParallelism(1);

    本地安装教程

    34env.execute(SocketWindowWordCount);

    }

    Datatypeforwordswithcount

    publicstaticclassWordWithCount{

    publicStringword;

    publiclongcount;

    publicWordWithCount{}

    publicWordWithCount(Stringword,longcount){

    this.word=word;

    this.count=count;

    }

    @Override

    publicStringtoString{

    returnword+:+count;

    }

    }

    }

    运行示例

    现在,我们将运行此Flink应用程序。它将从套接字读取文本,并且每5秒打印一次

    前5秒内每个不同单词的出现次数,即处理时间的翻滚窗口,只要文字漂浮在其

    中。

    首先,我们使用netcat来启动本地服务器

    nc-l9000

    提交Flink计划:

    .binflinkrunexamplesstreamingSocketWindowWordCount.jar-

    -port9000

    Startingexecutionofprogram

    程序连接到套接字并等待输入。您可以检查Web界面以验证作业是否按预期运行:

    本地安装教程

    35本地安装教程

    36单词在5秒的时间窗口(处理时间,翻滚窗口)中计算并打印到 stdout。监

    视TaskManager的输出文件并写入一些文本 nc(输入在点击后逐行发送到

    Flink):

    nc-l9000

    loremipsum

    ipsumipsumipsum

    bye

    该 .out文件将在每个时间窗口结束时,只要打印算作字浮在,例如:

    tail-flogflink--taskexecutor-.out

    lorem:1

    bye:1

    ipsum:4

    要停止Flink当你做类型:

    .binstop-cluster.sh

    下一步

    查看更多示例以更好地了解Flink的编程API。完成后,请继续阅读流处理指南。

    本地安装教程

    37在Windows上运行Flink

    译者:flink.sojb.cn

    如果要在Windows计算机上本地运行Flink,则需要下载并解压缩二进制Flink分发。

    之后,您可以使用Windows批处理文件( .bat),或使用Cygwin运行Flink

    JobManager。

    从Windows批处理文件开始

    要从Windows命令行启动Flink,请打开命令窗口,导航到 binFlink目录并运

    行 start-cluster.bat。

    注意: binJavaRuntimeEnvironment的文件夹必须包含在Window

    的 %PATH%变量中。按照本指南将Java添加到 %PATH%变量中。

    cdflink

    cdbin

    start-cluster.bat

    StartingalocalclusterwithoneJobManagerprocessandoneTas

    kManagerprocess.

    YoucanterminatetheprocessesviaCTRL-Cinthespawnedshell

    windows.

    Webinterfacebydefaultonhttp:localhost:8081.

    之后,您需要打开第二个终端来运行作业 flink.bat。

    从Cygwin和UnixScripts开始

    使用Cygwin,您需要启动Cygwin终端,导航到您的Flink目录并运

    行 start-cluster.sh脚本:

    cdflink

    binstart-cluster.sh

    Startingcluster.

    从Git安装Flink

    如果您正在从git存储库安装Flink并且您正在使用Windowsgitshell,则Cygwin可能

    会产生类似于以下的故障:

    在Windows上运行Flink

    38c:flinkbinstart-cluster.sh:line30:figure>\r':commandnot

    found

    发生此错误是因为在Windows中运行时,git会自动将UNIX行结尾转换为Windows

    样式行结尾。问题是Cygwin只能处理UNIX样式的行结尾。解决方案是通过以下三

    个步骤调整Cygwin设置以处理正确的行结尾:

    1. 启动一个Cygwinshell。

    2. 输入确定您的主目录

    cd;pwd

    ThiswillreturnapathundertheCygwinrootpath.

    1. 使用NotePad,写字板或其他文本编辑器打开 .bash_profile主目录中的文

    件并附加以下内容:(如果文件不存在,则必须创建它)

    exportSHELLOPTS

    set-oigncr

    保存文件并打开一个新的bashshell。

    在Windows上运行Flink

    39例子

    译者:flink.sojb.cn

    捆绑的例子

    Flink的来源包括Flink不同API的许多示例:

    DataStream应用程序(JavaScala)

    DataSet应用程序(JavaScala)

    TableAPISQL查询(JavaScala)

    这些说明解释了如何运行示例。

    Web上的示例

    还有一些在线发布的博客文章讨论了示例应用程序:

    如何使用ApacheFlink构建有状态流应用程序提供了一个使用DataStreamAPI

    实现的事件驱动应用程序和两个用于流分析的SQL查询。

    使用ApacheFlink,Elasticsearch和Kibana构建实时仪表板应用程序

    是elastic.co的博客文章,展示了如何使用ApacheFlink,Elasticsearch和

    Kibana构建用于流数据分析的实时仪表板解决方案。

    来自DataArtisans的Flink培训网站有很多例子。查看动手部分和练习。

    例子

    40批处理示例

    译者:flink.sojb.cn

    以下示例程序展示了Flink的不同应用程序,从简单的字数统计到图形算法。代码示

    例说明了Flink的DataSetAPI的使用。

    可以在Flink源存储库的flink-examples-batch模块中找到以下和更多示例的完整源代

    码。

    运行一个例子

    为了运行Flink示例,我们假设您有一个正在运行的Flink实例。导航中的“快速入

    门”和“设置”选项卡描述了启动Flink的各种方法。

    最简单的方法是运行 .binstart-cluster.sh,默认情况下启动一个带有一个

    JobManager和一个TaskManager的本地集群。

    Flink的每个二进制版本都包含一个 examples目录,其中包含此页面上每个示例

    的jar文件。

    要运行WordCount示例,请发出以下命令:

    .binflinkrun.examplesbatchWordCount.jar

    其他示例可以以类似的方式启动。

    请注意,通过使用内置数据,许多示例在不传递任何参数的情况下运行。要使用实

    际数据运行WordCount,您必须将路径传递给数据:

    .binflinkrun.examplesbatchWordCount.jar--inputpathto

    sometextdata--outputpathtoresult

    请注意,非本地文件系统需要模式前缀,例如 hdfs:。

    字数

    WordCount是大数据处理系统的“HelloWorld”。它计算文本集合中单词的频率。该

    算法分两步进行:首先,文本将文本分成单个单词。其次,对单词进行分组和计

    数。

    Java

    Scala

    批处理示例

    41ExecutionEnvironmentenv=ExecutionEnvironment.getExecutionEnvi

    ronment;

    DataSettext=env.readTextFile(pathtofile);

    DataSet>counts=

    splitupthelinesinpairs(2-tuples)containing:(w

    ord,1)

    text.flatMap(newTokenizer)

    groupbythetuplefield0andsumuptuplefield

    1

    .groupBy(0)

    .sum(1);

    counts.writeAsCsv(outputPath,\n,);

    User-definedfunctions

    publicstaticclassTokenizerimplementsFlatMapFunction>{

    @Override

    publicvoidflatMap(Stringvalue,Collector
    nteger>>out){

    normalizeandsplittheline

    String[]tokens=value.toLowerCase.split(\\W+);

    emitthepairs

    for(Stringtoken:tokens){

    if(token.length>0){

    out.collect(newTuple2(token,1));

    }

    }

    }

    }

    该字计数示例实现上述算法的输入参

    数: --input--output。作为测试数据,任何

    文本文件都可以。

    批处理示例

    42valenv=ExecutionEnvironment.getExecutionEnvironment

    getinputdatavaltext=env.readTextFile(pathtofile)

    valcounts=text.flatMap{_.toLowerCase.split(\\W+)filter{

    _.nonEmpty}}

    .map{(_,1)}

    .groupBy(0)

    .sum(1)

    counts.writeAsCsv(outputPath,\n,)

    该字计数示例实现上述算法的输入参

    数: --input--output。作为测试数据,任何

    文本文件都可以。

    网页排名

    PageRank算法计算链接定义的图形中页面的“重要性”,链接指向一个页面到另一个

    页面。它是一种迭代图算法,这意味着它重复应用相同的计算。在每次迭代中,每

    个页面在其所有邻居上分配其当前等级,并将其新等级计算为从其邻居接收的等级

    的纳税总和。PageRank算法由Google搜索引擎推广,该搜索引擎利用网页的重要

    性对搜索查询的结果进行排名。

    在这个简单的例子中,PageRank通过批量迭代和固定数量的迭代来实现。

    Java

    Scala

    ExecutionEnvironmentenv=ExecutionEnvironment.getExecutionEnvi

    ronment;

    readthepagesandinitialranksbyparsingaCSVfile

    DataSet>pagesWithRanks=env.readCsvFile(p

    agesInputPath)

    .types(Long.class,Double.class)

    thelinksareencodedasanadjacencylist:(page-id,Array(n

    eighbor-ids))

    DataSet>pageLinkLists=getLinksDataSet(en

    v);

    setiterativedataset

    IterativeDataSet>iteration=pagesWithRank

    s.iterate(maxIterations);

    DataSet>newRanks=iteration

    joinpageswithoutgoingedgesanddistributerank

    批处理示例

    43.join(pageLinkLists).where(0).equalTo(0).flatMap(newJoi

    nVertexWithEdgesMatch)

    collectandsumranks

    .groupBy(0).sum(1)

    applydampeningfactor

    .map(newDampener(DAMPENING_FACTOR,numPages));

    DataSet>finalPageRanks=iteration.closeWi

    th(

    newRanks,newRanks.join(iteration).where(0).equalTo(0)

    terminationcondition

    .filter(newEpsilonFilter));

    finalPageRanks.writeAsCsv(outputPath,\n,);

    User-definedfunctions

    publicstaticfinalclassJoinVertexWithEdgesMatch

    implementsFlatJoinFunction
    ble>,Tuple2,Tuple2

    >{

    @Override

    publicvoidjoin(page,Tuple2
    ng[]>adj,Collector>out){

    Long[]neighbors=adj.f1;

    doublerank=page.f1;

    doublerankToDistribute=rank((double)neigbors.leng

    th);

    for(inti=0;i
    out.collect(newTuple2(neighbors[i],r

    ankToDistribute));

    }

    }

    }

    publicstaticfinalclassDampenerimplementsMapFunction
    ,Tuple2>{

    privatefinaldoubledampening,randomJump;

    publicDampener(doubledampening,doublenumVertices){

    this.dampening=dampening;

    this.randomJump=(1-dampening)numVertices;

    }

    @Override

    publicTuple2map(Tuple2value)

    {

    value.f1=(value.f1dampening)+randomJump;

    批处理示例

    44returnvalue;

    }

    }

    publicstaticfinalclassEpsilonFilter

    implementsFilterFunction
    uble>,Tuple2>>{

    @Override

    publicbooleanfilter(Tuple2,Tuple2
    ng,Double>>value){

    returnMath.abs(value.f0.f1-value.f1.f1)>EPSILON;

    }

    }

    所述的PageRank程序实现上述实施例。它需要运行以下参

    数: --pages--links--output--numPages--iterations

    User-definedtypescaseclassLink(sourceId:Long,targetId:

    Long)

    caseclassPage(pageId:Long,rank:Double)

    caseclassAdjacencyList(sourceId:Long,targetIds:Array[Long])

    setupexecutionenvironmentvalenv=ExecutionEnvironment.g

    etExecutionEnvironment

    readthepagesandinitialranksbyparsingaCSVfilevalpa

    ges=env.readCsvFile[Page](pagesInputPath)

    thelinksareencodedasanadjacencylist:(page-id,Array(n

    eighbor-ids))vallinks=env.readCsvFile[Link](linksInputPath)

    assigninitialrankstopagesvalpagesWithRanks=pages.map(

    p=>Page(p,1.0numPages))

    buildadjacencylistfromlinkinputvaladjacencyLists=lin

    ks

    initializelists

    .map(e=>AdjacencyList(e.sourceId,Array(e.targetId)))

    concatenatelists

    .groupBy(sourceId).reduce{

    (l1,l2)=>AdjacencyList(l1.sourceId,l1.targetIds++l2.targ

    etIds)

    }

    startiterationvalfinalRanks=pagesWithRanks.iterateWithTe

    rmination(maxIterations){

    currentRanks=>

    valnewRanks=currentRanks

    distributerankstotargetpages

    批处理示例

    45.join(adjacencyLists).where(pageId).equalTo(sourceId)

    {

    (page,adjacent,out:Collector[Page])=>

    for(targetId<-adjacent.targetIds){

    out.collect(Page(targetId,page.rankadjacent.target

    Ids.length))

    }

    }

    collectranksandsumthemup

    .groupBy(pageId).aggregate(SUM,rank)

    applydampeningfactor

    .map{p=>

    Page(p.pageId,(p.rankDAMPENING_FACTOR)+((1-DAMPE

    NING_FACTOR)numPages))

    }

    terminateifnorankupdatewassignificant

    valtermination=currentRanks.join(newRanks).where(pageId).equalTo(pageId){

    (current,next,out:Collector[Int])=>

    checkforsignificantupdate

    if(math.abs(current.rank-next.rank)>EPSILON)out.co

    llect(1)

    }

    (newRanks,termination)

    }

    valresult=finalRanks

    emitresultresult.writeAsCsv(outputPath,\n,)

    hePageRankprogramimplementstheaboveexample.Itrequiresthefollowing

    parameterstorun:--pages--links--output--numPages--iterations

    .

    输入文件是纯文本文件,必须格式如下:

    页面表示为由新行字符分隔的(长)ID。

    例如, 1\n2\n12\n42\n63\n给出五个页面ID为1,2,12,42和63的页

    面。

    链接表示为由空格字符分隔的页面ID对。链接由换行符分隔:

    例如, 12\n212\n112\n4263\n给出四个(定向)链接(1)-

    >(2),(2)->(12),(1)->(12)和(42)->(63)。

    对于这个简单的实现,要求每个页面至少有一个传入链接和一个传出链接(页面可

    以指向自身)。

    连接组件

    批处理示例

    46连通分量算法通过为同一连接部分中的所有顶点分配相同的组件ID来识别较大图形

    的部分。与PageRank类似,ConnectedComponents是一种迭代算法。在每个步

    骤中,每个顶点将其当前组件ID传播到其所有邻居。如果顶点小于其自己的组件

    ID,则顶点接受来自邻居的组件ID。

    此实现使用增量迭代:未更改其组件ID的顶点不参与下一步。这会产生更好的性

    能,因为后面的迭代通常只处理一些异常值顶点。

    Java

    Scala

    readvertexandedgedata

    DataSetvertices=getVertexDataSet(env);

    DataSet>edges=getEdgeDataSet(env).flatMap(

    newUndirectEdge);

    assigntheinitialcomponentIDs(equaltothevertexID)

    DataSet>verticesWithInitialId=vertices.map

    (newDuplicateValue);

    openadeltaiteration

    DeltaIteration,Tuple2>iteration

    =

    verticesWithInitialId.iterateDelta(verticesWithInitialId

    ,maxIterations,0);

    applythesteplogic:

    DataSet>changes=iteration.getWorkset

    joinwiththeedges

    .join(edges).where(0).equalTo(0).with(newNeighborWithCo

    mponentIDJoin)

    selecttheminimumneighborcomponentID

    .groupBy(0).aggregate(Aggregations.MIN,1)

    updateifthecomponentIDofthecandidateissmalle

    r

    .join(iteration.getSolutionSet).where(0).equalTo(0)

    .flatMap(newComponentIdFilter);

    closethedeltaiteration(deltaandnewworksetareidentica

    l)

    DataSet>result=iteration.closeWith(changes

    ,changes);

    emitresult

    result.writeAsCsv(outputPath,\n,);

    User-definedfunctions

    publicstaticfinalclassDuplicateValueimplementsMapFuncti

    on>{

    @Override

    批处理示例

    47publicTuple2map(Tvertex){

    returnnewTuple2(vertex,vertex);

    }

    }

    publicstaticfinalclassUndirectEdge

    implementsFlatMapFunction
    >,Tuple2>{

    Tuple2invertedEdge=newTuple2;

    @Override

    publicvoidflatMap(Tuple2edge,Collector
    2>out){

    invertedEdge.f0=edge.f1;

    invertedEdge.f1=edge.f0;

    out.collect(edge);

    out.collect(invertedEdge);

    }

    }

    publicstaticfinalclassNeighborWithComponentIDJoin

    implementsJoinFunction,Tupl

    e2,Tuple2>{

    @Override

    publicTuple2join(Tuple2vertexWith

    Component,Tuple2edge){

    returnnewTuple2(edge.f1,vertexWithCompone

    nt.f1);

    }

    }

    publicstaticfinalclassComponentIdFilter

    implementsFlatMapFunction
    g,Long>,Tuple2>,Tuple2>

    {

    @Override

    publicvoidflatMap(Tuple2,Tuple2>value,Collector>out){

    if(value.f0.f1
    out.collect(value.f0);

    }

    }

    }

    该ConnectedComponents程序实现上述实施例。它需要运行以下参

    数: --vertices--edges--output--iterations

    批处理示例

    48setupexecutionenvironmentvalenv=ExecutionEnvironment.g

    etExecutionEnvironment

    readvertexandedgedata

    assigntheinitialcomponents(equaltothevertexid)valve

    rtices=getVerticesDataSet(env).map{id=>(id,id)}

    undirectededgesbyemittingforeachinputedgetheinputed

    gesitselfandaninverted

    versionvaledges=getEdgesDataSet(env).flatMap{edge=>Se

    q(edge,(edge._2,edge._1))}

    openadeltaiterationvalverticesWithComponents=vertices.

    iterateDelta(vertices,maxIterations,Array(0)){

    (s,ws)=>

    applythesteplogic:joinwiththeedges

    valallNeighbors=ws.join(edges).where(0).equalTo(0){(ver

    tex,edge)=>

    (edge._2,vertex._2)

    }

    selecttheminimumneighbor

    valminNeighbors=allNeighbors.groupBy(0).min(1)

    updateifthecomponentofthecandidateissmaller

    valupdatedComponents=minNeighbors.join(s).where(0).equalT

    o(0){

    (newVertex,oldVertex,out:Collector[(Long,Long)])=>

    if(newVertex._2
    }

    deltaandnewworksetareidentical

    (updatedComponents,updatedComponents)

    }

    verticesWithComponents.writeAsCsv(outputPath,\n,)

    这个PageRank程序实现了上面的例子。它需要运行以下参

    数: --pages--links--output--numPages--iterations

    输入文件是纯文本文件,必须格式如下:

    顶点表示为ID并用换行符分隔。

    例如, 1\n2\n12\n42\n63\n给出五个顶点(1),(2),(12),(42)和(63)。

    边缘表示为由空格字符分隔的顶点ID的对。边线由换行符分隔:

    例如, 12\n212\n112\n4263\n给出四个(无向)链路(1)-

    (2),(2)-(12),(1)-(12)和(42)-(63)。

    批处理示例

    49批处理示例

    50应用开发

    应用开发

    51项目构建设置

    项目构建设置

    52Java项目模板

    译者:flink.sojb.cn

    只需几个简单的步骤即可开始使用FlinkJava程序。

    要求

    唯一的要求是使用Maven3.0.4(或更高版本)和Java8.x安装。

    创建项目

    使用以下命令之一创建项目:

    使用Maven原型

    运行快速入门脚本

    mvnarchetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeCatalog=https:repository.apache.orgcontentr

    epositoriessnapshots -DarchetypeVersion=1.7-SNAPSHOT

    这允许您命名新创建的项目。它将以交互方式询问您groupId,artifactId和包名称。

    curlhttps:flink.apache.orgqquickstart-SNAPSHOT.sh|bash

    -s1.7-SNAPSHOT

    注意:对于Maven3.0或更高版本,不再可以通过命令行指定存储库(-

    DarchetypeCatalog)。如果要使用SNAPSHOT存储库,则需要向settings.xml添加

    存储库条目。有关此更改的详细信息,请参阅Maven官方文档

    检查项目

    您的工作目录中将有一个新目录。如果您使用了curl方法,则会调用该目

    录 quickstart。否则,它的名称为 artifactId:

    Java项目模板

    53treequickstart

    quickstart

    ├──pom.xml

    └──src

    └──main

    ├──java

    │└──org

    │└──myorg

    │└──quickstart

    │├──BatchJob.java

    │└──StreamingJob.java

    └──resources

    └──log4j.properties

    示例项目是一个Maven项目,它包含两个类:StreamingJob和BatchJob

    是DataStream和DataSet程序的基本框架程序。的主要方法是程序的入口点,无论

    是对在-IDE测试执行并作适当的部署。

    我们建议您将此项目导入IDE以进行开发和测试。IntelliJIDEA支持开箱即用的

    Maven项目。如果您使用Eclipse,则m2e插件允许导入Maven项目。某些Eclipse

    包默认包含该插件,其他包需要您手动安装它。

    MacOSX用户注意事项:对于Flink,Java默认JVM堆可能太小。你必须手动增加

    它。在Eclipse中,选择 RunConfigurations->Arguments并写

    入 VMArguments框: -Xmx800m。在IntelliJIDEA中,推荐的方法是

    从 Help|EditCustomVMOptions菜单中更改JVM选项。有关详细信息,请

    参阅此文

    构建项目

    如果要构建打包项目,请转到项目目录并运行' mvncleanpackage'命令。您将

    找到一个包含您的应用程序的JAR文件,以及您可能已作为依赖项添加到应用程序

    的连接器和库: target-.jar。

    注意:如果您使用与StreamingJob不同的类作为应用程序的主类入口点,我们建

    议您相应地更改文件中的 mainClass设置 pom.xml。这样,Flink可以从JAR文

    件运行时间应用程序,而无需另外指定主类。

    下一步

    写你的申请!

    如果您正在编写流处理应用程序并且正在寻找灵感来写什么,请查看流处理应用程

    序教程。

    如果您正在编写批处理应用程序,并且正在寻找要编写的内容,请查看批处理应用

    程序示例。

    Java项目模板

    54有关API的完整概述,请查看DataStreamAPI和DataSetAPI部分。

    在这里,您可以了解如何在本地群集上的IDE外部运行应用程序。

    如果您有任何问题,请在我们的邮件列表中查询。我们很乐意提供帮助。

    Java项目模板

    55Scala的项目模板

    译者:flink.sojb.cn

    构建工具

    Flink项目可以使用不同的构建工具构建。为了快速入门,Flink为以下构建工具提供

    了项目模板:

    SBT

    Maven

    这些模板可帮助您设置项目结构并创建初始构建文件。

    SBT

    创建项目

    您可以通过以下两种方法之一构建新项目:

    使用sbt模板

    运行快速入门脚本

    sbtnewtillrohrmannflink-project.g8

    这将提示您输入几个参数(项目名称,flink版本...),然后从flink-project模板创建

    一个Flink项目。您需要sbt>=0.13.13才能执行此命令。如有必要,您可以按照此

    安装指南获取。

    bash<(curlhttps:flink.apache.orgqsbt-quickstart.sh)

    ThiswillcreateaFlinkprojectinthespecifiedprojectdirectory.

    构建项目

    要构建项目,您只需发出 sbtcleanassembly命令即可。这将在target

    scala_your-major-scala-version目录中创建fat-jaryour-project-name-

    assembly-0.1-SNAPSHOT.jar。

    运行项目

    Scala的项目模板

    56要运行项目,您必须发出 sbtrun命令。

    默认情况下,这将在运行的同一JVM中运行您的作业 sbt。要在不同的JVM中运

    行您的作业,请添加以下行 build.sbt

    forkinrun:=true

    的IntelliJ

    我们建议您使用IntelliJ进行Flink作业开发。要开始,您必须将新创建的项目导入

    IntelliJ。您可以通

    过 File->New->ProjectfromExistingSources...然后选择项目

    目录来执行此算子操作。然后,IntelliJ将自动检测 build.sbt文件并设置所有内

    容。

    为了运行Flink作业,建议选择 mainRunner模块作为运行调试配置的类路径。这

    将确保所有设置为提供的依赖项在执行时可用。您可以配置运行调试配置通

    过 Run->EditConfigurations...,然后选择 mainRunner从模块的使

    用类路径的Dropbox。

    Eclipse

    要将新创建的项目导入Eclipse,首先必须为其创建Eclipse项目文件。这些项目文

    件可以通过sbteclipse插件创建。将以下行添加到您

    的 PROJECT_DIRprojectplugins.sbt文件中:

    addSbtPlugin(com.typesafe.sbteclipse%sbteclipse-plugin%

    4.0.0)

    在 sbt使用以下命令创建Eclipse项目文件

    >eclipse

    现在,您可以通过项目将项目导入Eclipse

    File->Import...->ExistingProjectsintoWorkspace,然后

    选择项目目录。

    Maven

    要求

    唯一的要求是使用Maven3.0.4(或更高版本)和Java8.x安装。

    Scala的项目模板

    57创建项目

    使用以下命令之一创建项目:

    使用Maven原型

    运行快速入门脚本

    mvnarchetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeCatalog=https:repository.apache.orgcontentr

    epositoriessnapshots -DarchetypeVersion=1.7-SNAPSHOT

    这允许您命名新创建的项目。它将以交互方式询问您groupId,artifactId和包名称。

    curlhttps:flink.apache.orgqquickstart-scala-SNAPSHOT.sh

    |bash-s1.7-SNAPSHOT

    注意:对于Maven3.0或更高版本,不再可以通过命令行指定存储库(-

    DarchetypeCatalog)。如果要使用SNAPSHOT存储库,则需要向settings.xml添加

    存储库条目。有关此更改的详细信息,请参阅Maven官方文档

    检查项目

    您的工作目录中将有一个新目录。如果您使用了curl方法,则会调用该目

    录 quickstart。否则,它的名称为 artifactId:

    treequickstart

    quickstart

    ├──pom.xml

    └──src

    └──main

    ├──resources

    │└──log4j.properties

    └──scala

    └──org

    └──myorg

    └──quickstart

    ├──BatchJob.scala

    └──StreamingJob.scala

    示例项目是一个Maven项目,它包含两个类:StreamingJob和BatchJob

    是DataStream和DataSet程序的基本框架程序。的主要方法是程序的入口点,无论

    是对在-IDE测试执行并作适当的部署。

    Scala的项目模板

    58我们建议您将此项目导入IDE。

    IntelliJIDEA支持Maven开箱即用,并为Scala开发提供插件。根据我们的经验,IntelliJ为开发Flink应用程序提供了最佳体验。

    对于Eclipse,您需要以下插件,您可以从提供的EclipseUpdateSites安装这些插

    件:

    Eclipse4.x

    ScalaIDE

    m2eclipse的-Scala

    构建HelperMaven插件

    Eclipse3.8

    用于Scala2.11的ScalaIDE或用于Scala2.10的ScalaIDE

    m2eclipse的-Scala

    构建HelperMaven插件

    构建项目

    如果要构建打包项目,请转到项目目录并运行' mvncleanpackage'命令。您将

    找到一个包含您的应用程序的JAR文件,以及您可能已作为依赖项添加到应用程序

    的连接器和库: target-.jar。

    注意:如果您使用与StreamingJob不同的类作为应用程序的主类入口点,我们建

    议您相应地更改文件中的 mainClass设置 pom.xml。这样,Flink可以从JAR文

    件运行时间应用程序,而无需另外指定主类。

    下一步

    写你的申请!

    如果您正在编写流处理应用程序并且正在寻找灵感来写什么,请查看流处理应用程

    序教程

    如果您正在编写批处理应用程序,并且正在寻找要编写的内容,请查看批处理应用

    程序示例

    有关API的完整概述,请查看DataStreamAPI和DataSetAPI部分。

    在这里,您可以了解如何在本地群集上的IDE外部运行应用程序。

    如果您有任何问题,请在我们的邮件列表中查询。我们很乐意提供帮助。

    Scala的项目模板

    59配置依赖关系,连接器,库

    译者:flink.sojb.cn

    每个Flink应用程序都依赖于一组Flink库。至少,应用程序依赖于FlinkAPI。许多应

    用程序还依赖于某些连接器库(如Kafka,Cassandra等)。运行Flink应用程序时

    (在分布式部署中或在IDE中进行测试),Flink运行时库也必须可用。

    Flink核心和应用程序依赖项

    与大多数运行用户定义应用程序的系统一样,Flink中有两大类依赖项和库:

    Flink核心依赖关系:Flink本身由运行系统所需的一组类和依赖项组成,例如

    协调,网络,检查点,故障转移,API,算子操作(如窗口),资源管理等。

    这些类和依赖项构成了Flink运行时的核心,在启动Flink应用程序时必须存在。

    这些核心类和依赖项打包在 flink-distjar中。它们是Flink lib文件夹的

    一部分,是Flink基本容器镜像的一部分。想象成类似于Java核心库(这些依赖

    关系 rt.jar, charsets.jar等等),其中包含像类 String和 List。

    Flink核心依赖项不包含任何连接器或库(CEP,SQL,ML等),以避免默认

    情况下在类路径中具有过多的依赖项和类。实际上,我们尝试尽可能保持核心

    依赖关系,以保持默认类路径较小并避免依赖性冲突。

    所述用户应用程序的依赖关系是所有的连接器,格式或库,一个特定的用户应

    用需求。

    用户应用程序通常打包到应用程序jar中,该应用程序jar包含应用程序代码以及

    所需的连接器和库依赖项。

    用户应用程序依赖项显式不包括FlinkDataSetDataStreamAPI和运行时依赖

    项,因为它们已经是Flink核心依赖项的一部分。

    设置项目:基本依赖项

    每个Flink应用程序都需要最低限度的API依赖关系来进行开发。对于Maven,您可

    以使用Java项目模板或Scala项目模板来创建具有这些初始依赖项的程序框架。

    手动设置项目时,需要为JavaScalaAPI添加以下依赖项(此处以Maven语法显

    示,但相同的依赖项也适用于其他构建工具(Gradle,SBT等)。

    Java

    Scala

    配置依赖关系,连接器,库

    60

    org.apache.flink

    flink-java

    1.7-SNAPSHOT

    provided

    

    org.apache.flink

    flink-streaming-java_2.11

    1.7-SNAPSHOT

    provided

    

    org.apache.flink

    flink-scala_2.11

    1.7-SNAPSHOT

    provided

    

    org.apache.flink

    flink-streaming-scala_2.11

    1.7-SNAPSHOT

    provided

    重要提示:请注意,所有这些依赖项都将其范围设置为提供。这意味着需要对它们

    进行编译,但不应将它们打包到项目生成的应用程序jar文件中-这些依赖项是Flink

    CoreDependencies,它们已在任何设置中提供。

    强烈建议将依赖关系保持在提供的范围内。如果它们未设置为提供,则最好的情况

    是生成的JAR变得过大,因为它还包含所有Flink核心依赖项。最糟糕的情况是添加

    到应用程序的jar文件的Flink核心依赖项与您自己的一些依赖版本冲突(通常通过反

    向类加载来避免)。

    关于IntelliJ的注意事项:要使应用程序在IntelliJIDEA中运行,需要在范围编译中

    声明Flink依赖项,而不是提供。否则,IntelliJ不会将它们添加到类路径中,并且in-

    IDE执行将失败并带有 NoClassDefFountError。为了避免必须将依赖范围声明

    为compile(不推荐使用,请参见上文),上面链接的Java和Scala项目模板使用了

    一个技巧:它们添加了一个配置文件,该应用程序在IntelliJ中运行时有选择地激活

    在不影响JAR文件打包的情况下,将依赖关系提升到范围编译。

    添加连接器和库依赖项

    大多数应用程序需要运行特定的连接器或库,例如连接到Kafka,Cassandra等的连

    接器。这些连接器不是Flink的核心依赖项的一部分,因此必须作为依赖项添加到应

    用程序中

    配置依赖关系,连接器,库

    61下面是将Kafka0.10的连接器添加为依赖项(Maven语法)的示例:

    

    org.apache.flink

    flink-connector-kafka-0.10_2.11

    1.7-SNAPSHOT

    我们建议将应用程序代码及其所有必需的依赖项打包到一个jar-with-dependencies

    中,我们将其称为应用程序jar。应用程序jar可以提交给已经运行的Flink集群,也可

    以添加到Flink应用程序容器镜像中。

    从Java项目模板或Scala项目模板创建的项目配置为在运行时自动将应用程序依赖

    项包含到应用程序jar中 mvncleanpackage。对于未从这些模板设置的项目,我

    们建议添加MavenShade插件(如下面的附录中所列)以构建具有所有必需依赖项

    的应用程序jar。

    重要:对于Maven(和其他构建工具)将依赖项正确打包到应用程序jar中,必须在

    范围编译中指定这些应用程序依赖项(与核心依赖项不同,核心依赖项必须在提供

    的作用域中指定)。

    Scala版本

    Scala版本(2.10,2.11,2.12等)彼此不是二进制兼容的。因此,FalaforScala2.11

    不能与使用Scala2.12的应用程序一起使用。

    例如,所有(传递上)依赖于Scala的Flink依赖项都以它们为其构建的Scala版本为

    后缀 flink-streaming-scala_2.11。

    只使用Java开发人员可以选择任何Scala版本,Scala开发人员需要选择与其应用程

    序的Scala版本匹配的Scala版本。

    有关如何为特定Scala版本构建Flink的详细信息,请参阅构建指南。

    注意:由于Scala2.12中的重大更改,Flink1.5目前仅针对Scala2.11构建。我们的

    目标是在下一版本中添加对Scala2.12的支持。

    Hadoop依赖项

    一般规则:永远不必将Hadoop依赖项直接添加到您的应用程序中。(唯一的例外

    是当使用现有的Hadoop输入输出格式与Flink的Hadoop兼容打包器时)

    如果要将Flink与Hadoop一起使用,则需要具有包含Hadoop依赖关系的Flink设置,而不是将Hadoop添加为应用程序依赖关系。有关详细信息,请参阅Hadoop设置指

    南。

    该设计有两个主要原因:

    配置依赖关系,连接器,库

    62一些Hadoop交互发生在Flink的核心,可能在用户应用程序启动之前,例如为

    检查点设置HDFS,通过Hadoop的Kerberos令牌进行身份验证或在YARN上部

    署。

    Flink的反向类加载方法隐藏了核心依赖关系中的许多传递依赖关系。这不仅适

    用于Flink自己的核心依赖项,也适用于Hadoop在设置中存在的依赖项。这

    样,应用程序可以使用相同依赖项的不同版本,而不会遇到依赖冲突(并且相

    信我们,这是一个大问题,因为Hadoops依赖树很大。)

    如果在IDE内部的测试或开发过程中需要Hadoop依赖关系(例如,用于HDFS访

    问),请配置这些依赖关系,类似于要测试或提供的依赖关系的范围。

    附录:用于构建具有依赖关系的Jar的模板

    要构建包含声明的连接器和库所需的所有依赖项的应用程序JAR,可以使用以下

    shade插件定义:

    配置依赖关系,连接器,库

    63

    

    

    org.apache.maven.plugins

    maven-shade-plugin

    3.0.0

    

    

    package

    

    shade

    

    

    

    com.google.code.findbug

    s:jsr305

    org.slf4j:

    log4j:

    

    

    

    :

    

    META-INF.SF
    ude>

    META-INF.DSA
    lude>

    META-INF.RSA
    lude>
    che.maven.plugins.shade.resource.ManifestResourceTransformer>

    my.programs.main.claz

    z

    配置依赖关系,连接器,库

    64配置依赖关系,连接器,库

    65基本API概念

    译者:flink.sojb.cn

    Flink程序是实现分布式集合转换的常规程序(例如,Filter,映射,更新状态,Join,分组,定义窗口,聚合)。集合最初是从源创建的(例如,通过读取文件,kafka主题或从本地的内存中集合)。结果通过接收器返回,接收器可以例如将数据

    写入(分布式)文件或标准输出(例如,命令行终端)。Flink程序可以在各种环境

    中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多

    计算机的集群上执行。

    根据数据源的类型(即有界或无界源),您可以编写批处理程序或流程序,其中

    DataSetAPI用于批处理,DataStreamAPI用于流式处理。本指南将介绍两种API共

    有的基本概念,但请参阅我们的流处理指南和批处理指南,了解有关使用每个API

    编写程序的具体信息。

    注:当显示的API时,如何使用,我们将用实际的例子

    StreamingExecutionEnvironment和 DataStreamAPI。 DataSetAPI中的概

    念完全相同,只需替换为 ExecutionEnvironment和 DataSet。

    DataSet和DataStream

    Flink具有特殊类 DataSet并 DataStream在程序中表示数据。您可以将它们视为

    可以包含重复项的不可变数据集合。在 DataSet数据有限的情况下,对于一

    个 DataStream数据元的数量可以是无界的。

    这些集合在某些关键方面与常规Java集合不同。首先,它们是不可变的,这意味着

    一旦创建它们就无法添加或删除数据元。你也不能简单地检查里面的数据元。

    集合最初通过在Flink程序添加源创建和新的集合从这些通过将它们使用API方法如

    衍生 map, filter等等。

    Flink计划的剖析

    Flink程序看起来像是转换数据集合的常规程序。每个程序包含相同的基本部分:

    1. 获得一个 executionenvironment,2. 加载创建初始数据,3. 指定此数据的转换,4. 指定放置计算结果的位置,5. 触发程序执行

    6. Java

    7. Scala

    基础API概念

    66我们现在将概述每个步骤,请参阅相应部分以获取更多详细信息。请注意,Java

    DataSetAPI的所有核心类都可以在org.apache.flink.api.java包中找到,而Java

    DataStreamAPI的类可以在org.apache.flink.streaming.api中找到。

    这 StreamExecutionEnvironment是所有Flink计划的基础。您可以使用以下静态

    方法获取一个 StreamExecutionEnvironment:

    getExecutionEnvironment

    createLocalEnvironment

    createRemoteEnvironment(Stringhost,intport,String...jarFile

    s)

    通常,您只需要使用 getExecutionEnvironment,因为这将根据上下文做正

    确的事情:如果您在IDE中执行程序或作为常规Java程序,它将创建一个本地环

    境,将在本地计算机上执行您的程序。如果您从程序中创建了一个JAR文件,并通

    过命令行调用它,则Flink集群管理器将执行您的main方法

    并 getExecutionEnvironment返回一个运行环境,以便在集群上执行您的程

    序。

    对于指定数据源,运行环境有几种方法可以使用各种方法从文件中读取:您可以逐

    行读取它们,CSV文件或使用完全自定义数据输入格式。要将文本文件作为一系列

    行读取,您可以使用:

    finalStreamExecutionEnvironmentenv=StreamExecutionEnvironmen

    t.getExecutionEnvironment;

    DataStreamtext=env.readTextFile(file:pathtofile

    );

    这将为您提供一个DataStream,然后您可以在其上应用转换来创建新的派生

    DataStream。

    您可以通过使用转换函数调用DataStream上的方法来应用转换。例如,Map转换如

    下所示:

    DataStreaminput=...;

    DataStreamparsed=input.map(newMapFunction
    nteger>{

    @Override

    publicIntegermap(Stringvalue){

    returnInteger.parseInt(value);

    }

    });

    基础API概念

    67这将通过将原始集合中的每个String转换为Integer来创建新的DataStream。

    一旦有了包含最终结果的DataStream,就可以通过创建接收器将其写入外部系统。

    这些只是创建接收器的一些示例方法:

    writeAsText(Stringpath)

    print

    我们现在将概述每个步骤,请参阅相应部分以获取更多详细信息。请注意,Scala

    DataSetAPI的所有核心类都可以在org.apache.flink.api.scala包中找到,而Scala

    DataStreamAPI的类可以在org.apache.flink.streaming.api.scala中找到。

    这 StreamExecutionEnvironment是所有Flink计划的基础。您可以使用以下静态

    方法获取一个 StreamExecutionEnvironment:

    getExecutionEnvironment

    createLocalEnvironment

    createRemoteEnvironment(host:String,port:Int,jarFiles:Strin

    g)

    通常,您只需要使用 getExecutionEnvironment,因为这将根据上下文做正

    确的事情:如果您在IDE中执行程序或作为常规Java程序,它将创建一个本地环

    境,将在本地计算机上执行您的程序。如果您从程序中创建了一个JAR文件,并通

    过命令行调用它,则Flink集群管理器将执行您的main方法

    并 getExecutionEnvironment返回一个运行环境,以便在集群上执行您的程

    序。

    对于指定数据源,运行环境有几种方法可以使用各种方法从文件中读取:您可以逐

    行读取它们,CSV文件或使用完全自定义数据输入格式。要将文本文件作为一系列

    行读取,您可以使用:

    valenv=StreamExecutionEnvironment.getExecutionEnvironment

    valtext:DataStream[String]=env.readTextFile(file:pathto

    file)

    这将为您提供一个DataStream,然后您可以在其上应用转换来创建新的派生

    DataStream。

    您可以通过使用转换函数调用DataSet上的方法来应用转换。例如,Map转换如下

    所示:

    基础API概念

    68valinput:DataSet[String]=...

    valmapped=input.map{x=>x.toInt}

    这将通过将原始集合中的每个String转换为Integer来创建新的DataStream。

    一旦有了包含最终结果的DataStream,就可以通过创建接收器将其写入外部系统。

    这些只是创建接收器的一些示例方法:

    writeAsText(path:String)

    print

    一旦您指定的完整程序,你需要触发执行程序调用

    execute上 StreamExecutionEnvironment。根据执行的类

    型, ExecutionEnvironment将在本地计算机上触发执行或提交程序以在群集上

    执行。

    该 execute方法返回一个 JobExecutionResult,包含执行时间和累加器结

    果。

    有关流数据源和接收器的信息,请参阅流指南,以及有关DataStream上支持的转换

    的更深入信息。

    有关批处理数据源和接收器的信息,请查看批处理指南,以及有关DataSet支持的

    转换的更深入信息。

    懒惰的评价

    所有Flink程序都是懒惰地执行:当执行程序的main方法时,数据加载和转换不会直

    接发生。而是创建每个算子操作并将其添加到程序的计划中。当 execute运行

    环境上的调用显式触发执行时,实际执行算子操作。程序是在本地执行还是在集群

    上执行取决于运行环境的类型

    懒惰的评估使您可以构建Flink作为一个整体计划单元执行的复杂程序。

    指定Keys

    某些转换(join,coGroup,keyBy,groupBy)要求在数据元集合上定义键。其他

    转换(Reduce,GroupReduce,Aggregate,Windows)允许数据在应用之前在

    Keys上分组。

    DataSet被分组为

    基础API概念

    69DataSet<...>input=[...]

    DataSet<...>reduced=input

    .groupBy(definekeyhere)

    .reduceGroup(dosomething);

    虽然可以使用DataStream指定Keys

    DataStream<...>input=[...]

    DataStream<...>windowed=input

    .keyBy(definekeyhere)

    .window(windowspecification);

    Flink的数据模型不基于键值对。因此,您无需将数据集类型物理打包到键和值中。

    键是“虚拟的”:它们被定义为实际数据上的函数,以指导分组算子。

    注意:在下面的讨论中,我们将使用 DataStreamAPI和 keyBy。对于DataSet

    API,您只需要替换为 DataSet和 groupBy。

    定义元组的键

    最简单的情况是在元组的一个或多个字段上对元组进行分组:

    Java

    Scala

    DataStream>input=[...]

    KeyedStream,Tuple>keyed=input.key

    By(0)

    valinput:DataStream[(Int,String,Long)]=[...]valkeyed

    =input.keyBy(0)

    元组在第一个字段(整数类型)上分组。

    Java

    Scala

    DataStream>input=[...]

    KeyedStream,Tuple>keyed=input.key

    By(0,1)

    基础API概念

    70valinput:DataSet[(Int,String,Long)]=[...]valgrouped=

    input.groupBy(0,1)

    在这里,我们将元组分组在由第一个和第二个字段组成的复合键上。

    关于嵌套元组的注释:如果你有一个带有嵌套元组的DataStream,例如:

    DataStream,String,Long>>ds;

    指定 keyBy(0)将导致系统使用full Tuple2作为键(以Integer和Float为键)。

    如果要“导航”到嵌套中 Tuple2,则必须使用下面解释的字段表达式键。

    使用FieldExpressions定义键

    您可以使用基于字符串的字段表达式来引用嵌套字段,并定义用于分组,排序,连

    接或coGrouping的键。

    字段表达式可以非常轻松地选择(嵌套)复合类型中的字段,例如Tuple和POJO类

    型。

    Java

    Scala

    在下面的示例中,我们有一个 WCPOJO,其中包含两个字段“word”和“count”。要

    按字段分组 word,我们只需将其名称传递给 keyBy函数即可。

    someordinaryPOJO(PlainoldJavaObject)

    publicclassWC{

    publicStringword;

    publicintcount;

    }

    DataStreamwords=[...]

    DataStreamwordCounts=words.keyBy(word).window(window

    specification);

    字段表达式语法:

    按字段名称选择POJO字段。例如, user指POJO类型的“用户”字段。

    按字段名称或0偏移字段索引选择元组字段。例如 f0,分别 5引用

    JavaTuple类型的第一个和第六个字段。

    您可以在POJO和Tuples中选择嵌套字段。例如, user.zip指POJO

    的“zip”字段,其存储在POJO类型的“user”字段中。支持POJO和元组的任意嵌

    套和混合,例如 f1.user.zip或 user.f3.1.zip。

    基础API概念

    71您可以使用 通配符表达式选择完整类型。这也适用于非Tuple或POJO类

    型的类型。

    字段表达示例:

    publicstaticclassWC{

    publicComplexNestedClasscomplex;nestedPOJO

    privateintcount;

    gettersetterforprivatefield(count)

    publicintgetCount{

    returncount;

    }

    publicvoidsetCount(intc){

    this.count=c;

    }

    }

    publicstaticclassComplexNestedClass{

    publicIntegersomeNumber;

    publicfloatsomeFloat;

    publicTuple3word;

    publicIntWritablehadoopCitizen;

    }

    这些是上面示例代码的有效字段表达式:

    count:类中的count字段 WC。

    complex:递归选择POJO类型的字段复合体的所有字

    段 ComplexNestedClass。

    complex.word.f2:选择嵌套的最后一个字段 Tuple3。

    complex.hadoopCitizen:选择Hadoop IntWritable类型。

    在下面的示例中,我们有一个 WCPOJO,其中包含两个字段“word”和“count”。要

    按字段分组 word,我们只需将其名称传递给 keyBy函数即可。

    someordinaryPOJO(PlainoldJavaObject)

    classWC(varword:String,varcount:Int){

    defthis{this(,0L)}

    }

    valwords:DataStream[WC]=[...]

    valwordCounts=words.keyBy(word).window(windowspecificati

    on)

    or,asacaseclass,whichislesstyping

    caseclassWC(word:String,count:Int)

    valwords:DataStream[WC]=[...]

    valwordCounts=words.keyBy(word).window(windowspecificati

    on)

    基础API概念

    72字段表达式语法:

    按字段名称选择POJO字段。例如, user指POJO类型的“用户”字段。

    通过1偏移字段名称或0偏移字段索引选择元组字段。例如 _1,分

    别 5引用ScalaTuple类型的第一个和第六个字段。

    您可以在POJO和Tuples中选择嵌套字段。例如, user.zip指POJO

    的“zip”字段,其存储在POJO类型的“user”字段中。支持POJO和元组的任意嵌

    套和混合,例如 _2.user.zip或 user._4.1.zip。

    您可以使用 _通配符表达式选择完整类型。这也适用于非Tuple或POJO类

    型的类型。

    字段表达示例:

    classWC(varcomplex:ComplexNestedClass,varcount:Int){

    defthis{this(null,0)}

    }

    classComplexNestedClass(

    varsomeNumber:Int,someFloat:Float,word:(Long,Long,String),hadoopCitizen:IntWritable){

    defthis{this(0,0,(0,0,),newIntWritable(0))}

    }

    这些是上面示例代码的有效字段表达式:

    count:类中的count字段 WC。

    complex:递归选择POJO类型的字段复合体的所有字

    段 ComplexNestedClass。

    complex.word._3:选择嵌套的最后一个字段 Tuple3。

    complex.hadoopCitizen:选择Hadoop IntWritable类型。

    使用键选择器函数定义键

    定义键的另一种方法是“键选择器”函数。键选择器函数将单个数据元作为输入并返

    回数据元的键。Keys可以是任何类型,并且可以从确定性计算中导出。

    以下示例显示了一个键选择器函数,它只返回一个对象的字段:

    Java

    Scala

    基础API概念

    73someordinaryPOJO

    publicclassWC{publicStringword;publicintcount;}

    DataStreamwords=[...]

    KeyedStreamkeyed=words

    .keyBy(newKeySelector{

    publicStringgetKey(WCwc){returnwc.word;}

    });

    someordinarycaseclasscaseclassWC(word:String,count:I

    nt)

    valwords:DataStream[WC]=[...]valkeyed=words.keyBy(_.

    word)

    指定转换函数

    大多数转换都需要用户定义的函数。本节列出了如何指定它们的不同方法

    Java

    Scala

    实现接口

    最基本的方法是实现一个提供的接口:

    classMyMapFunctionimplementsMapFunction{

    publicIntegermap(Stringvalue){returnInteger.parseInt(val

    ue);}

    };

    data.map(newMyMapFunction);

    匿名课程

    您可以将函数作为匿名类传递:

    data.map(newMapFunction{

    publicIntegermap(Stringvalue){returnInteger.parseInt(val

    ue);}

    });

    Java8Lambdas

    Flink还支持JavaAPI中的Java8Lambdas。

    基础API概念

    74data.filter(s->s.startsWith(http:));

    data.reduce((i1,i2)->i1+i2);

    函数丰富

    需要用户定义函数的所有转换都可以将富函数作为参数。例如,而不是

    classMyMapFunctionimplementsMapFunction{

    publicIntegermap(Stringvalue){returnInteger.parseInt(val

    ue);}

    };

    你可以写

    classMyMapFunctionextendsRichMapFunction{

    publicIntegermap(Stringvalue){returnInteger.parseInt(val

    ue);}

    };

    并像往常一样将函数传递给 map转换:

    data.map(newMyMapFunction);

    丰富的函数也可以定义为匿名类:

    data.map(newRichMapFunction{

    publicIntegermap(Stringvalue){returnInteger.parseInt(val

    ue);}

    });

    Lambda函数

    正如前面的例子中所见,所有算子操作都接受lambda函数来描述算子操作:

    valdata:DataSet[String]=[...]data.filter{_.startsWith(

    http:)}

    基础API概念

    75valdata:DataSet[Int]=[...]data.reduce{(i1,i2)=>i1+

    i2}

    ordata.reduce{_+_}

    函数丰富

    将lambda函数作为参数的所有转换都可以将富函数作为参数。例如,而不是

    data.map{x=>x.toInt}

    你可以写

    classMyMapFunctionextendsRichMapFunction[String,Int]{

    defmap(in:String):Int={in.toInt}

    };

    并将函数传递给 map转换:

    data.map(newMyMapFunction)

    丰富的函数也可以定义为匿名类:

    data.map(newRichMapFunction[String,Int]{

    defmap(in:String):Int={in.toInt}

    })

    丰富的函数提供,除了用户定义的函数(Map,Reduce等),四种方

    法: open, close, getRuntimeContext,和 setRuntimeContext。这

    些用于参数化函数(请参阅将参数传递给函数),创建和完成本地状态,访问广播

    变量(请参阅广播变量)以及访问运行时信息(如累加器和计数器)(请参阅累

    加器和计数器)以及有关信息的信息。迭代(参见迭代)。

    支持的数据类型

    Flink对可以在DataSet或DataStream中的数据元类型进行了一些限制。原因是系统

    分析类型以确定有效的执行策略。

    有六种不同类别的数据类型:

    1. Java元组和Scala案例类

    2. JavaPOJO

    3. 原始类型

    基础API概念

    764. 常规课程

    5. 值

    6. HadoopWritables

    7. 特殊类型

    元组和案例类

    Java

    Scala

    元组是包含固定数量的具有各种类型的字段的复合类型。JavaAPI提供 Tuple1最

    多的类 Tuple25。元组的每个字段都可以是包含更多元组的任意Flink类型,从而

    产生嵌套元组。可以使用字段名称直接访问元组的字段 tuple.f4,或使用通用

    getter方法 tuple.getField(intposition)。字段索引从0开始。请注意,这与

    Scala元组形成鲜明对比,但它与Java常规索引更为一致。

    DataStream>wordCounts=env.fromElement

    s(

    newTuple2(hello,1),newTuple2(world,2));

    wordCounts.map(newMapFunction,Integer>

    {

    @Override

    publicIntegermap(Tuple2value)throwsExc

    eption{

    returnvalue.f1;

    }

    });

    wordCounts.keyBy(0);alsovalid.keyBy(f0)

    Scala案例类(和Scala元组是案例类的特例)是包含固定数量的具有各种类型的字

    段的复合类型。元组字段通过其1偏移名称来寻址,例如 _1第一个字段。案例类

    字段按名称访问。

    caseclassWordCount(word:String,count:Int)

    valinput=env.fromElements(

    WordCount(hello,1),WordCount(world,2))CaseClassDataSet

    input.keyBy(word)keybyfieldexpressionword

    valinput2=env.fromElements((hello,1),(world,2))Tup

    le2DataSet

    input2.keyBy(0,1)keybyfieldpositions0and1

    POJOs

    基础API概念

    77如果满足以下要求,则Flink将Java和Scala类视为特殊的POJO数据类型:

    这堂课必须公开。

    它必须有一个没有参数的公共构造函数(默认构造函数)。

    所有字段都是公共的,或者必须通过getter和setter函数访问。对于一个名

    为 foogetter和setter方法的字段必须命名 getFoo和 setFoo。

    Flink必须支持字段的类型。目前,Flink使用Avro序列化任意对象(例

    如 Date)。

    Flink分析了POJO类型的结构,即它了解了POJO的字段。因此,POJO类型比一般

    类型更容易使用。此外,Flink可以比一般类型更有效地处理POJO。

    以下示例显示了一个包含两个公共字段的简单POJO。

    Java

    Scala

    publicclassWordWithCount{

    publicStringword;

    publicintcount;

    publicWordWithCount{}

    publicWordWithCount(Stringword,intcount){

    this.word=word;

    this.count=count;

    }

    }

    DataStreamwordCounts=env.fromElements(

    newWordWithCount(hello,1),newWordWithCount(world,2));

    wordCounts.keyBy(word);keybyfieldexpressionword

    classWordWithCount(varword:String,varcount:Int){

    defthis{

    this(null,-1)

    }

    }

    valinput=env.fromElements(

    newWordWithCount(hello,1),newWordWithCount(world,2))CaseClassDataSet

    input.keyBy(word)keybyfieldexpressionword

    基础API概念

    78原始类型

    Flink支持所有Java和Scala的原始类型,如 Integer, String和 Double。

    一般类别

    Flink支持大多数Java和Scala类(API和自定义)。限制适用于包含无法序列化的字

    段的类,如文件指针,IO流或其他本机资源。遵循JavaBeans约定的类通常可以

    很好地工作。

    所有未标识为POJO类型的类(请参阅上面的POJO要求)都由Flink作为常规类类

    型处理。Flink将这些数据类型视为黑盒子,并且无法访问其内容(即,用于有效排

    序)。使用序列化框架Kryo对常规类型进行反序列化。

    值

    值类型手动描述它们的序列化和反序列化。它们不是通过通用序列化框架,而是通

    过 org.apache.flinktypes.Value使用方法 read和实现接口为这些算子操作

    提供自定义代码 write。当通用序列化效率非常低时,使用值类型是合理的。一

    个示例是将数据元的稀疏向量实现为数组的数据类型。知道数组大部分为零,可以

    对非零数据元使用特殊编码,而通用序列化只需编写所有数组数据元。

    该 org.apache.flinktypes.CopyableValue接口以类似的方式支持手动内部克

    隆逻辑。

    Flink带有与基本数据类型对应的预定义值类型。( ByteValue,ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue

    , CharValue, BooleanValue)。这些Value类型充当基本数据类型的可变变

    体:它们的值可以被更改,允许程序员重用 ......

您现在查看是摘要介绍页, 详见PDF附件(11499KB,1169页)