Apache Flink的分布式快照方法:

这篇文章是关于什么?

这是2016年分布式计算(Apache Hadoop,Spark,…)的圣诞日程活动中第18天的文章。

这篇文章讲了什么内容?

关于Apache Flink的分布式快照方式。
基础信息是来自于《分布式数据流的轻量级异步快照》。

然而,由于这篇论文本身也只是某一时点的快照,所以在最新版本中情况可能已经发生了很大变化,但由于我还没有确认到那个程度,请谅解。

因为不想阅读麻烦,总结一下是什么意思?

    • 入力に定期的にバリアマーカーを投入して、ストリームデータ処理をステージに区切るようにした。

 

    • 各オペレータはバリアマーカーを受信したタイミングで該当入力チャネルからの受信をブロックし、全入力チャネルからバリアマーカーを受信したタイミングで状態を保存する。

 

    • 上記の方式により、オペレータの状態のみの保存でストリームデータ処理のトポロジ全体のスナップショットを取得できる。

 

    循環サイクルが存在する場合、循環でない全入力チャネルからバリアマーカーを受信したタイミングで循環サイクルの出力チャネルにバリアマーカーを送信し、循環サイクルの入力にバリアマーカーが入ってくるまでに受信したレコードもスナップショットに含めることで対応している。

轻量级异步快照用于分布式数据流

总结

通过分散的有状态流处理,可以执行大规模连续计算,以实现低延迟和高吞吐量的目标。

为了实现这一目标,其中一个基本问题是在潜在障碍下提供处理保障。

现有的方法依赖于可以用于故障恢复的定期全局状态快照。

这些方法存在两个主要缺点。

    • 全体の計算を停止させることが多い。

 

    • 必要以上に大きなスナップショットが必要となる。

動作状態とともに、通信中のすべての状態を保持してしまう。

在本論文中,我們提出了一種適用於最新的數據流執行引擎的輕量級算法,名為非同步屏障快照(ABS),以應對這些情況。
在非循環圖的數據流中,ABS僅保留運算符狀態,而在循環圖的數據流中,僅維持最少的記錄日誌。
我們在支持分散狀態流處理的分散分析引擎Apache Flink中實現了ABS。
評估結果表明,ABS對執行沒有很大影響,並保持線性可擴展性,能夠成功處理頻繁的快照情況。

1. 引入

分散式的有状态流处理是计算的一种新范式,可以执行大规模连续计算,同时针对低延迟和高吞吐量两个目标。在这种系统中,容错性非常重要。
目前已知的确保有状态处理系统上准确一次语义的方法是依赖于执行状态的整体一致快照。
然而,在实时流处理中使用该应用程序时存在两个主要缺点。

    • 同期スナップショット技法は、全体的な状態の一貫したビューを得るために分散計算の全体的な実行を停止する。

 

    • 分散スナップショットの既存のアルゴリズムはスナップショット状態の一部として、実行グラフ全体でチャネルまたは未処理メッセージで転送中のレコードを含む。

ほとんどの場合、これには必要以上の状態が含まれる。

在本论文提出的算法中,我们重点关注为分布式的有状态数据流系统提供轻量级的快照,并且与现有方式相比,性能影响非常小。
在提出的算法中,我们提供低成本的异步状态快照,仅包含非循环图运算符状态。
此外,通过将下游备份应用于所选拓扑的部分,我们能够在最小化快照状态的同时覆盖循环图的情况。
我们的技术能够在不停止流操作的情况下,仅产生极少的运行时开销。
本论文的贡献可总结如下。

    • 非循環グラフ上で最小限のスナップショットを実現する非同期スナップショットアルゴリズムを提案して実装する。

 

    • 我々は、循環グラフ上で動作するアルゴリズムの一般化を記述し実装する。

 

    Apache Flink Streamingを比較のベースシステムとして使用している最先端技術と比較して、我々のアプローチの利点を示す。

2. 相关研究

在过去的10年里,对于连续处理系统,已经提出了几种恢复机制的建议。像Discretized Streams和Comet这样的无状态分布式批处理计算系统,需要依赖状态重新计算来模拟连续处理。而像Naiad、SDGs、Piccolo、SEEP这样的有状态数据流系统,则使用检查点来获取全局执行同步快照,以用于故障恢复。

数十年来,Chandy和Lamport提出了在分布式环境下实现一致性全局快照的问题,并且这个问题已经被广泛研究。
理论上,全局快照反映了执行的整体状态或者特定操作的可能状态。
Naiad采用的简单而费时的方法是通过3个步骤来执行同步快照。

    • 実行グラフの全体的な計算を停止

 

    • スナップショットを実行

 

    最後にグローバルスナップショットが完了したら各タスクに操作を続行するよう指示

这种方法需要阻塞整个计算,并且对吞吐量和空间都有很大影响。
另一种常见的方法是由Chandy和Lamport最初提出的,已经应用在今天许多系统中,它是在积极执行上游备份的同时异步执行快照。
通过将触发运算符和通道状态的持久性分散到整个执行图中的标记来实现。
然而,这种方法仍然存在对空间的挑战,因为它需要上游备份,导致恢复时间延长,需要重新处理备份记录。
我们的方法是扩展了Chandy和Lamport原始异步快照的思想,但不考虑非循环图记录的备份日志,即使是循环图,也只获取最低限度的备份日志。

3. 背景:Apache Flink的背景

我们现在为了确保Apache Flink Streaming在Apache Flink Stack(旧称Stratosphere)中作为分布式流分析系统具备故障容忍性,提出了该算法。
Apache Flink由一个通用的运行时引擎组成,用于统一处理批处理作业和流式作业,这些作业由有状态的互连任务组成。
Flink的分析作业被组织成一个任务的有向图。
数据元素是从外部来源获取的,并通过管道方式路由到任务图中。
任务基于接收到的输入连续地操作内部状态并生成新的输出。

3.1 流操作编程模型

Apache Flink API用于流处理,通过将无限分割的数据流(部分有序的记录序列)作为称为数据流的核心数据抽象来公开,从而实现复杂的流式分析作业的构建。

DataStreams可以通过从外部源(例如消息队列、套接字流、自定义生成器)获取数据,或者在其他数据流上调用操作来创建。

DataStreams以记录为单位进行增量应用,并支持多个操作符,例如map、filter和reduce,以生成新的DataStream,这些操作符采用上层函数的形式。

所有操作符都可以通过在每个流的不同分区中以并行实例的方式执行来并行化,从而实现流转换的分布式执行。

下面是一些代码示例。

val env : StreamExecutionEnvironment = ...
env.setParallelism(2)

val wordStream = env.readTextFile(path)
val countStream = wordStream.groupBy(_).count
countStream.print

上述的代码展示了如何在Apache Flink中实现简单的增量字数统计。
这个程序中,单词从文本文件中读取,并将每个单词的当前计数输出到标准输出。
这是一个有状态的流式程序,源需要识别当前文件偏移量,计数器需要将每个单词的当前计数作为内部状态维护。

3.2 分散数据流执行

假设顶点$T$代表任务,边$E$代表任务之间的数据通道。
当用户执行应用程序时,所有的操作符都会被编译成原则上是有向图$G=(T, E)$的执行图。
以增量单词计数为例,执行图如下所示。

Figure01.JPG

就像上文所述,每个操作员实例都被封装在各自的任务中。
任务可以被分类为从系统外部获取数据的源。
此外,M表示由任务在并行执行期间传递的所有记录集。
每个任务t∈T都封装了操作员实例的独立执行,并由以下内容构成。

    • 入力チャネルと出力チャネルのセット:$It、Ot⊆E$

 

    • オペレータ状態:$st$

 

    ユーザー定義関数:$ft$

数据采集是按照拉取方式进行的。
在执行过程中,每个任务都会消耗输入记录,更新操作员状态,并根据用户定义的函数生成新记录。
更具体地说,对于任务$t∈T$接收到的每个记录$r∈M$,会生成新的状态$st’$和输出记录集合$D⊆M$,生成规则如下:

ft:st、r |→ <st'、D>

4. 异步栅栏快照

分散处理系统需要具备对故障进行恢复的机制。
提供这种恢复机制的方法是定期获取执行图的快照,并在之后用于从故障中恢复。
快照是执行图的全局状态,保存了从特定执行状态恢复计算所需的所有信息。

4.1 问题的定义

将执行图G=(T, E)的全局快照G*=(T*, E*)定义为所有任务状态T*和边状态E*的集合。更详细地说,T*由所有操作符的状态st*∈T*:∀t∈T构成。同样,E*是所有通道状态e*∈E的集合。这里,e*由在e上传输的记录构成。

如Tel所述,为了确保结束性、可执行性和恢复后的正确结果,每个快照$G*$都需要保留所需的信息。

当所有进程都存活时,快照算法在开始后一定时间内保证最终结束。

可行性表示对快照的有效性。也就是说,在快照处理期间,信息与计算没有丢失。

事实上,从快照的角度来看,任务交付的记录也应该被获取,这意味着因果关系在快照中保持不变。

在非循环图数据流中的处理

在非循环图数据流中,通过将执行分为多个阶段,可以在不获取通道状态的情况下获取快照。
阶段将注入的数据流和所有相关计算分为一系列可能的执行,其中所有先前的输入和生成的输出都已完全处理。
最后一个阶段的操作符状态集反映了整个执行历史,因此只能在快照中使用。
我们算法的核心思想是在保持连续数据捕获的同时,使用分阶段的快照来创建相同的快照。

本论文提出的算法通过周期性注入特殊的栅栏标记到输入数据流中,来模拟连续数据流执行中的阶段划分,并将整个执行图发送到终点。
快照是逐步构建的,每当每个任务接收到表示执行阶段的栅栏标记时。
此外,算法还做出以下假设:

    • ある特定のタスク間を接続するネットワークチャネルは信頼性が高く、FIFO配信順序を尊重し、ブロックおよびブロック解除することが可能。

チャネルがブロックされると、すべてのメッセージはバッファされるが、ブロックが解除されるまで配信されない。

タスクは、ブロック、ブロック解除、メッセージ送信などのチャネルの操作を実行可能。

下流全てに対してバリアマーカーを送信するようなブロードキャスト的な使用も可能。

ABS算法的执行过程如下所示。

Figure02.JPG

中央协调器定期向所有源任务发送屏障标记。
当源任务接收到屏障标记时,获取当前状态的快照,并将屏障标记广播到所有输出。:$a)$
非源任务如果从其输入之一接收到屏障标记,则阻塞该输入,直到从所有输入接收到屏障标记。:$b)$
当所有输入都接收到屏障标记时,任务获取当前状态的快照,并将屏障标记广播到所有输出。:$c)$
然后,任务解除输入的阻塞并继续计算。:$d)$

具有完整的全局快照$G*=(T*, E*)$,其由仅仅具有$E*=0$的所有操作状态$T*$构成。

证明:如前所述,快照算法需要保证终止性和可行性。终止性由通道和非循环图的属性保证。通道的可靠性保证了只要任务存在,所有发送的屏障标记最终会被接收。此外,由于始终存在源路径,非循环图中的所有任务最终都会从所有输入接收到屏障标记并获取快照。

只要能够显示全局快照的操作员状态已经处理到最后阶段的记录历史,就足够说明其可实现性。这是由于通道的FIFO排序特性以及在阶段后进行快照之前处理的无快照记录(接续于障碍的记录)所保证的。

4.3 在循环图数据流中的应对方法

如果执行图中存在循环周期,前面提到的ABS算法不会进入死锁,因为循环周期内的任务会无限期地等待所有输入的屏障。此外,循环周期内正在传输的记录不会包含在快照中,因此不会影响可行性。因此,为了确保可行性,并在恢复时将这些记录以循环周期内的传输中状态进行恢复,需要将在一个循环周期内生成的记录包含在快照中。当处理循环图时,可以扩展基本算法,如下所示,而不会引起额外的阻塞。

首先,在静态分析中确定执行图中循环中的反向边 L。
根据控制流图理论,有向图的反向边是在深度优先搜索期间到达已访问过的任务的边。
执行图 G(T, E/L) 是包含循环图中所有任务的有向无环图。
从这个有向无环图的角度来看,算法的行为与之前相同,但在快照期间还会获取从反向边接收到的记录的下游备份。
这在每个任务 t 中作为流的消费者执行。要满足 Lt ⊆ It。
对于每个任务,在传输障碍标记转发时,创建备份日志,其中包含从 Lt 接收到的所有记录。
由于传输障碍标记获取了循环内的所有记录到下游日志,因此只会在一致的快照中包含一次该标记。

将这个循环适配版本的算法称为ABS算法2。

ABS算法2将按照下图所示的方式执行。

Figure03.JPG

具有后退边输入的任务,在从非自身后退边的通道接收到屏障标记时,会创建状态的本地副本:$b)$ 同时,从这一点开始,它们记录接收到的所有记录,来自它们的后退边。然后,在从后退边接收到屏障标记时,它们以至此接收到的记录作为快照进行保存:$c)$ 最终的全局快照$G*=(T*, E / L*)$将只包含所有任务状态$T*$和正在传输中的后退边的记录$L*⊂E*$。

证明:
我们证明了该版本算法的终止性和可实现性得到了保证。
所有任务最终会从所有输入(包括后退边通道)接收到屏障标记并完成快照,因此终止性得到了与4.2版本相同的保证。
当接收到所有正常输入的屏障标记时,立即通过广播发送该屏障标记,以避免死锁状态。
通道的FIFO特性对于后退边仍然有效,从以下特性来看,可实现性得到了证明。

    • スナップショットに含まれる各タスク状態は、通常の入力で受け取ったバリアマーカーからの事後イベントを処理する前に取られたそれぞれのタスクの状態コピーとなる。

 

    スナップショットに含まれるダウンストリームログは完全であり、FIFO保証のために後退エッジで受信されたバリアマーカー受信前に受信した、すべての転送中のポストショットレコードを含む。

5. 灾害恢复

关于障碍恢复的操作,我们将进行解释。
在具有一致性的快照中,有几种能够工作的障碍恢复方式。
最简单的形式是,可以从最后一个全局快照中恢复整个执行图如下:
对于所有任务t,执行以下操作。

    • スナップショットstの関連する状態を読込み、それを初期状態として設定

 

    • 後退エッジ用バックアップアップログを読込み、すべてのレコードを処理

 

    入力チャネルからのレコード取り込みを開始

通过重新调度上游的任务(将依赖于失败的输出通道的任务持有的任务)并将每个上游任务重新调度到源头,可以实现与TimeStream类似的图形部分恢复。以下图表显示了恢复计划的示例。

Figure04.JPG

为了提供仅一次的消息处理语义,为避免重新计算,所有下游任务都需要忽略重复记录。为了实现这一目标,我们可以采用类似于SDG的方式,标记具有特定序列号的来自源头的记录。因此,每个下游任务都可以丢弃已经处理过的记录中序列号较小的记录。

6. 实施

为了在流处理时提供Exactly once的处理语义,我们对Apache Flink进行了ABS算法的实现。
当前的实现不会将阻塞的通道中的所有接收记录都存储在内存中,而是将其存储在硬盘上,以提高可扩展性。
这种方法保证了鲁棒性,但会增加ABS算法在执行时的影响。

为了区别操作员的状态和数据,我们引入了明确的OperatorState接口,该接口包含了用于更新和获取操作员状态的方法。
此外,为了支持基于偏移量的源和聚合,在Apache Flink中提供了用于状态操作员的OperatorState实现。

快照协调器是以作业管理器的角色进程实现的,它负责维护单个作业的执行图的全局状态。
协调器会定期向执行图的所有源头插入屏障标记。
在重新启动或重新配置时,最后一个快照被恢复到运算符上,该快照是从分布式内存持久存储中恢复的。

7. 评价

评估的目的是比较在ABS和Naiad中采用的全局同步快照算法的运行时开销,并进一步评估算法在多个节点上的可扩展性。

7.1 评估环境

用于评估的执行拓扑如下图所示。

Figure05.JPG

这个执行由6个不同的操作符组成,每个操作符具有与集群节点数相等的并行性。
也就是说,$6 * 集群节点数 = 任务数$。
这个执行包含了3个完整的网络洗牌,并且验证了通道块在ABS中的影响。
源代码生成了总共10亿条记录,并在源实例之间均匀分布。
操作符的状态在拓扑中是按键聚合和源偏移进行的。
评估在Amazon EC2集群上使用最多40个m3.medium实例进行。

我们使用不同的快照方式,即ABS和同步快照,以不同的快照间隔测量运行中评估作业的运行时开销。
为了进行比较,我们实现了在Apache Flink上使用的与Naiad中使用的同步快照创建算法相同的算法。
这个实验是在使用10个节点的集群上运行的。
为了评估算法的可伸缩性,我们在处理一定数量的输入记录(10亿条)的同时,将拓扑的并行性从5个节点变化到40个节点。

7.2 评价结果

以下图表展示了两种算法的基准(无容错性)对运行时的影响。

Figure06.JPG

当使用的快照间隔较小时,对整体性能的影响尤为显著,这是因为为了获取全局快照,系统需要处理大量的数据而导致产生更多的空闲时间。ABS以连续执行的方式来持续工作,在不阻塞整体执行的同时保持稳定的吞吐量,所以对运行时的影响较小。当使用更大的快照间隔时,同步算法的影响就不那么重要了。然而,在突发情况下,例如在使用同步快照方式时,常常违反SLA,因为很多对于实时保证的应用程序,例如入侵检测流水线,延迟非常重要。在下图中,我们比较了执行ABS的拓扑结构和基准线,对于3秒的快照间隔(不考虑容错),可以明显看出基准线和ABS都实现了线性可扩展性。

Figure07.JPG

8. 未来展望

在未来的研究中,我们将考虑通过将快照状态和运行状态分离,进一步减少ABS的影响。
通过这样做,任务可以在保持快照的同时连续处理记录,从而实现纯粹的异步状态管理。
在这种方式中,需要使预准备和后续记录与各自状态同步。可以通过为记录标记与记录所属的快照相对应来解决这些记录的问题。
我们计划使用该方法来比较算法计算、空间和网络的I/O需求,与当前的ABS实现及其性能进行比较。
最后,我们计划调查各种恢复方法,以在保持“Exactly once”语义的同时,通过任务级别的操作将重建的需求最小化。

总结

我尝试阅读了上述内容。由于算法本身具有简单的结构,因此内容易于阅读。
关于使用这种方式进行状态保持,不仅在Flink中,其他流数据处理基础设施中也可以看到类似的情况,因此这对我来说是一个参考。

广告
将在 10 秒后关闭
bannerAds