使用kafka-python将消费者的偏移量复制到其他位置
在实现Kafka Consumer时,通常会使用像Java或Scala这样的语言,但如果不是太复杂的话,我想使用轻量级的Python来实现。
这次我想使用适用于Python的Apache Kafka库kafka-python来编写一段维护程序,将一个Consumer的偏移量转移到另一个Consumer上。
获取消费者的偏移量
本次使用KafkaAdminClient。虽然可以从KafkaConsumer中获取,但是由于订阅消费者和轮询操作都比较繁琐,所以推荐使用这个客户端。
from kafka import KafkaAdminClient
# consumer01はtopic01を購読中という想定
target_consumer_name = "consumer01"
# KafkaAdminClientを取得
cluster_admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
# オフセットを取得
offsets = cluster_admin.list_consumer_group_offsets(target_consumer_name)
返回结果可能如下所示,用TopicPartition类作为偏移量的键来表示。
{TopicPartition(topic='topic01', partition=0): OffsetAndMetadata(offset=14475, metadata='aa6c00e6-ffbf-41a3-b011-6997549f6166a'),
TopicPartition(topic='topic01', partition=1): OffsetAndMetadata(offset=14494, metadata='8fcc736c-1cb0-41b5-b111-6d55d67b3096a')}
将偏移值写入其他消费者
为了修改consumer,使用KafkaConsumer。
将之前获取的偏移量信息写入consumer02。
如果consumer不存在,则会自动创建。
from kafka import KafkaConsumer
consumer_group_name = 'consumer02'
consumer = KafkaConsumer(
group_id=consumer_group_name,
bootstrap_servers=bootstrap_servers,
enable_auto_commit=False)
# offsetの情報を書き込み(consumerが存在しない場合作成される)
consumer.commit(offsets)
用consumer02写入了与consumer01相同主题的偏移信息。
如果使用consumer02开始订阅,应该从与consumer01获取偏移相同的地方开始订阅。
购买订阅的示例 de
consumer_group_name='consumer02'
# consumerを取得
consumer = KafkaConsumer(
group_id=consumer_group_name,
bootstrap_servers=bootstrap_servers)
# topic01を購読する設定に変更
consumer.subscribe(topics=['topic01'])
# topic01 consumer02に実際に参加
consumer.poll()
# 受信できたメッセージをprintし続ける
for msg in consumer:
print(msg)
请参考。
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html
以下是相关网址:
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html