【重:Invent 2022】尝试使用EventBridge Pipes进行DDB→Lambda操作

首先

这篇文章是2022年Mirogos圣诞日历的第二天文章。

由于参与方式是在线参与,因此我只能观看Keynote演讲和领导人会议。由于与Adcarle负责日期和Werner Vogels的主题演讲日重叠,我选择了其中一些我感兴趣的内容进行了了解。

在主题演讲的开头,从延迟和吞吐量的讨论过渡到异步讨论,事件驱动架构成为前半部分的主要话题。在事件驱动中,主要角色是EventBridge和Step Functions周围的新服务的发布,个人而言,有很多非常有趣的更新。

image.png
    • Amazon EventBridge Pipes

What’s New: Amazon EventBridge Pipes is now generally available

AWS News Blog: New — Create Point-to-Point Integrations Between Event Producers and Consumers with Amazon EventBridge Pipes

Step Functions Distributed Map

What’s New: AWS Step Functions launches large-scale parallel workflows for data processing and serverless applications

AWS News Blog: Step Functions Distributed Map – A Serverless Solution for Large-Scale Parallel Data Processing

現時点ではプレビューなサービス

Amazon CodeCatalyst

What’s New: Announcing Amazon CodeCatalyst (Preview)

AWS News Blog: Announcing Amazon CodeCatalyst (preview), a Unified Software Development Service

AWS Application Composer。

What’s New: Introducing AWS Application Composer (Preview)

AWS News Blog: Visualize and create your serverless workloads with AWS Application Composer

AWSアーキテクチャをGUIで設計でき、SAMを使ってデプロイができるようです。

在这里,我打算继续写有关EventBridge Pipes 的内容。

事件桥管道

EventBridge是一个服务之间的桥梁,它通过将从生产者服务传递的事件按照规则中定义的事件模式进行过滤,然后传递给事件消费者。

EventBridge Pipes也进行生产者和消费者之间的桥梁,但是通过添加过滤和增强功能,似乎可以将要传递给消费者的事件进行转换。

根据2022年12月2日在AWS控制台上确认的信息,每个组件似乎都支持以下服务。

    • イベントソース

Kinesis
SQS
DynamoDB
MQ
MSK
self-managed Kafka

Enrichment

Lambda
Step Functions
API Gateway
API Destinations

ターゲットサービス

API Gateway
API Destinations
Lambda
Cloudwatch Logs
ECS
EventBridge
Kinesis Data Firehose
Inspector
Kinesis Data Streams
Redshift
SageMaker Pipeline
SNS
SQS
Step Functions
Batch

尝试制作Pipes

我会在AWS控制台上试着操作一下。

根据DynamoDB中的事件流,使用Filtering将NewImage属性限定为具有id和eventName不是Delete的事件。然后,指定相同的Lambda函数用于Enrichment和目标源,以便可以分别查看每个日志。

image.png

被称为Lambda的是以下的程序。
我们会检查Enrichment接收的event变量,以及通过Enrichment传递给目标源Lambda的值,并通过event变量进行确认。

import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    logger.info(event)
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda through ignore DELETE !')
    }

事件源

在事件源中指定DynamoDB。
请注意,DynamoDB必须是DynamoDB Stream。
(请谨慎选择,因为在选择后无法看到表名。)

image.png

这次,我们只将String类型的id作为DynamoDB的Partition key设置。

过滤

过滤将成为一个选项。

在控制台上,您可以将EventBridge Pipes作为整体的UI来查看事件源的示例。
因此,您可以根据您预期的事件源示例来设置过滤器。

image.png

因为设有如下Filtering设置,所以除非DELETE以外的情况,否则在删除Item时后面的Lambda函数不会被执行。

{
  "dynamodb": {
    "NewImage": {
      "id": {
        "S": [{
          "exists": true
        }]
      }
    }
  },
  "eventName": [{
    "anything-but": "DELETE"
  }]
}

另外,过滤的示例如下所示。

    Amazon EventBridge イベントパターンでのコンテンツのフィルタリング

丰富化

就像过滤器一样,这也是一个可选功能。

我正在接收受到筛选限制的事件,并将该事件的JSON记录到日志中,然后返回一个固定的字符串。

image.png

目标服务

让我们来检查一下从Enrichment传递过来的事件。

image.png

尝试将其放入管道中流动

    我来试着在DynamoDB中添加一个Item。

我尝试将id命名为First_hoge。

image.png

2. 查看Lambda的执行日志。

image.png

由于Enrichment和目标服务在同一个Lambda中,所以CloudWatch Logs的日志也会出现在同一个日志流中。
蓝框表示Enrichment的日志,红框表示目标服务的日志。

Enrichment的日志以以下形式呈现。(以JSON格式进行格式化。)
与DynamoDB Stream相似。

[
  {
    "eventID": "e6575fc2c02eb96495d6c3d04a941d8f",
    "eventName": "INSERT",
    "eventVersion": "1.1",
    "eventSource": "aws:dynamodb",
    "awsRegion": "ap-northeast-1",
    "dynamodb": {
      "ApproximateCreationDateTime": 1669946402,
      "Keys": { "id": { "S": "First_hoge" } },
      "NewImage": { "id": { "S": "First_hoge" } },
      "SequenceNumber": "13700000000003743678089",
      "SizeBytes": 24,
      "StreamViewType": "NEW_AND_OLD_IMAGES"
    },
    "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:0000000000000:table/Pipe-test-table/stream/2022-12-02T01:16:40.686"
  }
]

目标服务的日志似乎直接接收Enrichment处理后返回的JSON。

[{'statusCode': 200, 'body': '"Hello from Lambda through ignore DELETE !"'}]

总结

我尝试使用 EventBridge Pipes 这些在 re:Invent 上发布的新服务。在我目前运行的产品中,我将事件从 DynamoDB Stream 流式传输到 Lambda,并在 Lambda 中进行条件分支的实现。但是使用 Pipes 的话,代码量可以大大减少。

广告
将在 10 秒后关闭
bannerAds