使用Amazon MSK来触发Lambda函数,并使用SASL/SCRAM认证进行执行

亚马逊MSK(Managed Streaming for Apache Kafka)是什么?

Amazon MSK是由AWS提供的Apache Kafka的托管服务。

我们提供Apache Kafka集群的托管服务。只需指定broker数量和Kafka版本,即可进行构建。由于是托管服务,无需担心执行环境等,这是一个优点。通过选择VPC内的子网,可以进行部署,但至少需要两个子网才能构建,因此默认为多可用区(Multi-AZ)。

以下是对公式的解释。

亚马逊 MSK 是一种完全托管的服务,可轻松构建和运行使用 Apache Kafka 处理流式数据的应用程序。Apache Kafka 是一个开源平台,用于构建实时流式数据管道和应用程序。在亚马逊 MSK 中,可以使用原生的 Apache Kafka API,将数据输入到数据湖、流式传输数据变更与数据库之间,并增强机器学习和分析应用程序。

请参考以下有关Apache Kafka的内容。

关于Lambda和MSK的协作

可以选择使用Amazon MSK(以下简称MSK)作为Lambda执行的触发器。我们将尝试将EC2作为客户端向MSK发送消息,并使用SASL/SCRAM认证来执行Lambda。

undefined

此处前提是……

    • クライアントEC2:javaインストール済み、kafkaダウンロード済み(MSKで指定したバージョンをダウンロードしてください。)

 

    • MSK:アクセスコントロール方法→SASL/SCRAM認証,Version2.6.2,SGはEC2からの通信を許可

 

    Lambda:NatGateway経由でアクセス1

构成的本身是根据以下参考的。

在 SecretsManager 中创建用户名/密码并连接到 Cluster。

本次使用以下步骤完成客户端身份验证,采用SASL/SCRAM认证。因此,首先在SecretsManager中创建密钥。

使用SecretsManager创建用于客户端认证的用户名/密码。

image.png
{
  "username": "alice",
  "password": "alice-secret"
}

在创建密钥后,我们将通过SSH连接到EC2并进行身份验证。
首先,将密钥与集群关联起来。

$ aws kafka batch-associate-scram-secret \
    --cluster-arn <Clusterのarn> \
    --secret-arn-list <上記シークレットのarn> 

下面,我们将创建一个新的主题。
请根据您的环境适当修改replication-factor等参数。

$ aws kafka describe-cluster \ 
    --region ap-northeast-1 \
    --cluster-arn <Clusterのarn>
> ...
 "ZookeeperConnectString": "<ZookeeperConnectString>"
  ...

$ cd kafka_2.12-2.6.2/bin

$ ./kafka-topics.sh \
    --create \
    --zookeeper  "<ZookeeperConnectString>"\
    --replication-factor 2 \
    --partitions 1 \
    --topic TestTopic
> Created topic TestTopic.

请创建并导出配置文件(users_jaas.conf)。
请将用户名和密码替换为在SecretsManager中设置的值。

$ vim users_jaas.conf

$ export KAFKA_OPTS=-Djava.security.auth.login.config=users_jaas.conf
KafkaClient {
   org.apache.kafka.common.security.scram.ScramLoginModule required
   username="alice"
   password="alice-secret";
};

将JDK密钥库文件从JVM复制到/tmp目录中。
请将替换为您自己环境中的值。

$ ls /usr/lib/jvm/
> java-1.8.0-openjdk-...
...
$ cp /usr/lib/jvm/<JDKFolder>/jre/lib/security/cacerts /tmp/kafka.client.truststore.jks

创建属性文件

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/tmp/kafka.client.truststore.jks

最后获取BootstrapBrokerString并向MSK发送适当的消息。

$ aws kafka get-bootstrap-brokers \
    --region ap-northeast-1 \
    --cluster-arn <Clusterのarn>
> 
{
    "BootstrapBrokerStringSaslScram": "<BootstrapBrokerStringSaslScram>"
}

$ ./kafka-console-producer.sh \
    --broker-list <BootstrapBrokerStringSaslScram> \
    --topic TestTopic \
    --producer.config client_sasl.properties
> ここにメッセージを入力できます。

将MSK设置为Lambda的触发器。

接下来,将MSK设置为Lambda的触发器。
使用AWS CLI进行设置。

$ aws lambda create-event-source-mapping \
    --event-source-arn  <Clusterのarn> \
    --topics TestTopic \
    --source-access-configurations Type=SASL_SCRAM_512_AUTH,URI=<シークレットのURI> \
    --starting-position LATEST \
    --function-name <LambdaのFunction名>

当设置完成后,您可以向制作人发送消息…

$ ./kafka-console-producer.sh \
    --broker-list <BootstrapBrokerStringSaslScram> \
    --topic TestTopic \
    --producer.config client_sasl.properties
> SampleMessage
>

Lambda将会运行。

2021-09-09T17:38:30.028+09:00   START RequestId: XXX Version: $LATEST
2021-09-09T17:38:30.031+09:00   2021-09-09T08:38:30.031Z XXX INFO MSKから来たMessage: SampleMessage
2021-09-09T17:38:30.032+09:00   END RequestId: XXX
2021-09-09T17:38:30.032+09:00   REPORT RequestId: XXX Duration: 1.20 ms Billed Duration: 2 ms Memory Size: 128 MB Max Memory Used: 65 MB

如果Lambda没有执行,请再次检查与Lambda相关的安全组等配置。
(我的安全组配置有误,Lambda的触发器栏中出现了“问题: 连接错误,请检查事件源连接配置。”的消息。)

PrivateLink也可以使用。更多详情请查看这里。
广告
将在 10 秒后关闭
bannerAds