我曾经在AWS上构建了一个流式数据收集系统的故事

这篇文章是2022年アイスタイル冒险日历的第17天的文章。
初次见面,我是shirakih,现在是在アイスタイル数据分析系统部门担任三年级的新入职数据工程师。
平时,我负责开发和维护SaaS型营销支持服务【品牌官网】的数据基础设施。

这次我们要讲一下@grassy在第九天的文章中所写的关于数据基础设施云迁移相关的流式数据收集系统迁移到AWS的故事。为了构建一个没有数据丢失的配置,我将介绍实际进行的一些工作。如果你对品牌官方的详细信息或数据基础设施的云迁移感兴趣,请查看@grassy的以下文章中详细说明。

 

有关转移前的配置

image.png

■处理的步骤

    1. Go语言构建的消费者从RabbitMQ接收消息,

 

    1. 将消息推送到Kafka中,

 

    1. 在Kafka Streams处理中通过API补充消息缺少的信息,

 

    使用HDFS Sink Connector将消息同步到HDFS中。

接收消息-通过API补充缺失的信息-保存的过程。
正如前述,我们将品牌官方的数据基础设施从本地迁移到云端(GCP)。因此,数据收集源需要从HDFS更改为BigQuery,相应地,该系统也需要迁移。

关于AWS迁移后的配置

我们选择AWS而不是GCP作为系统迁移目标的原因是,我们计划将许多内部系统迁移到AWS(只有数据基础设施迁移到GCP),因此我们预计与本次相关的RabbitMQ和内部API也将迁移到AWS。因此,为了最小化云之间(AWS⇔GCP)的数据传输,我们选择了AWS,因为在GCP上建立系统会导致需要在云之间进行来回传输。至于编程语言,由于品牌官方的数据基础设施云迁移选择了Python,所以我们也选择了Python作为本系统的编程语言。

image.png

■处理流程

    RabbitMQ的消息由使用ECS和Fargate构建的消费者接收,然后将消息发送到Kinesis数据流中转给Lambda。Lambda内部的处理使用API补充消息中缺失的信息。然后,Kinesis数据输送将消息传送给S3,最后使用BigQuery数据传输服务将其传送到BigQuery。

消费者在ECS·Fargate上进行构建,并使用Kinesis+Lambda来构建之前由Kafka扮演的角色,并最终将其转移到S3中。向BigQuery的传输是通过Data Transfer Service每天传输的(重新尝试机制将在后文详述)。

由於本系統最終目標是向品牌提供資料服務,因此資料品質的保證是一個重要的課題。缺失的資料是不可接受的,重複的資料將在轉送至BigQuery的過程中被排除,我們設計並建構此系統以滿足至少一次的要求(允許重複資料但不允許缺失資料)。

为了确保数据完整性,构建一个不缺失任何数据的配置。

以下是可能发生数据丢失或重复的可能地点。

image.png

我将介绍针对每个问题所采取的实际措施,共有四点。

在RabbitMQ与Fargate之间的通信中,利用ack响应。

以下是在本次与RabbitMQ进行通信应对策中,我们采用了RabbitMQ与Fargate之间通信时利用ack応答的措施。
由于RabbitMQ与Fargate之间受到各种因素的影响,可能会出现消息丢失或重复的情况。
因此,在RabbitMQ的Consumer实现中,我们使用了不在没有收到ack応答前就不删除消息的机制。
在本次的实现中,当成功将消息转发到Kinesis Data Streams时,会将ack応答发送回RabbitMQ。
以下是一个示例,Consumer的实现使用了pika,与AWS的通信使用了boto3。

# boto3を用いてData Streamsへ転送を行い、HTTPStatusCodeを返す
status = datastreams_handler(json.loads(body.decode()), self._queue_name)
if status == 200:
    # Kinesis Data Streamsへの転送が成功すれば、Rabbit MQにack応答を返す 
    basic_ack(basic_deliver.delivery_tag)
else:
    logger.error('Exception Data Streams: %s Error', status)
    # コンシューミングの停止、再接続
    # 省略

如果传输成功,则可以通过basic_ack将ack返回给Rabbit MQ。如果传输失败,则停止消费并进行重试。
通过这种实现,我们能够消除Rabbit MQ和Fargate之间的数据丢失。

建立重试机制

image.png

■处理过程

(Chinese:

    1. API通信的回应出现错误

 

    1. 通过SQS将目标消息传输到临时存储S3

 

    1. 每小时启动重试用Lambda,再次进行API通信

成功时:传输至Kinesis Data Firehose
失败时:将目标消息传输到临时存储S3(最多3次)

通过这种方式,即使在发生错误时无法与API通信,也可以最多重试3次,从而在API通信中尽可能减少丢失。

引入Kinesis Data Streams、Kinesis Data Firehose和SQS。

作为防止AWS服务之间通信的措施,我们引入了Kinesis Data Streams、Kinesis Data Firehose和SQS作为第三个选择。
我们通过使用确保至少一次性的Kinesis Data Streams、Kinesis Data Firehose和SQS,而不是直接从Fargate传输到Lambda,再从Lambda传输到S3等方式,可以确保数据不会丢失,并且可以方便地进行重试,从而构建一个易于操作的架构。

当Lambda发生错误时的处理

以下是关于Lambda错误处理的第四个问题。
由于Lambda的处理时间为15分钟,因此需要考虑到处理超时的情况。如果不考虑处理超时,可能会导致中途数据的丢失。
因此,这次我们决定在处理开始后的14分钟,也就是超时的前1分钟,将消息转发到重试机制。通过这种方式,可以预先避免处理的中断,从而避免数据丢失的发生。

另外,我们还将以下内容包含在Lambda的响应中,作为其他异常错误的处理。

return {
        'batchItemFailures': [
            {
                'itemIdentifier': record['kinesis']['sequenceNumber']
            }
        ]
    }

Lambda在响应中判断批处理的成功或失败。通过在batchItemFailures中包含记录的序列号,可以判断批处理是否失败,并根据序列号重新尝试Lambda。这样,即使批处理由于异常而失败,也可以构建一个没有缺失的配置。

总结

希望这次关于在AWS上构建流式采集系统的介绍,重点是如何防止数据丢失。
如果在构建类似的系统时能有所帮助,那就太好了!
祝大家圣诞快乐,新年快乐!

广告
将在 10 秒后关闭
bannerAds