Gearpump的实时处理基础架构是如何组成的?

你好。

由于我已经了解了Gearpump的概要,所以这次我打算看一下其组成概览。我参考的是Gearpump官网,按照以下步骤进行观看。

    • Gearpump Basic Concepts

 

    • Gearpump Technical Highlights

 

    • Reliable Message Delivery

 

    • Gearpump Performance Report

 

    Gearpump Internals

请参考以下链接:
http://www.gearpump.io/

Gearpump 基本概念

系统时间戳和应用程序时间戳

系统时间戳表示后端集群的时间。应用程序时间戳表示消息生成的时间。
例如,在物联网边缘设备中,设备生成消息的时间被称为应用程序时间戳。
后端在接收消息的时间分配的时间被称为系统时间戳。

师傅,和工人

Gearpump由主从架构构成。每个集群由一个或多个主节点和若干个工作节点组成。工作节点负责在某台机器上进行资源管理,而主节点负责整个集群的资源管理。

actor_hierarchy.png

应用程序

Application在集群上作为并行执行的单位。应用程序可以有不同的类型。例如,MapReduce应用程序和Streaming应用程序都是不同类型的应用程序。Gearpump主要支持Streaming应用程序,并提供了创建类似distributedShell这样的自定义类型应用程序的模板。

应用主管和执行器

在应用程序实例的运行状态中,可以通过一个AppMaster和一个Executor的列表来表示。AppMaster可以作为命令和应用程序实例的控制中心。AppMaster会与用户、Master节点、Worker节点和Executor进行通信以执行各种命令。每个Executor都是执行分布式应用程序的并行单位。通常情况下,AppMaster和Executor会作为在Worker节点上启动的JVM进程。

应用提交流程

当用户将应用程序投入到主节点时,主节点会查找可用的工作节点并启动应用主管。应用主管启动后,会向主节点发出请求,要求启动执行器以获得资源(即工作节点)。此时,执行器只是以空容器的状态启动。执行器启动后,应用主管会将实际的计算任务分配给执行器并进行并行执行。

在应用投入时,Gearpump客户端需要指定在DAG中定义的计算,并将其提交到活动的主节点上。SubmitApplication消息发送给主节点后,会被转发给AppManager。

submit.png

AppManager会查找可用的工作节点,并将AppMaster作为Worker节点上的JVM子进程启动。AppMaster与Master进行通信,以分配资源来执行应用程序中定义的DAG。分配的Worker节点将作为新的JVM进程启动执行器。

submit2.png

流处理拓扑、处理器和任务

在流媒体应用程序中,每个应用程序都包含一个拓扑(以DAG形式描述的数据流)。拓扑中的每个节点都是一个处理器。以WordCount为例,WordCount由两个处理器Split和Sum组成。Split处理器将每行分割成单词列表,Sum处理器将每个单词的出现频率汇总。

应用程序是由处理器的DAG组成的,每个处理器都处理消息。

dag.png

流式任务和分区器

在流媒体应用中,任务是并行执行的最小单位。在执行过程中,每个处理器都被并行化为任务列表,并且不同的任务会在不同的执行器上执行。同时,可以通过定义分区器来指定从上游处理器的任务发送消息到下游处理器的任务的分配规则。

shuffle.png

Gearpump技术亮点

到处都是演员 shì

Actor模型是由Carl Hewitt提出的并发模型。Actor模型以Actor为单位进行聚合,并表现得像被外部Actor隔离的微服务一样。Actor是Gearpump的基础,提供消息发送和接收、错误处理、存活检查等功能。Gearpump在各种功能中使用Actor,使集群内的所有实体都可以作为服务来使用。

actor_hierarchy.png

精确一次消息处理

“Exactly once”在中文中的定义如下:
当随消息而来的操作在输出目标或履历中仅处理一次,且不会对后续的处理产生任何影响。
更详细的信息将在后文中提及。

exact.png

拓扑图有向无环图领域专用语言 (Topology DAG DSL)

用户可以将计算DAG投入到Gearpump中,计算DAG包含节点和边的列表,节点以任务的集合形式并行执行。Gearpump会自动将各个任务组分配给各个机器。每个任务作为Actor启动,并充当长期运行的微服务。

dag.png

流量控制

Gearpump内置了流控制功能,确保上游任务不会向下游任务发送超出其承载能力的消息。

flowcontrol.png

没有固有的端到端延迟

Gearpump是一种基于消息的流式引擎,它表示在DAG中的所有任务在接收到消息时立即处理,并在处理完毕后立即将消息发送到下游。Gearpump不进行批处理,即使数据已经存储。

高性能消息传递

通过使用智能批处理策略,Gearpump变得非常高效,可以发送和接收小尺寸的消息。在进行了一个涉及4台机器的测试中,整个集群的吞吐量达到了每秒1800万条100字节的消息。

dashboard.png

高可用性,无单点故障。

Gearpump的设计考虑了高可用性。它针对消息丢失、Worker节点崩溃、应用程序崩溃、Master节点崩溃和脑裂等故障进行了考虑,并配备了在这些故障发生时进行恢复的机制。
– 如果发生消息丢失,它会重新执行
– 如果Worker节点或应用程序崩溃,它会在新机器上重新调度

为了实现Master的高可用性,我们使用了Akka集群和CRDTs(冲突自由数据类型)来进行状态同步,并使用多个Master节点。通过这种机制,即使发生故障,仍然能保持Quorum的存在,并保持Master的功能。当一个Master节点发生故障时,Akka集群中的其他Master节点会接管,并恢复状态。

ha.png

动态计算 DAG

Gearpump提供了一项功能,即使在运行时也能动态地添加、删除、更新DAG中的子图,而不需要重新启动整个Topology。

dynamic.png

能够处理乱序消息

类似滑动窗口的窗口操作在时间窗口内获取到的所有消息的准确结果中起到了重要作用。但是,在这种情况下,如何处理延迟到达的消息呢?Gearpump通过始终追踪所有消息的时间戳的最小值来解决这个问题,从而可以知道某个时间窗口内的处理消息是否全部完成。

clock.png

可定制化的平台

不同的应用程序需要不同的性能指标(例如:吞吐量、强一致性等),并且也需要不同的资源(例如:CPU资源、数据局部性)。Gearpump通过自定义性能指标和资源调度策略能够满足这些需求。

内置仪表板用户界面

Gearpump具有用于集群管理和可视化的仪表板UI。
由于仪表板UI与后端通过REST通信进行信息交换,因此可以创建新的不同仪表板并在其上使用。

dashboard.gif

Kafka和HDFS的数据连接器

Gearpump具备与Kafka和HDFS的数据连接器。
对于Kafka连接器,可以重新获取特定时间戳之后的消息。

可靠的信息传递

至少一次消息传递是什么意思?

消息可能会因为网络分裂等原因而丢失。
“至少传递一次”消息传递(至少传递一次)会对丢失的消息进行处理,以确保至少收到一次确认回复。

Gearpump保证可以对过去时刻的消息进行重新获取的DataSource可以提供至少一次的模型保证。在Gearpump中,每条消息都被标记了时间戳,并且会跟踪当前正在处理的消息中最小的时间戳(全局最小时钟)。如果消息丢失,应用程序将使用全局最小时钟的值进行重新启动。由于DataSource可以使用全局最小时钟重新获取消息,因此在重新启动之前正在处理的消息将全部被重新执行。在Gearpump中,将这种类型的DataSource称为TimeReplayableSource。作为现有的TimeReplayableSource,有KafkaSource。通过使用KafkaSource将消息导入Gearpump,可以实现至少一次的消息处理。

精确一次消息传递是什么意思?

至少一次的模型无法保证应用程序的执行结果的正确性。例如,如果存在记录接收消息数量的任务,当消息被重新执行时,会导致重复处理,如果任务发生故障,接收消息数量也会丢失。
在这种情况下,需要一个模型来确保消息仅通过一次更新状态,即仅通过一次交付(exactly once message delivery)。此外,还需要过滤重复处理的消息并将内存状态持久化。

只有在Gearpump满足以下两个条件时,才能使用Exactly Once模型:
1. 使用TimeReplayableSource进行数据获取。
2. 使用Persistent API在内存中管理状态值并进行保存。

通过使用Persistent API,用户的状态值将由Gearpump定期地与Checkpoint时间一起保存在HDFS等存储中。Gearpump会保存并持久化所有当前处理中的状态值的全局最小的Checkpoint时间戳。在应用程序重新启动时,Gearpump将从全局最小的Checkpoint时间戳恢复状态,并使用恢复的值重新执行消息。从而确保所有“状态值”只会被更新一次。

持久性API

持久化API由PersistentTask和PersistentState构成。

以下是一个使用这些的示例,用于保存输入消息数量。

class CountProcessor(taskContext: TaskContext, conf: UserConfig)
  extends PersistentTask[Long](taskContext, conf) {

  override def persistentState: PersistentState[Long] = {
    import com.twitter.algebird.Monoid.longMonoid
    new NonWindowState[Long](new AlgebirdMonoid(longMonoid), new ChillSerializer[Long])
  }

  override def processMessage(state: PersistentState[Long], message: Message): Unit = {
    state.update(message.timestamp, 1L)
  }
}

CountProcessor用于生成定制的PersistentState。PersistentState由PersistentTask进行管理,并继承processMessage方法,定义了通过新消息更新状态的行为。(在上述示例中,新消息的计数为1,并将其添加到现有值中。)

Gearpump提供了以下两种状态。

    1. NonWindowState – 与时间无关或以非时间为界限的状态

 

    WindowState – 被时间窗口划分的状态

这些可以表示为满足下面的幺半群规则的状态。

    1. 具有类似于+运算符的结合律

 

    具有类似于0的基准值

在这个例子中,使用了Twitter的Algebird库提供的方便的”longMonoid”。

Gearpump性能报告

绩效评估

使用Gearpump的性能,通过使用称为SOL(参考Gearpump的示例项目)的性能简单的微基准测试结构,主要关注吞吐量和延迟这两个方面进行展示。
SOLStreamProducer不断将消息发送给SOLStreamProcessor,而SOLStreamProcessor则是一个无操作的配置。
我们准备了一个由4个节点(10GbE网络)组成的集群,并进行了确认。
每个节点的硬件配置如下所示。

处理器:32核英特尔(R)至强(R)CPU E5-2690 2.90GHz
内存:64GB

吞吐量

在验证过程中,尝试确认吞吐量的上限,启动了48个SOLStreamProducer和48个SOLStreamProcessor,如下所示,整个集群达到了1800万条消息/秒。(每条消息为100字节)

延迟

当达到最大吞吐量状态时,2个任务之间的平均延迟为8毫秒。

故障恢复时间

如果出现故障,比如检测到Executor的崩溃,Gearpump会重新分配资源并重新启动应用程序。
应用程序的重新启动需要10秒钟的时间。

dashboard.png

如何设置基准环境?

准备环境

我们使用4节点(10GbE网络)的Gearpump集群,在每个节点上配置了4个Worker来构建。在测试环境中,每台机器的配置为64GB内存和Intel(R) Xeon(R) 32核心处理器E5-2690 2.90GHz。在Gearpump上启用了指标,然后进行测试。

使用以下命令将具有48个StreamProducers和48个StreamProcessors的SOL应用程序投入。

bin/gear app -jar ./examples/sol-$VERSION-assembly.jar -streamProducer 48 -streamProcessor 48

3)打开Gearpump的仪表板,并连接到http://【启动主机】:8090/。切换到应用程序选项卡,即可查看应用程序的详细信息。

Gearpump内部机制

演员的等级体系?

actor_hierarchy.png

所有图中的元素都是Actor。
Actor可以分为两个类别,集群角色(Cluster Actors)和应用程序角色(Application Actors)。

聚集演员

工作者: 将分配给物理工作者机器。工作者负责本地机器的资源管理和度量报告。

大师:位于Cluster的核心部分,负责管理Worker、资源和应用程序。主要功能委托给三个子Actor,分别是AppManager、WorkerManager和ResourceScheduler。

应用程序的演员们

AppMaster负责将任务分配给Worker,管理Application的状态。不同的Application拥有不同的AppMaster,彼此独立。

执行器:作为AppMaster的子节点,代表JVM进程。负责管理任务的生命周期和在任务发生故障时进行恢复。

工作:承担Executor的角色,执行实际处理。所有TaskActor都持有各自的全局唯一地址。某个Task Actor可以向其他Task Actor发送消息。通过该机制,能够获得对计算DAG进行分布式处理的高度灵活性。

DAG的所有Actor都与其Supervisor一起被组合在一起,Supervisor负责监控Actor并正确处理错误。在Master中,具有风险的作业被隔离并委托给子Actor,以确保系统的健壮性。在应用程序中,创建了额外的中间层”Executor”,该层能够提供细粒度处理和在任务故障时进行早期恢复。Master负责监视AppMaster和Worker的生命周期以应对故障,但这些生命周期由Supervisor进行管理,并非直接与Master关联。因此,即使Master本身发生故障,也不会对其他部分产生影响。多个Master Actor通过Akka集群进行组合,并且Master的状态通过CRDT使用八卦协议进行同步,因此不存在单点故障。通过这种分层设计,实现了高可用性。

申请时钟和全球时钟服务

全球时钟服务是用于跟踪系统中处理中消息的最小时间戳的。每个任务将通知全球时钟服务其自身的最小时钟。每个任务的最小时钟将使用下面中最小的一个。

    • Inbox中に存在する処理待ちメッセージの中で最小のTimestampを持つメッセージのTimestamp

 

    • 送信したメッセージの中で、Ackが返ってきていないメッセージの中の最小のTimestampを持つメッセージのTimestamp。もしメッセージの欠損が発生した場合、最小クロックは進めない。

 

    Taskが保持する「状態」の中での最小クロック。もし状態が複数の入力メッセージによって積算される場合、最小クロックの値は最後に積算されたメッセージのtimestampによって決まる。「状態」のクロックは状態をスナップショットとして保存するか、メッセージがTimeWindowの範囲からフェードアウトした場合に更新される。
clock.png

全球时钟服务可以高效地跟踪所有任务的最小时钟,并保持整体的最小时钟。全球最小时钟值只会单调递增,它表示所有早于数据源中的该值的消息已经处理完成。如果发生消息丢失或任务故障,全球最小时钟将停止。

我们如何优化消息传递的性能?

在流媒体应用程序中,消息的发送和接收性能是非常重要的因素。例如,考虑到某个流媒体平台每秒需要以毫秒级的延迟处理数百万条消息的情况。实现高吞吐量和低延迟并不容易。这展示了Gearpump面临的挑战过程。

第一个挑战:网络对于小信息不够高效

在流媒体处理中,典型消息的尺寸非常小,例如一条消息的大小可能不到100字节,就像车辆的GPS数据一样。然而,在发送小尺寸消息时,网络效率非常低。如下图所示,当消息大小为50字节时,只能利用网络带宽的20%。
如何改善吞吐量呢?

through_vs_message_size.png

第二个挑战:消息开销太大

如果在两个 Actor 之间发送和接收各种消息的话,这些消息中会包含发送者和接收者的路径信息。当通过网络进行传输时,这个 ActorPath 的开销就会很大。例如,下面的 ActorPath 超过了 200 字节。

akka.tcp://system1@192.168.1.53:51582/remote/akka.tcp/2120193a-e10b-474e-bccb-8ebc4b3a0247@192.168.1.53:48948/remote/akka.tcp/system2@192.168.1.54:43676/user/master/Worker1/app_0_executor_0/group_1_task_0#-768886794

我们如何解决这个问题?

我们使用Akka Extension在Gearpump中定制了Netty传输层的开发。如下图所示,Netty客户端将ActorPath翻译为TaskId,并在Netty服务器中进行还原。只有TaskId在网络中传输,大小约为10个字节,从而最小化了开销。不同的Netty客户端Actor彼此隔离,不会互相阻塞。

netty_transport.png

为了提高性能,高效的批处理是一个重要的关键因素。我们将多个消息进行批处理,并发送到网络上。批处理大小不固定,会根据网络状态动态调整。在网络可用的情况下,我们会立即发送等待发送的消息。否则,我们会将消息进行批处理,并在一定时间后触发发送该批处理。

我们如何进行流量控制?

在没有流量控制的情况下,某个任务在接收到大量来自其他任务的消息时会导致内存溢出错误。典型的流量控制方法是使用类似TCP的滑动窗口,发送方和接收方可以互相执行而无需阻塞。

flow_control.png

在Gearpump中,面临的困难是一个任务有多个输入任务和输出任务。为了传播下游到上游的背压,需要将输入和输出进行关联。此外,流量控制需要考虑故障情况,并需要机制来恢复消息丢失的情况。作为进一步的挑战,消息驱动的流量控制会导致开销增加。如果针对每个消息都返回Ack,系统中将大量飞行的Ack消息,从而降低流式处理的性能。最终采用的方法是使用明确的Ack请求消息。当目标任务接收到Ack请求消息时,才会返回Ack,发送方仅在需要的情况下发送Ack请求。通过这种方法,能够大大减少开销。

我们如何检测信息丢失?

举个例子,在网络广告中,需要计算总点击次数,而且不想出现误计算的情况。在流媒体平台中,需要有效地跟踪丢失的信息,并尽快恢复。

messageLoss.png

我们使用流量控制消息作为Ack请求和Ack,在检测消息丢失方面起到作用。目标任务会计算自上次接收到Ack请求后收到了多少个消息,并将此信息发送回发送者。通过确认该值,发送者可以检测消息是否丢失。

以上是一个简化的例子,实际情况更为复杂,需要处理僵尸任务和正在处理的旧消息。

Gearpump如何知道要重新播放哪些消息?

某些应用程序不允许消息丢失,并且需要重新执行的情况存在。例如,在转账处理中,银行会通过短信向用户发送验证代码。如果该消息丢失,则系统需要重新执行以继续完成转账。

我们采用了源端消息存储和基于时间戳的重播方法。

replay.png

所有的消息都是不可变的,并且会被标记上时间戳。假设这个时间戳大致上是递增的(允许有些微波动)。

在Gearpump中,可以从可重新获取消息的数据源(例如:Kafka)中获取消息,或者将消息保存在可自定义的数据源的消息存储中,从而保证消息的重新执行。当发送方任务将消息发送到下游时,时间戳和消息的偏移量将定期保存在偏移-时间戳存储中。在进行恢复时,首先从偏移-时间戳存储中获取时间戳和偏移量的值,然后使用这些值从消息存储中获取数据来重新执行消息。时间戳过滤器可以删除前后不连续的消息,防止不必要的重新执行,即使消息不严格按照时间顺序排列。

掌握高可用性

在分布式流媒体系统中,系统的每个部分都有可能发生故障。因此,系统需要对各种错误情况进行相应并进行恢复。

ha.png

为了确保Gearpump的Master的高可用性,它使用了Akka clustering。这个集群由几个Master组成(不包括Worker)。通过Clustering功能,可以轻松检测到Master的崩溃并进行相应的处理。Master的状态通过akka-data-replication复制到所有Master节点上,即使某个Master崩溃,其他Master也可以获取状态并接管原来的Master。Master的状态包含所有应用程序的输入信息。如果某个应用程序停止,Master节点可以使用输入信息来恢复应用程序。为了表示这个状态,使用了CRDT LWWMap(最后写入者获胜映射)。状态本身是一个能够无冲突地涵盖分布式节点信息的HashMap。为了确保数据的强一致性,Master的状态读写是在获取Master节点之间的法定人数后进行的。

我们如何应对失败?

由于Akka强大的Actor管理功能,我们能够轻松构建弹性系统。在Gearpump中,我们为每个应用程序保留一个AppMaster实例,它们彼此完全隔离。每个应用程序都有一个管理树,按照AppMaster->Executor->Task的顺序排列。通过这种管理层次结构,我们可以摆脱僵尸进程的恐惧。例如,停止AppMaster时,会先确认整个管理层次结构都已结束,然后再停止。

以下是可能发生的多种障碍情景的描述。

failures.png

当主席发生什么事情时会发生什么?

如果主节点崩溃,将通知备份主节点,并在恢复主节点状态后接管控制权。同时,还会通知工作节点和应用主节点,开始搜索新的活跃主节点,并持续执行该过程直到完成解析。如果应用主节点和工作节点无法检测到新的主节点并且超时,将停止自身。

当工人发生事故时会发生什么?

当执行程序崩溃时会发生什么?如果Worker发生崩溃,将通知Master,并停止为该Worker分配新的处理任务。崩溃的Worker上的执行器将被停止,AppMaster将采取与执行器崩溃时相同的处理措施。

当AppMaster崩溃时会发生什么?

如果AppMaster崩溃,Master将从其他地方获取资源来生成新的AppMaster并启动。启动后,AppMaster将进行应用程序的恢复处理。在流处理中,从磁盘加载最新的最小时钟和其他状态,并向Master发送资源请求以启动Executor,并使用获取的资源以最小时钟加载任务组来进行恢复。

当执行程序崩溃时会发生什么?

如果执行器崩溃,将通知上级AppMaster,并从Master接收用于启动新执行器的资源,然后重新分配崩溃执行器正在执行的任务。

任务崩溃时会发生什么?

如果任务抛出异常,将通知上一级执行器,并重新启动任务。

如果启用了“至少一次”模式,那么将启动由于消息丢失而导致的重新执行。首先,AppMaster从全局时钟服务(在全局时钟服务崩溃时从ClockStorage中获取)获取最新的最小时钟,并重新启动所有的TaskActor,同时附带新的任务状态。结果是,数据源和任务组将使用指定的最小时钟对消息进行重新处理。

AppMaster执行的重新启动可能会导致任务故障,但这样做的影响似乎很大。然而,由于其处理速度很快,尽管发生故障会产生较大影响,但采取迅速恢复的方法可能更为合适。

工作是如何进行的?

在某些应用中,“仅处理一次”消息处理模型非常重要。例如,在实时支付系统中,不可重复进行支付。 “仅处理一次”消息处理模型的目标是确保“不积累错误,今天的错误也不会积累到明天。”

当向应用程序开发者解释时,我们使用全局时钟来同步分布式事务。我们确保从数据源获取的所有消息都具有唯一的时间戳。这个时间戳要么作为消息正文的一部分保存,要么在将其投入流式处理系统时,根据系统时钟来赋予它。通过使用这个全局同步时钟,我们能够协调所有任务使用相同的时间戳。

checkpointing.png

获取状态检查点的流程:

    1. 协调员通知Streaming系统,在时间戳Tc处获取检查点。

 

    1. 每个任务在应用程序中保持”检查点状态”和”当前状态”这两个值。”检查点状态”仅保留上一次时间戳Tc的值,而”当前状态”则保留着所有信息。

 

    1. 如果”全局最小时钟”大于Tc,则表示所有具有早于Tc值的消息已被处理。由于”检查点状态”发生变化,因此在修改后将其保存到存储中。

 

    1. 如果发生消息丢失,则启动恢复过程。

 

    1. 为了恢复,从存储器中读取最新的”检查点状态”并设置为应用程序的状态。

 

    数据源从”检查点时间戳”重新获取消息并执行。

“检查点获取间隔由全局时钟服务动态决定。每个数据源记录获取的消息的最大时间戳。当接收到最小时钟更新消息时,数据源将与最大时间戳的差异返回给全局时钟服务。最大时间戳差异表示应用程序状态间隔的上限。检查点获取间隔将大于最大时间戳差异。”

checkpoint_equation.png
checkpointing_interval.png

全局时钟服务向每个任务发送检查点获取间隔通知后,每个任务在不等待整体同步的情况下计算下一个检查点的时间戳。

checkpoint_interval_equation.png

每个任务都保持”检查点状态”和”当前状态”,但是状态值会像下面的代码一样被更新。

TaskState(stateStore, initialTimeStamp):
  currentState = stateStore.load(initialTimeStamp)
  checkpointState = currentState.clone
  checkpointTimestamp = nextCheckpointTimeStamp(initialTimeStamp)

onMessage(msg):
  if (msg.timestamp < checkpointTimestamp):
    checkpointState.updateMessage(msg)
  currentState.updateMessage(msg)  
  maxClock = max(maxClock, msg.timeStamp)

onMinClock(minClock):
  if (minClock > checkpointTimestamp):
    stateStore.persist(checkpointState)
    checkpointTimeStamp = nextCheckpointTimeStamp(maxClock)
    checkpointState = currentState.clone

onNewCheckpointInterval(newStep):
  step = newStep  

nextCheckpointTimeStamp(timestamp):
  checkpointTimestamp = (1 + timestamp/step) * step

列表1:任务事务状态实现

从外观上看,由于消息的时间戳限制了消息的唯一性,而且通过实现Monoid可以将状态按照时间戳保持,Gearpump本身就具备了去除重复值的机制,从而满足Exactly once的要求。然而,Gearpump本身只是一个执行引擎,需要将结果输出到其他系统。一旦需要这样做,就不再满足Exactly once的要求了。如果在Interval期间Executor崩溃,Executor中已经发送到外部的任务的消息将无法追踪。然而,基于时间戳的唯一性特性,去除重复消息应该是一个较容易实现的架构。

什么是动态图,以及它的工作原理是什么? shì tú, tā de shì ?)

DAG 可以进行动态更新。可以动态地添加、删除和更新图元素。

dynamic.png

至少确保消息传递和Kafka成功运作一次。

您可以在以下位置找到Kafka源代码示例项目和教程:
– Kafka连接器示例项目
– 与Kafka源进行连接

在本文件中,我们将描述如何实现”至少一次”模型。

如果将WordCountExample表示为图表,如下所示。源代码树。 (Pinyin: WordCountExample , de . shù.)

卡夫卡的WordCount有向无环图是什么样子的?

kafka_wordcount.png
    • KafkaStreamProducer(or KafkaSource)がKafka queueからメッセージを取得

 

    • Splitが文章を単語に分割

 

    Sumが各単語の出現数を集計

如何从Kafka读取数据

使用KafkaSource进行连接。连接方法请参考下方地址。

需要注意的是,将StartTimestamp设置为KafkaSource,并在从Kafka队列中获取消息时,从保持此Timestamp值以上的最小Timestamp消息开始获取。

在任务崩溃或消息丢失的情况下会发生什么?

如果发生了消息丢失,AppMaster会停止Global Clock Service,并固定Global minimum timestamp。然后,重新启动KafkaSource任务时,KafkaSource会从AppMaster获取Global minimum timestamp,并用于开始获取消息。

KafkaSource 是如何使用哪种方法从起始时间戳读取消息的呢?正如我们所了解的,Kafka队列不会暴露时间戳信息。

Kafka队列只公开与每个Partition相关的偏移信息。KafkaSource会对Kafka偏移和应用程序时间戳进行映射和保存,因此可以根据时间戳指定Kafka偏移来获取消息,并且可以重新从Kafka获取消息。

应用程序时间戳和偏移的映射可以存储为分布式文件系统,或作为Kafka主题进行保留。

最后

我已经阅读了Gearpump的概述文档到这个地方,关于最让我关注的Exactly once问题,根据时间戳的分配方式,Gearpump内部似乎可以保持状态并确认这一点。然而,由于可能会出现重复处理的情况,需要在结果存储位置进行一些重复删除处理,这是目前的想法。不过,由于可以使用时间戳来唯一识别消息,在存储位置上也可以很容易地创建重复删除机制。

因为在这里已经揭示了Gearpump的概述,所以下一步我们将进入实际操作的步骤,看看它是什么样子。

广告
将在 10 秒后关闭
bannerAds