使用kafka-python将消费者的偏移量复制到其他位置

在实现Kafka Consumer时,通常会使用像Java或Scala这样的语言,但如果不是太复杂的话,我想使用轻量级的Python来实现。
这次我想使用适用于Python的Apache Kafka库kafka-python来编写一段维护程序,将一个Consumer的偏移量转移到另一个Consumer上。

获取消费者的偏移量

本次使用KafkaAdminClient。虽然可以从KafkaConsumer中获取,但是由于订阅消费者和轮询操作都比较繁琐,所以推荐使用这个客户端。

from kafka import KafkaAdminClient

# consumer01はtopic01を購読中という想定
target_consumer_name = "consumer01"

# KafkaAdminClientを取得
cluster_admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

# オフセットを取得
offsets = cluster_admin.list_consumer_group_offsets(target_consumer_name)

返回结果可能如下所示,用TopicPartition类作为偏移量的键来表示。

{TopicPartition(topic='topic01', partition=0): OffsetAndMetadata(offset=14475, metadata='aa6c00e6-ffbf-41a3-b011-6997549f6166a'),
 TopicPartition(topic='topic01', partition=1): OffsetAndMetadata(offset=14494, metadata='8fcc736c-1cb0-41b5-b111-6d55d67b3096a')}

将偏移值写入其他消费者

为了修改consumer,使用KafkaConsumer。
将之前获取的偏移量信息写入consumer02。
如果consumer不存在,则会自动创建。

from kafka import KafkaConsumer

consumer_group_name = 'consumer02'

consumer = KafkaConsumer(
        group_id=consumer_group_name,
        bootstrap_servers=bootstrap_servers,
        enable_auto_commit=False)

# offsetの情報を書き込み(consumerが存在しない場合作成される)
consumer.commit(offsets)

用consumer02写入了与consumer01相同主题的偏移信息。
如果使用consumer02开始订阅,应该从与consumer01获取偏移相同的地方开始订阅。

购买订阅的示例 de

consumer_group_name='consumer02'

# consumerを取得
consumer = KafkaConsumer(
        group_id=consumer_group_name,
        bootstrap_servers=bootstrap_servers)

# topic01を購読する設定に変更
consumer.subscribe(topics=['topic01'])

# topic01 consumer02に実際に参加
consumer.poll()

# 受信できたメッセージをprintし続ける
for msg in consumer:
    print(msg)

请参考。

https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html

以下是相关网址:
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html

广告
将在 10 秒后关闭
bannerAds