使用Python模块confluent-kafka-python来实现将消息发布到Azure事件中心的方法

本文将解释如何使用Python模块confluent-kafka-python来在Azure Event Hubs上实现发布/订阅功能。

大多数情况下,使用azure-eventhub模块可以在Python中使用Kafka协议从Azure Event Hubs进行发布/订阅操作,但是如果需要与除Azure Event Hubs之外的Kafka代理进行发布/订阅操作,则需要编写通用代码进行调查并创建。

安装模块

在事前准备阶段,需要进行模块的安装。

pip install confluent-kafka
pip install certifi

以Apache Kafka为目标的发布/订阅

首先,介绍一下将消息发布/订阅到Apache Kafka的方法。

以Apache Kafka为目标进行发布

请将以下常量替换为适合您自己的环境。

    • HOST

 

    TOPIC
from confluent_kafka import Producer

# Apache Kafka のパラメータ設定
HOST = '192.168.0.81:9093'
TOPIC = 'test'

# Apache Kafka の接続設定
conf = {
    'bootstrap.servers': HOST
    , 'client.id': 'python-example-producer'
}

# Apache Kafka へ Publish
kafka = Producer(conf)
message = 'hello'
try:
    kafka.produce(topic = TOPIC, value = message)
finally:
    kafka.flush()

订阅来自Apache Kafka的消息

请将下面的常数替换为各自的环境。

    • HOST

 

    TOPIC
from confluent_kafka import Consumer

# Apache Kafka のパラメータ設定
HOST = '192.168.0.81:9093'
TOPIC = 'test'
GROUP = 'foo'

# Apache Kafka の接続設定
conf = {
    'bootstrap.servers': HOST
    , 'client.id': 'python-example-consumer'
    , 'group.id': GROUP
    , 'auto.offset.reset': 'smallest'
}

# Apache Kafka から Subscribe
kafka = Consumer(conf)
try:
    kafka.subscribe([TOPIC])
    msg_count = 0
    while True:
        msg = kafka.poll(timeout = 1.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        print('Received message: {}'.format(msg.value().decode('utf-8')))
finally:
    kafka.close()

将消息发布/订阅到Azure事件中心

下一步是介绍如何将消息发布到Azure Event Hubs。

发布到 Azure 事件中心

请将下面的常数替换为您自己的环境。

    • HOST

 

    • TOPIC

 

    CONNECTION_STRING
from confluent_kafka import Producer
import certifi

# Azure Event Hubs のパラメータ設定
HOST = '[ネームスペース名].servicebus.windows.net:9093'
TOPIC = 'test'
CONNECTION_STRING = 'Endpoint=sb://[ネームスペース名].servicebus.windows.net/;SharedAccessKeyName=[共有アクセスポリシー名];SharedAccessKey=[共有アクセスポリシーの主キー]'


# Azure Event Hubs の接続設定
conf = {
    'bootstrap.servers': HOST
    , 'client.id': 'python-example-producer'
    , 'security.protocol': 'SASL_SSL'
    , 'sasl.mechanism': 'PLAIN'
    , 'sasl.username': '$ConnectionString'
    , 'sasl.password': CONNECTION_STRING
    , 'ssl.ca.location': certifi.where()
}

# Azure Event Hubs へ Publish
kafka = Producer(conf)
message = 'hello'
try:
    kafka.produce(topic = TOPIC, value = message)
finally:
    kafka.flush()

订阅来自Azure Event Hubs

请将以下常量替换为自己的环境。

    • HOST

 

    • TOPIC

 

    • CONNECTION_STRING

 

    GROUP
from confluent_kafka import Consumer
import certifi

# Azure Event Hubs のパラメータ設定
HOST = '[ネームスペース名].servicebus.windows.net:9093'
TOPIC = 'test'
CONNECTION_STRING = 'Endpoint=sb://[ネームスペース名].servicebus.windows.net/;SharedAccessKeyName=[共有アクセスポリシー名];SharedAccessKey=[共有アクセスポリシーの主キー]'
GROUP = 'foo'

# Azure Event Hubs の接続設定
conf = {
    'bootstrap.servers': HOST
    , 'client.id': 'python-example-consumer'
    , 'security.protocol': 'SASL_SSL'
    , 'sasl.mechanism': 'PLAIN'
    , 'sasl.username': '$ConnectionString'
    , 'sasl.password': CONNECTION_STRING
    , 'ssl.ca.location': certifi.where()
    , 'group.id': GROUP
    , 'auto.offset.reset': 'smallest'
}

# Azure Event Hubs から Subscribe
kafka = Consumer(conf)
try:
    kafka.subscribe([TOPIC])
    msg_count = 0
    while True:
        msg = kafka.poll(timeout = 1.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        print('Received message: {}'.format(msg.value().decode('utf-8')))
finally:
    kafka.close()

如何确认 Azure Event Hubs 的连接字符串

可以在Azure Portal中查看Azure Event Hubs的连接字符串。

    1. 点击共享访问策略。

 

    1. 点击共享访问策略名称(默认为RootManageSharedAccessKey)。

 

    连接字符串-主密钥将成为连接到Azure Event Hubs所需的连接字符串。
Qiita_azure_shared-acces-key.png
掲載的屏幕截图中的每个按键等已经无法使用,但出于易懂考虑,我们没有进行隐藏而是进行了发布。

文献引用

我参考了以下网站。

 

广告
将在 10 秒后关闭
bannerAds