使用Python模块confluent-kafka-python来实现将消息发布到Azure事件中心的方法
本文将解释如何使用Python模块confluent-kafka-python来在Azure Event Hubs上实现发布/订阅功能。
大多数情况下,使用azure-eventhub模块可以在Python中使用Kafka协议从Azure Event Hubs进行发布/订阅操作,但是如果需要与除Azure Event Hubs之外的Kafka代理进行发布/订阅操作,则需要编写通用代码进行调查并创建。
安装模块
在事前准备阶段,需要进行模块的安装。
pip install confluent-kafka
pip install certifi
以Apache Kafka为目标的发布/订阅
首先,介绍一下将消息发布/订阅到Apache Kafka的方法。
以Apache Kafka为目标进行发布
请将以下常量替换为适合您自己的环境。
-
- HOST
- TOPIC
from confluent_kafka import Producer
# Apache Kafka のパラメータ設定
HOST = '192.168.0.81:9093'
TOPIC = 'test'
# Apache Kafka の接続設定
conf = {
'bootstrap.servers': HOST
, 'client.id': 'python-example-producer'
}
# Apache Kafka へ Publish
kafka = Producer(conf)
message = 'hello'
try:
kafka.produce(topic = TOPIC, value = message)
finally:
kafka.flush()
订阅来自Apache Kafka的消息
请将下面的常数替换为各自的环境。
-
- HOST
- TOPIC
from confluent_kafka import Consumer
# Apache Kafka のパラメータ設定
HOST = '192.168.0.81:9093'
TOPIC = 'test'
GROUP = 'foo'
# Apache Kafka の接続設定
conf = {
'bootstrap.servers': HOST
, 'client.id': 'python-example-consumer'
, 'group.id': GROUP
, 'auto.offset.reset': 'smallest'
}
# Apache Kafka から Subscribe
kafka = Consumer(conf)
try:
kafka.subscribe([TOPIC])
msg_count = 0
while True:
msg = kafka.poll(timeout = 1.0)
if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
finally:
kafka.close()
将消息发布/订阅到Azure事件中心
下一步是介绍如何将消息发布到Azure Event Hubs。
发布到 Azure 事件中心
请将下面的常数替换为您自己的环境。
-
- HOST
-
- TOPIC
- CONNECTION_STRING
from confluent_kafka import Producer
import certifi
# Azure Event Hubs のパラメータ設定
HOST = '[ネームスペース名].servicebus.windows.net:9093'
TOPIC = 'test'
CONNECTION_STRING = 'Endpoint=sb://[ネームスペース名].servicebus.windows.net/;SharedAccessKeyName=[共有アクセスポリシー名];SharedAccessKey=[共有アクセスポリシーの主キー]'
# Azure Event Hubs の接続設定
conf = {
'bootstrap.servers': HOST
, 'client.id': 'python-example-producer'
, 'security.protocol': 'SASL_SSL'
, 'sasl.mechanism': 'PLAIN'
, 'sasl.username': '$ConnectionString'
, 'sasl.password': CONNECTION_STRING
, 'ssl.ca.location': certifi.where()
}
# Azure Event Hubs へ Publish
kafka = Producer(conf)
message = 'hello'
try:
kafka.produce(topic = TOPIC, value = message)
finally:
kafka.flush()
订阅来自Azure Event Hubs
请将以下常量替换为自己的环境。
-
- HOST
-
- TOPIC
-
- CONNECTION_STRING
- GROUP
from confluent_kafka import Consumer
import certifi
# Azure Event Hubs のパラメータ設定
HOST = '[ネームスペース名].servicebus.windows.net:9093'
TOPIC = 'test'
CONNECTION_STRING = 'Endpoint=sb://[ネームスペース名].servicebus.windows.net/;SharedAccessKeyName=[共有アクセスポリシー名];SharedAccessKey=[共有アクセスポリシーの主キー]'
GROUP = 'foo'
# Azure Event Hubs の接続設定
conf = {
'bootstrap.servers': HOST
, 'client.id': 'python-example-consumer'
, 'security.protocol': 'SASL_SSL'
, 'sasl.mechanism': 'PLAIN'
, 'sasl.username': '$ConnectionString'
, 'sasl.password': CONNECTION_STRING
, 'ssl.ca.location': certifi.where()
, 'group.id': GROUP
, 'auto.offset.reset': 'smallest'
}
# Azure Event Hubs から Subscribe
kafka = Consumer(conf)
try:
kafka.subscribe([TOPIC])
msg_count = 0
while True:
msg = kafka.poll(timeout = 1.0)
if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
finally:
kafka.close()
如何确认 Azure Event Hubs 的连接字符串
可以在Azure Portal中查看Azure Event Hubs的连接字符串。
-
- 点击共享访问策略。
-
- 点击共享访问策略名称(默认为RootManageSharedAccessKey)。
- 连接字符串-主密钥将成为连接到Azure Event Hubs所需的连接字符串。
掲載的屏幕截图中的每个按键等已经无法使用,但出于易懂考虑,我们没有进行隐藏而是进行了发布。
文献引用
我参考了以下网站。