使用Amazon MSK来触发Lambda函数,并使用SASL/SCRAM认证进行执行
亚马逊MSK(Managed Streaming for Apache Kafka)是什么?
Amazon MSK是由AWS提供的Apache Kafka的托管服务。
我们提供Apache Kafka集群的托管服务。只需指定broker数量和Kafka版本,即可进行构建。由于是托管服务,无需担心执行环境等,这是一个优点。通过选择VPC内的子网,可以进行部署,但至少需要两个子网才能构建,因此默认为多可用区(Multi-AZ)。
以下是对公式的解释。
亚马逊 MSK 是一种完全托管的服务,可轻松构建和运行使用 Apache Kafka 处理流式数据的应用程序。Apache Kafka 是一个开源平台,用于构建实时流式数据管道和应用程序。在亚马逊 MSK 中,可以使用原生的 Apache Kafka API,将数据输入到数据湖、流式传输数据变更与数据库之间,并增强机器学习和分析应用程序。
请参考以下有关Apache Kafka的内容。
关于Lambda和MSK的协作
可以选择使用Amazon MSK(以下简称MSK)作为Lambda执行的触发器。我们将尝试将EC2作为客户端向MSK发送消息,并使用SASL/SCRAM认证来执行Lambda。
此处前提是……
-
- クライアントEC2:javaインストール済み、kafkaダウンロード済み(MSKで指定したバージョンをダウンロードしてください。)
-
- MSK:アクセスコントロール方法→SASL/SCRAM認証,Version2.6.2,SGはEC2からの通信を許可
- Lambda:NatGateway経由でアクセス1
构成的本身是根据以下参考的。
在 SecretsManager 中创建用户名/密码并连接到 Cluster。
本次使用以下步骤完成客户端身份验证,采用SASL/SCRAM认证。因此,首先在SecretsManager中创建密钥。
使用SecretsManager创建用于客户端认证的用户名/密码。
{
"username": "alice",
"password": "alice-secret"
}
在创建密钥后,我们将通过SSH连接到EC2并进行身份验证。
首先,将密钥与集群关联起来。
$ aws kafka batch-associate-scram-secret \
--cluster-arn <Clusterのarn> \
--secret-arn-list <上記シークレットのarn>
下面,我们将创建一个新的主题。
请根据您的环境适当修改replication-factor等参数。
$ aws kafka describe-cluster \
--region ap-northeast-1 \
--cluster-arn <Clusterのarn>
> ...
"ZookeeperConnectString": "<ZookeeperConnectString>"
...
$ cd kafka_2.12-2.6.2/bin
$ ./kafka-topics.sh \
--create \
--zookeeper "<ZookeeperConnectString>"\
--replication-factor 2 \
--partitions 1 \
--topic TestTopic
> Created topic TestTopic.
请创建并导出配置文件(users_jaas.conf)。
请将用户名和密码替换为在SecretsManager中设置的值。
$ vim users_jaas.conf
$ export KAFKA_OPTS=-Djava.security.auth.login.config=users_jaas.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="alice"
password="alice-secret";
};
将JDK密钥库文件从JVM复制到/tmp目录中。
请将替换为您自己环境中的值。
$ ls /usr/lib/jvm/
> java-1.8.0-openjdk-...
...
$ cp /usr/lib/jvm/<JDKFolder>/jre/lib/security/cacerts /tmp/kafka.client.truststore.jks
创建属性文件
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/tmp/kafka.client.truststore.jks
最后获取BootstrapBrokerString并向MSK发送适当的消息。
$ aws kafka get-bootstrap-brokers \
--region ap-northeast-1 \
--cluster-arn <Clusterのarn>
>
{
"BootstrapBrokerStringSaslScram": "<BootstrapBrokerStringSaslScram>"
}
$ ./kafka-console-producer.sh \
--broker-list <BootstrapBrokerStringSaslScram> \
--topic TestTopic \
--producer.config client_sasl.properties
> ここにメッセージを入力できます。
将MSK设置为Lambda的触发器。
接下来,将MSK设置为Lambda的触发器。
使用AWS CLI进行设置。
$ aws lambda create-event-source-mapping \
--event-source-arn <Clusterのarn> \
--topics TestTopic \
--source-access-configurations Type=SASL_SCRAM_512_AUTH,URI=<シークレットのURI> \
--starting-position LATEST \
--function-name <LambdaのFunction名>
当设置完成后,您可以向制作人发送消息…
$ ./kafka-console-producer.sh \
--broker-list <BootstrapBrokerStringSaslScram> \
--topic TestTopic \
--producer.config client_sasl.properties
> SampleMessage
>
Lambda将会运行。
2021-09-09T17:38:30.028+09:00 START RequestId: XXX Version: $LATEST
2021-09-09T17:38:30.031+09:00 2021-09-09T08:38:30.031Z XXX INFO MSKから来たMessage: SampleMessage
2021-09-09T17:38:30.032+09:00 END RequestId: XXX
2021-09-09T17:38:30.032+09:00 REPORT RequestId: XXX Duration: 1.20 ms Billed Duration: 2 ms Memory Size: 128 MB Max Memory Used: 65 MB
如果Lambda没有执行,请再次检查与Lambda相关的安全组等配置。
(我的安全组配置有误,Lambda的触发器栏中出现了“问题: 连接错误,请检查事件源连接配置。”的消息。)