Flink WordCount程序背后的万字深度解析

Flink WordCount程序背后的万字深度解析

1 Flink数据流图简介

1.1 Flink样例程序

我们开始对数据流做处理,计算数据流中单词出现的频次。从这个样例中,我们可以一窥Flink设计和运行原理。

图 1 Flink样例程序示意图

程序分为三大部分:

  • 第一部分读取数据源(Source),

  • 第二部分对数据做转换操作(Transformation),

  • 最后将转换结果输出到一个目的地(Sink)。

    代码中的函数被称为算子(Operator),是Flink提供给程序员的接口,程序员需要通过这些算子对数据做操作。

算子

我们可以把算子理解为1 + 2 运算中的加号,加号(+)是这个算子的一个符号表示,它表示对数字1和数字2做加法运算。

同样,在Flink或Spark这样的大数据引擎中,算子对数据进行某种操作,程序员可以根据自己的需求调用合适的算子,完成所需计算任务。

常用的算子有map、flatMap、keyBy、timeWindow等,它们分别对数据流执行不同类型的操作。

图 2 WordCont程序的逻辑视图

图 2展示了WordCount程序中数据从不同算子间流动的情况。

图中,圆圈代表算子,圆圈间的箭头代表数据流,数据流在Flink程序中经过不同算子的计算,最终生成为目标数据。

按照算子对数据的操作内容,一般将算子分为Source算子、Transformation算子和Sink算子。

Source算子读取数据源中的数据,数据源可以是数据流、也可以存储在文件系统中的文件。

Transformation算子对数据进行必要的计算处理。

Sink算子将处理结果输出,数据一般被输出到数据库、文件系统或下一个数据流程序。

我们先对这个样例程序中各个算子做一个简单的介绍:

  • map

map函数对数据流中每一条数据做一个操作,生成一条新的数据。本例中map(word => (word, 1))表示取输入的每个单词,用变量word表示,然后生成一个二元对(word, 1),1是表示出现了一次。注意,map的一条输入数据对应一条输出数据。

  • flatMap

在解释flatMap前,我们先对split函数做一个简单介绍。split(“\\s”)函数以空白字符为分隔符,将文本切分成单词列表。如果输入为“Hello Flink“,那么经过这个函数切分后,得到结果为[“Hello”,”Flink”]组成的单词列表。

本例中flatMap(line => line.split(“\\s”))表示取出输入的每一行文本,用变量line表示,将文本中以空格做切分,生成一个单词列表,到这里仍然列表,flatMap接着对列表打平,输出单个单词。flatMap先做map所做的操作,然后对输出的各个列表打平,因此,flatMap的一条输入数据可能有多条输出。

  • keyBy

keyBy根据某个Key做数据重分布,将所有数据中包含该Key的数据都发送到同一个分区上。本例中是将二元组中第一项作为Key,即以单词为Key,包含同样单词的二元对都发送到同一分区上。

  • timeWindow

timeWindow是时间窗口函数,以界定对多长时间之内的数据做统计。

  • sum

sum为求和函数。sum(1)表示对二元组中第二个元素求和,因为经过前面的keyBy,所有单词都被发送到了同一个分区,因此,在这一个分区上,将单词出现次数做加和,就得到出现的总次数。

对于词频统计这个案例,逻辑上来讲无非是对数据流中的单词做提取,然后使用一个Key-Value结构对单词做词频计数,最后输出结果即可,这样的逻辑本可以用几行代码完成,改成这样的算子形式,反而让新人看着一头雾水,为什么一定要用算子的形式来写程序呢?实际上,算子进化成当前这个形态,就像人类从石块计数,到手指计数,到算盘计数,再到计算器计数这样的进化过程一样,尽管更低级的方式可以完成一定的计算任务,但是随着计算规模的增长,古老的计数方式存在着低效的弊端,无法完成更高级别和更大规模的计算需求。试想,如果我们不使用大数据引擎提供的算子,而是自己实现一套上述的计算逻辑,尽管我们可以快速完成当前的词频统计的任务,但是当面临一个新计算任务时,我们需要再次重新编写程序,完成一整套计算任务。我们自己编写代码的横向扩展性可能很低,当输入数据暴增时,我们需要做很大改动,以部署在更多机器上。

大数据引擎的算子对计算做了一些抽象,对于新人来说有一定学习成本,而一旦掌握这门技术,人们所能处理的数据规模将成倍增加。大数据引擎的算子出现,正是针对数据分布在多个分区的大数据场景需要一种统一的计算描述语言来对数据做计算而进化出的新计算形态。基于大数据引擎的算子,我们可以定义一个数据流的逻辑视图,以此完成对大数据的计算。剩下那些数据交换、横向扩展、故障恢复等问题全交由大数据引擎来解决。

1.2 从逻辑视图到物理执行

在绝大多数的大数据处理场景下,一台机器节点无法处理所有数据,数据被切分到多台节点上。

在大数据领域,当数据量大到超过单台机器处理能力时,就将一份数据切分到多个分区(Partition)上,每个分区分布在一台虚拟机或物理机上。

前一小节已经提到,大数据引擎的算子提供了编程接口,使用算子我们可以构建数据流的逻辑视图。

考虑到数据分布在多个节点的情况,逻辑视图只是一种抽象,需要将逻辑视图转化为物理执行图,才能在分布式环境下执行。

图 3 样例程序物理执行示意图

图 3为1.1中的样例程序的物理执行图,这里数据流分布在2个分区上。箭头部分表示数据流分区,圆圈部分表示算子在分区上的算子子任务(Operator Subtask)。

从逻辑视图变为物理执行图后,

map算子在每个分区都有一个算子子任务,以处理该分区上的数据:map[1/2]算子子任务处理第一个数据流分区上的数据,map[2/2]算子子任务处理第二个数据流分区上的数据。

keyBy算子会将数据按照某个key做数据重分布,在词频统计的例子中是以单词为key,例如,输入数据为“Hello Flink Hello World”,keyBy算子会将所有的”Hello”归结到一个分区上。

算子子任务是物理执行的基本单元,算子子任务之间是相互独立的,某个算子子任务有自己的线程,不同算子子任务可能分布在不同的节点上。后文在Flink的资源分配部分我们还会重点介绍算子子任务。

从图 3中可以看到,除去Sink外的算子都被分成了2个算子子任务,这样配置的并行度(Parallelism)为2,Sink算子的并行度为1。并行度是可以被设置的,实际应用中一般根据数据量的大小,计算资源的多少等多方面的因素来设置并行度。

1.3 数据交换策略

图 3中keyBy算子子任务将数据做了重新分配,即数据在不同分区上进行着数据交换,产生了数据流动的现象。无论是Hadoop、Spark还是Flink,当涉及数据分布在多个分区时,对数据的处理都会涉及到数据交换策略。在Flink中,数据交换策略包括图 4中涉及到的四种策略:

图 4 Flink数据交换策略

  1. 前向传播(Forward):前一个算子子任务将数据直接传递给后一个算子子任务,数据不存在跨分区的交换,也避免了因数据交换产生的各类开销,图 3中Source和和flatMap之间就是这样的情形。
  2. 全局广播(Broadcast):将某份数据发送到所有分区上,这种策略涉及到了数据拷贝和网络通信,因此非常消耗资源。
  3. 基于Key的数据重分布:数据以(Key, Value)形式存在,该策略将所有数据做一次重新分布,并保证相同Key的数据被发送到同一个分区上。图 3中keyBy算子将单词作为Key,把某个单词都发送到同一分区,以方便后续算子来统计这个单词出现的频次。
  4. 随机策略(Random):该策略将所有数据随机均匀地发送到多个分区上,以保证数据平均分配到不同分区上。该策略通常为了防止数据倾斜到某些分区,导致部分分区数据稀疏,部分分区数据拥堵,甚至超过该分区上算子的处理能力。

2 Flink架构与核心组件

为了实现支持分布式运行,Flink跟其他大数据引擎一样,采用了主从(Master-Worker)架构,运行时主要包括两个组件:

• JobManager,又被称为Master,是一个Flink应用的主节点。

• TaskManager,又被称为Worker,执行计算任务的节点。

一个Flink应用一般含有至少一个JobManager,一个或多个TaskManager。

2.1 Flink作业执行过程

图 5 Flink作业提交流程

用户编写Flink程序并提交任务的具体流程为:

  1. 用户编写应用程序代码,并通过Flink客户端(Client)提交作业。程序一般为Java或Scala语言,调用Flink API,构建基于逻辑视角的数据流图,代码和相关配置文件被编译打包,并被提交到JobManager上,形成一个应用作业(Application)。
  2. JobManager接受到作业后,将逻辑视图转化成可并行的物理执行图。
  3. JobManager将物理执行图发送给各TaskManager。
  4. TaskManager接收到物理执行图后,会初始化并开始执行被分配的任务。
  5. TaskManager在执行任务过程中可能会与其他TaskManager交换数据,会使用图 4提到的一些数据交换策略。
  6. TaskManager将任务启动、运行、性能指标、结束或终止等状态信息会反馈给JobManager。
  7. 用户可以使用Flink Web仪表盘来监控提交的作业。

图 6 Flink主从架构架构图

图 6对Flink的各个组件描述得更为详细,我们再对涉及到的各个组件进行更为详细的介绍。

Client

当用户提交一个Flink程序时,会首先创建一个客户端(Client)。该Client会对用户提交的Flink程序进行预处理,并把作业提交到Flink集群中处理。Client需要从用户提交的Flink程序配置中获取JobManager的地址,并建立到JobManager的连接,将Flink作业提交给JobManager。Client会将用户提交的Flink程序组装一个JobGraph。

JobManager

JobManager是Flink的协调者,它负责接收Flink作业,调度任务。同时,JobManager还负责管理TaskManager,收集作业的状态信息,生成检查点和故障恢复等问题。JobManager会将Client提交的JobGraph转化为ExceutionGraph,ExecutionGraph是JobGraph的并行版本,但还不是最终的物理执行图。

TaskManager

TaskManager是实际负责执行计算的节点,在其上执行物理执行图。同时,TaskManager还要处理必要的数据缓存和交换。每个TaskManager负责管理其所在节点上的资源信息,包括内存、磁盘、网络,TaskManager启动的时候会将资源的状态向JobManager汇报。

2.2 逻辑视图到物理执行图

逻辑视图转化为物理执行图过程,该过程可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。

  • StreamGraph:是根据用户通过 DataStream API 编写的代码生成的最初的图,用来表示程序的拓扑结构。在这张图中,节点就是用户调用的算子,边表示数据流。
  • JobGraph:JobGraph是提交给 JobManager 的数据结构。StreamGraph经过优化后生成了 JobGraph,主要的优化为,将多个符合条件的节点链接在一起作为一个节点,这样可以减少数据交换所需要的序列化、反序列化以及传输消耗。这个链接的过程叫做算子链,会在下一节简单介绍。
  • ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
  • 物理执行图:JobManager 根据 ExecutionGraph 对作业进行调度后,在各个TaskManager 上部署任务形成的图,物理执行并不是一个具体的数据结构。

可以看到,Flink在数据流图上可谓煞费苦心,仅各类图就有四种之多。对于新人来说,可以不用太关心这些非常细节的底层实现,只需要了解以下几个核心概念:

  • Flink采用主从架构,JobManager起着管理协调作用,TaskManager负责物理执行,在执行过程中会发生一些数据交换、生命周期管理等事情。
  • 用户调用Flink API,构造逻辑视图,Flink会对逻辑视图优化,并转化为物理执行图,最后被执行的是物理执行图。

2.3 任务、算子子任务与算子链

在分布式运行的过程中,Flink将一些算子子任务(Subtask)链接在一起,组成算子链(Operator Chain),链接后将以任务(Task)的形式被TaskManager调度执行。使用算子链是一个非常有效的优化,它可以有效降低算子子任务之间的传输开销。链接之后形成的Task是TaskManager中的一个线程。

图 7 任务、子任务与算子链

例如,数据从Source算子前向传播到 flatMap算子,再由flatMap算子前向传播到map算子,中间没有发生跨分区的数据交换,因此,我们完全可以将Source、flatMap和map几个Operator Subtask组合在一起,形成一个Task。keyBy算子发生了数据重分布,数据会跨越分区,因此map和keyBy无法被链接到一起。同样,我们也不能把sum和Sink链接到一起。

默认情况下,Flink会尽量将更多的Subtask链接在一起,但一个Subtask有超过一个输入或发生数据交换时,链接就无法建立。尽管将算子链接到一起会降低一些传输开销,但是也有一些情况并不需要太多链接。比如,有时候我们需要将一个非常长的算子链拆开,这样我们就可以将原来集中在一个线程中的计算拆分到多个线程中来并行计算。Flink手动配置是否对某些算子启用算子链。

2.4 任务槽位与计算资源

任务槽位的概念

根据前文的介绍,我们已经了解到TaskManager负责具体的任务执行。TaskManager是一个Java虚拟机进程,在TaskManager中可以并行运行多个Task。在程序执行之前,经过优化,部分Subtask被链接在一起,组成一个Task。每个Task是一个线程,需要TaskManager为其分配相应的资源,TaskManager使用任务槽位(Task Slot)给任务分配资源,简称槽位(Slot)。

在解释任务槽位的概念前,我们先回顾一下进程与线程的概念。在操作系统层面,进程(Process)是进行资源分配和调度的一个独立单位,线程(Thread)是是CPU调度的基本单位。比如,我们常用的Office Word软件,在启动后就占用操作系统的一个进程。Windows上可以使用任务管理器来查看当前活跃进程,Linux上可以使用top命令来查看。线程是进程的一个子集,一个线程一般专注于处理一些特定任务,不独立拥有系统资源,只拥有一些运行中必要的资源,如程序计数器。一个进程至少有一个线程,也可以有多个线程。多线程场景下,每个线程都处理一小个任务,多个线程以高并发的方式同时处理多个小任务,可以提高处理能力。

回到Flink的槽位分配机制上,一个TaskManager是一个进程,TaskManager可以管理一至多个Task,每个Task是一个线程,占用一个槽位。

图 8 Task Slot与Task Manager

假设我们给WordCount程序分配两个TaskManager,每个TaskManager又分配3个槽位,所以总共是6个槽位。结合图 7中对这个作业的并行度设置,整个作业被划分为5个Task,使用5个线程,这5个线程可以按照图 8所示的方式分配到6个槽位中。

每个槽位的资源是整个TaskManager资源的子集,比如这里的TaskManager下有3个槽位,每个槽位占用TaskManager所管理的1/3的内存,在第一个槽位内运行的任务不会与在第二个槽位内运行的任务互相争抢内存资源。注意,在分配资源时,Flink并没有将CPU资源明确分配给各个槽位。

Flink允许用户设置TaskManager中槽位的数目,这样用户就可以确定以怎样的粒度将任务做相互隔离。如果每个TaskManager只包含一个槽位,那么运行在该槽位内的任务将独享JVM。如果TaskManager包含多个槽位,那么多个槽位内的任务可以共享JVM资源,比如共享TCP连接、心跳信息、部分数据结构等。如无特殊需要,可以将槽位数目设置为TaskManager下可用的CPU核心数,那么平均下来,每个槽位都能获得至少一个CPU核心。

槽位共享

图 8中展示了任务的一种资源分配方式,默认情况下, Flink还提供了一种槽位共享(Slot Sharing)的优化机制,进一步优化数据传输开销,充分利用计算资源。将图 8中的任务做槽位共享优化后,结果如图 9所示。

图 9 槽位共享示意图

开启槽位共享后,Flink允许将独占一个槽位的任务与同一个作业中的其他任务共享槽位。于是可以将一个作业从开头到结尾的所有Subtask都放置在一个槽位中,如图 9中最左侧的数据流,这样槽位内的数据交换成本更低。而且,对于一个数据流图来说,Source、map等算子的计算量相对不大,window算子的计算量比较大,计算量较大的Subtask与计算量较小的Subtask相互互补,可以腾出更多的槽位,分配给更多Task,这样可以更好地利用资源。而不开启槽位共享,计算量小的Source、map算子子任务独占了槽位,造成一定的资源浪费。

并行度与槽位数目

图 3中提到了并行度,在WordCount的例子中,除去Sink算子的并行度为1外,其他算子的并行度均为2,也就是说在并行度为2的情况下,每个算子只能拆分为2个Subtask。图 8中的方式共占用5个槽位,支持槽位共享后,图 9只占用2个槽位,这里故意将剩下的几个槽位置空,只是为了演示需要,如果这个作业的数据量非常大,占用的数据分区很多,其实完全可以通过增加并行度,将这些槽位填充,为更多的并行任务提供资源。

图 10 并行度与槽位数目

为了充分利用空槽位,占满图 9中多余的4个槽位,我们可以把除Sink外的其他算子的并行度都设置为6。图 2‑10展示了将并行度增加后,资源分配情况。

并行度和槽位数目的概念可能容易让人混淆,这里再次阐明一下。用户使用Flink提供的API算子可以构建一个逻辑视图,需要将任务并行才能被物理执行。整个作业将被切分为多个实例,每个实例处理整个作业输入数据的一部分。如果输入数据过大,增大并行度可以增加更多的实例,加快数据处理速度。可见,并行度是Flink对任务并行切分的一种描述。槽位数目是在资源设置时,对单个TaskManager的资源切分粒度。并行度、槽位数目和TaskManager数可大致按照公式 1来计算。

公式 1 并行度、TaskManager数与Task Slot数关系

其中,ceil为上限函数,表示对除法结果向上取整。关于并行度、槽位数目等配置,将在后续文章中详细说明。

图 11 Flink API结构

我们之前讨论的WordCount例子中,一直使用的是Flink提供的DataStream API,即在数据流上的操作。除了DataStream API,Flink给编程人员不同层次API,主要有三层:

  1. Flink最底层提供的是有状态的流式计算引擎,流(Stream)、状态(State)和时间(Time)等流式计算概念都在这一层得到了实现。
  2. 一般情况下,应用程序不会使用上述底层接口,而是使用Flink提供的核心API:针对有界和无界数据流的DataStream API和针对有界数据集的DataSet API。用户可以使用这两个API进行常用的数据处理:转换(Transformation)、连接(Join)、聚合(Aggregation)、窗口(Window)以及对状态(State)的操作。这一层有点像Spark提供的RDD级别的接口。
  3. Table API和SQL是更高级别的抽象。在这一层,数据被转换成了关系型数据库式的表格,每个表格拥有一个表模式(Schema),用户可以像操作表格那样操作流式数据,例如可以使用针对结构化数据的select、join、group-by等操作。如果用户熟悉SQL语句,那么可以很快上手Flink的Table API和SQL。很多公司的数据流非常依赖SQL,Flink SQL降低了从其他框架迁移至Flink的成本。

我们将在后续文章中介绍DataStream API、Table API和SQL。

2.6 Flink组件栈

了解Flink的主从架构以及API结构后,我们可以将Flink的核心组件分层来剖析。

图 12 Flink组件栈

部署层

大数据引擎首先需要部署在物理机或虚拟机上。Flink支持多种部署方式,可以部署在单机、集群,以及云上。

运行时层

运行时(Runtime)层为Flink各类计算提供了实现。这一层做了前面章节中提到的将数据流图转化为物理执行图、资源分配以及分布式调度与执行等大部分工作。

API层

API层主要实现了面向数据流的流处理DataStream API和面向数据集的批处理DataSet API。在这两个API之上,Flink还提供了更丰富的工具:

  • 面向数据流处理的:CEP(Complex Event Process,复杂事件处理)、基于类SQL的Table API和SQL
  • 面向数据集批处理的:FlinkML(机器学习计算库)、Gelly(图计算库)

3 Flink时间处理机制

3.1 时间窗口

在批处理场景下,数据已经是按照某个时间维度分批次地存储了。一些公司经常将用户行为日志按天存储在一个文件目录下,另外一些开放数据集都会说明数据采集的时间始末。因此,对于批处理任务,处理一个数据集,其实就是对该数据集对应的时间窗口内的数据进行处理。在流计算场景下,数据以源源不断的流的形式存在,数据一直在产生,没有始末。我们要对数据进行处理时,往往需要明确一个时间窗口,比如,数据在“每秒”、“每小时”、“每天”的维度下的一些特性。一般有如下几种定义时间窗口的方式。

滚动窗口

图 13 固定数据数目的滚动窗口

图 14 固定时间间隔的滚动窗口

滚动窗口(Tumbling Window)模式下窗口之间互不重叠,且窗口长度是固定的,长度可以是数据的条数,也可以是时间间隔。图 13是固定长度为4的滚动窗口,图 14是固定长度为10分钟的滚动窗口。定长滚动窗口是经常用到的一种窗口模式。在本文最开始的WordCount例子中,我们使用的是定长为5秒的滚动窗口。

滑动窗口

图 15 滑动窗口

滑动窗口(Sliding Window)也是一种窗口长度定长的模式。与滚动窗口不同,滑动窗口模式下窗口和窗口之间有滑动间隔(Slide)。再以WordCount为例,我们要统计10分钟内的词频,并且每隔1分钟统计一次,就需要使用滑动窗口。

会话窗口

会话(Session)是一个用户交互概念,常常出现在互联网应用上,一般指用户在某APP或某网站上短期内产生的一系列行为。比如,用户在手机淘宝上短时间有大量的搜索和点击的行为,这系列行为事件组成了一个Session,接着可能因为一些其他因素,用户暂停了与APP的交互,过一会用户又返回APP,经过一系列搜索、点击、与客服沟通,最终下单。Session窗口的长度并不固定,因此不能用上面两种形式的窗口来建模。

图 16 会话窗口

Session没有固定长度,那如何将数据划分到不同的窗口呢?Flink提供了Session Gap的概念。

图 17 session gap示意图

我们继续以用户在手机淘宝上的行为为例,现在有3个用户,每个用户产生了不同的行为,果两个行为数据的时间戳小于session gap,则被划归到同一个窗口中,图 17中user2的window4,如两个行为数据的时间戳大于了session gap,则被划归到两个不同的窗口中,user2的window1和window2之间的时间间隔大于最小的session gap,数据被划归为了两个窗口。

我们将在后续文章详细介绍以上几种窗口的使用方法。

3.2 Flink三种时间语义

如果我们要定义基于时间的窗口,那么首先要定义时间。在程序中,时间一般基于Unix时间戳,即以1970-01-01-00:00:00.000为起始点。时间戳毫秒精度是时间距离该起点的毫秒总数,时间戳微秒精度是事件距离该起点的微秒总数。

图 18 三种时间语义

之前文章中我们提到了流处理的时间语义问题,在Flink中一般有三种时间概念,如图 18所示。

  • 事件时间(Event Time)是事件实际发生的时间,通常是事件发生时嵌入到事件上的时间,比如某个传感器在生成数据时,会将时间戳打入这个数据上。
  • 接收时间(Ingestion Time)是事件进入Flink的时间,确切的说,是该事件进入Source算子时,Source算子的当前时间。
  • 处理时间(Processing Time)是各个时间算子处理该事件的当前时间。一般情况下,处理时间会比摄入时间更晚一些。

Processing Time是最简单的时间概念,只需要算子获取当前运行机器的系统时间,不需要考虑其他任何因素,因此使用Processing Time作为时间,可以获得最好的性能和最低的延迟。但Processing Time并不能代表事件实际发生的时间,从事件实际发生到算子处理的过程有大量的不确定性,以Processing Time来计算,很可能导致事件的处理是乱序的,产生不可复现的结果。

Event Time可以保证事件顺序的可靠性,因此可以得到一致的、可复现的结果。Event Time虽然准确,但也有其弊端:我们无法预知某个时间下,是否所有数据均已到达,因此需要使用水位线机制处理延迟数据。

3.3 水位线

之前文章已经提到,水位线(Watermark)机制假设在某个时间点上,不会有比这个时间点更晚的上报数据。Watermark常被作为一个时间窗口的结束时间。

图 19 一个带有Watermark的数据流

Flink中的Watermark是被系统插入到数据流的特殊数据。Watermark的时间戳单调递增,且与事件时间戳相关。如上图的数据流所示,方块是事件,三角形是该事件对应的时间戳,圆圈为Watermark。当Flink接受到时间戳值为5的Watermark时,系统假设时间戳小于5的事件均已到达,后续到达的小于5的事件均为延迟数据。Flink处理到最新的Watermark,会开启这个时间窗口的计算,把这个Watermark之前的数据纳入进此次计算,延迟数据则不能被纳入进来,因此使用Watermark会导致微小的误差。

生成Watermark

流数据中的事件时间戳与Watermark高度相关,事件时间戳的抽取和Watermark的生成也基本是同时进行的,抽取的过程会遇到下面两种情况:

  1. 数据流中已经包含了事件时间戳和Watermark。
  2. 使用抽取算子生成事件时间戳和Watermark,这也是实际应用中更为常见的场景。因为后续的计算都依赖时间,Watermark抽取算子最好在数据接入后马上调用。具体而言,Watermark抽取算子包含两个函数:第一个函数从数据流的事件中抽取时间戳,并将时间戳赋值到事件的元数据上,第二个函数生成Watermark。

Flink有两种方式来生成Watermark:

  1. 周期性(Periodic)生成Watermark:Flink每隔一定时间间隔,定期调用Watermark生成函数。这种方式下,Watermark的生成与时间有周期性的关系。
  2. 断点式(Punctuated)生成Watermark:数据流中某些带有特殊标记的数据自带了Watermark信息,Flink监控数据流中的每个事件,当接收到带有特殊标记数据时,会触发Watermark的生成。这种方式下,Watermark的生成与时间无关,与何时接收到特殊标记数据有关。

无论是以上那种方式,Flink都会生成Watermark并插入到数据流中。一旦时间戳和Watermark生成后,后续的算子将以Event Time的时间语义来处理这个数据流。Flink把时间处理部分的代码都做了封装,会在内部处理各类时间问题,用户不需要担心延迟数据等任何时间相关问题。用户只需要在数据接入的一开始生成时间戳和Watermark,Flink会负责剩下的事情。

延迟数据

Flink有一些机制专门收集和处理延迟数据。迟到事件在Watermark之后到达,一般处理的方式有三种:

  1. 将迟到事件作为错误事件直接丢弃
  2. 将迟到事件收集起来另外再处理
  3. 重新触发计算

对于第二种方式,用户可以使用Flink提供的“Side Output”机制,将迟到事件放入一个单独的数据流,以便再对其单独处理。

对于第三种方式,用户可以使用Flink提供的“Allowed Lateness”机制,设置一个允许的最大迟到时长,原定的时间窗口关闭后,Flink仍然会保存该窗口的状态,直至超过迟到时长,迟到的事件加上原来的事件一起重新被计算。

我们将在后续文章中详细介绍Event Time的使用、Watermark生成、延迟数据处理等技术细节。

4 Flink的状态和检查点

4.1 状态

在之前的文章中我们已经提到了状态的概念:流式大数据处理引擎会根据流入数据持续更新状态数据。状态可以是当前所处理事件的位置偏移(Offset)、一个时间窗口内的某种输入数据、或与具体作业有关的自定义变量。

图 20 数据流与状态示意图

对于WordCount的例子来说,已经处理了一个”Hello”单词,并且正在处理一个”Hello”,对于Source算子来说,当前数据的位置偏移为3,所有已处理的数据中,单词”Hello”的出现次数为2。这个作业的状态包括当前处理的位置偏移、已处理过的单词出现次数等变量信息。

4.2 检查点

一致性检查点

在一个有状态的流处理作业中,为保证高吞吐和低延迟,Flink的每个Task需要高效读写状态数据,Task会在本地的TaskManager中存储状态数据。然而,由于大数据系统一般运行在多台机器上,可能会遇到进程被杀、机器宕机、网络抖动等问题,一旦出现宕机等问题,该机器上的状态以及相应的计算会丢失,因此需要一种恢复机制来应对这些潜在问题。

Flink使用一致性检查点(Consistent Checkpoint)技术来做故障恢复。检查点机制一般是定期将状态数据生成快照(Snapshot),持久化存储起来,一旦发生意外,Flink主动重启应用,并从最近的快照中恢复,再继续处理新流入数据。一致性检查点技术可以将分布在多台节点上的所有状态都记录下来,并提供了Exactly-Once的投递保障,其背后是使用了Chandy-Lamport算法,将本地的状态数据保存到一个存储空间上,故障发生后及时恢复最近的快照数据。我们将在后续文章中详细介绍一致性检查点的算法原理。

状态后端

Task在本地内存中保存一份状态数据,但在分布式系统中,某个Task在任意时间点都可能发生故障,因此Task上的本地状态数据可以被认为是脆弱的。Flink定期将本地的状态数据持久化保存到一个存储空间上。用户可以选择以怎样的方式来保存这些状态数据,这种机制被称为状态后端(State Backend)。Flink提供了三种状态后端:内存、文件系统和RocksDB。

内存肯定是读写性能最优的方式,单个节点的内存有限,因此这种状态后端会对状态数据的大小有限制。相比内存,本地磁盘的速度更慢,其所能承担的数据量更大,RocksDB 就是一种基于本地磁盘的状态后端。此外,Flink还允许将数据存储到分布式文件系统,如Hadoop的HDFS和AWS的S3上,分布式文件系统的数据存储能力非常大,足以应付海量数据的存储需求。我们将在后续文章中详细介绍三种状态后端的使用方法。

Savepoint

在容错上,除了Checkpoint,Flink还提供了Savepoint机制。从名称和实现上,这两个机制都极其相似,甚至Savepoint会使用Checkpoint的数据,但实际上,这两个机制的定位不同。

图 21 Checkpoint和Savepoint

Checkpoint是Flink定期触发并自动执行的故障恢复机制,以应对各种意外情况,其设计初衷主要是针对容错和故障恢复。Savepoint会使用Checkpoint生成的快照数据,但与Checkpoint不同点在于,Savepoint需要编程人员手动介入,用来恢复暂停作业。相比而言,Checkpoint是自动执行,Savepoint是手动管理。

当我们想要手动处理之前已经处理过的数据,就可以使用Savepoint,因此Savepoint经常被用来调试程序:

  • 我们可以给同一份作业设置不同的并行度,来找到最佳的并行度设置
  • 我们想测试一个新功能或修复一个已知的bug,并用新的程序逻辑处理原来的数据
  • 进行一些A/B实验,使用相同的数据源测试程序的不同版本
  • 因为状态可以被持久化存储到分布式文件系统上,我们甚至可以将同样一份应用程序从一个集群迁移到另一个集群,只需保证不同的集群都可以访问这个文件系统

Checkpoint 和 Savepoint 是Flink提供的两个相似的功能,它们满足了不同的需求,以确保一致性、容错性,满足了作业升级、BUG 修复、迁移、A/B测试等不同需求。

updatedupdated2024-12-152024-12-15