Apache Kafka的术语和命令
我正在学习Apache Kafka,我会总结下来。
环境
动物园管理员3.4.14(confluentinc / cp-zookeeper:5.3.2)
Kafka 2.3.1(confluentinc / cp-kafka:5.3.2)
我正在使用Docker映像。
对于 Kafka 的术语理解
在这里,我会简略地总结每个术语。
我本想制作一张图来总结,但是伊藤雅博的文章非常易懂,详细信息请参考这里。
Apache Kafka的概述与架构
操作主题
使用kafka-topics命令来操作主题。
共通选项
撰写主题
kafka-topics –create是用于创建主题的命令。
创建话题示例
如果要以test-topic作为主题名称进行保存,分区数为3,复制数为3.
$ kafka-topics --create --zookeeper zookeeper:2181 --topic test-topic --partitions 3 --replication-factor 3
Created topic test-topic.
确认主题
使用kafka-topics –describe命令来查看话题。
请提供一个根据主题内容确认的样本。
$ kafka-topics --describe --zookeeper zookeeper:2181 --topic test-topic
Topic:test-topic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test-topic Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: test-topic Partition: 1 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: test-topic Partition: 2 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
这些输出结果分别如下。
经纪人系统故障的情况
假设Kafka集群中有3个Broker,在其中一个Broker宕机的情况下,我会提供该状态的示例。
kafka-topics --describe --zookeeper zookeeper:2181 --topic test-topic
Topic:test-topic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test-topic Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,1
Topic: test-topic Partition: 1 Leader: 1 Replicas: 1,3,2 Isr: 1,3
Topic: test-topic Partition: 2 Leader: 1 Replicas: 2,1,3 Isr: 1,3
最早,拥有Partition 1的Leader Broker使用了Broker 2,而现在已经切换回Broker 1。
此外,Isr状态仅列出了Broker 1和3,可以看出Broker 2并不属于Isr。
列出主题的列表
使用kafka-topics –list命令来检查主题。
确认主题内容的示例
$ kafka-topics --list --zookeeper zookeeper:2181
__confluent.support.metrics
__consumer_offsets
test-topic
test-topic1
删除主题
使用 Kafka-topics –delete 命令来删除主题。
删除话题示例
$ kafka-topics --delete --zookeeper zookeeper:2181 --topic test-topic
Topic test-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
$ kafka-topics --describe --zookeeper zookeeper:2181 --topic test-topic
Error while executing topic command : Topic 'test-topic' does not exist as expected
[2020-02-09 13:03:11,678] ERROR java.lang.IllegalArgumentException: Topic 'test-topic' does not exist as expected
at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:437)
at kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:349)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)
主题的设定更改
在中国本地语言中,仅需要一个选项来改写如下内容:
使用kafka-topics –alter命令来更改主题设置。
主题更改的样本
$ kafka-topics --alter --zookeeper zookeeper:2181 --topic test-topic --partitions 6
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
$ kafka-topics --describe --zookeeper zookeeper:2181 --topic test-topic
Topic:test-topic PartitionCount:6 ReplicationFactor:3 Configs:
Topic: test-topic Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: test-topic Partition: 1 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: test-topic Partition: 2 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: test-topic Partition: 3 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: test-topic Partition: 4 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: test-topic Partition: 5 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
顺便提一下,在警报选项中无法指定副本因子。
此外,并不能减少分区的数量。
制片人的操作
可以使用kafka-console-producer作为Producer进行操作。
发送讯息
kafka-console-producer --topic=test-topic --broker-list=kafka1:29092,kafka2:29093,kafka3:29094
>test1
>test2
>
当你执行kafka-console-producer,会显示输入提示符>,你可以输入要发送的字符并执行,即可发送。
在broker列表中,以逗号分隔指定存在于Kafka集群中的broker。本例中,有三台broker,因此指定了三个。
消费者操作
在Kafka控制台消费者中可以作为消费者进行操作。
收到消息
$ kafka-console-consumer --bootstrap-server=kafka1:29092 --topic=test-topic
test1
test2
各种选择
消费者组操作
您可以使用kafka-consumer-groups来操作ConsumerGroup。
展示更为详细的信息
$ kafka-consumer-groups --describe --bootstrap-server kafka1:29092 --group consumer-group
Consumer group 'consumer-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
consumer-group test-topic 4 12 12 0 - - -
consumer-group test-topic 5 10 10 0 - - -
consumer-group test-topic 2 7 7 0 - - -
consumer-group test-topic 3 9 9 0 - - -
consumer-group test-topic 1 10 10 0 - - -
consumer-group test-topic 0 20 20 0 - - -
重置ConsumerGroup的offset
利用reset-offsets重置。
kafka-consumer-groups --bootstrap-server kafka3:29094 --group consumer-group --reset-offsets --to-earliest --all-topics --execute
GROUP TOPIC PARTITION NEW-OFFSET
consumer-group test-topic 4 0
consumer-group test-topic 5 0
consumer-group test-topic 2 0
consumer-group test-topic 3 0
consumer-group test-topic 1 0
consumer-group test-topic 0 0
请参考
请查阅参考文献。
Apache Kafka文档
Apache Kafka的概述和架构
使用docker-compose
这次使用的是docker-compose.yml文件。
version: "3"
services:
zookeeper:
container_name: kafkajs-typescript-test-zookeeper
hostname: zookeeper
image: confluentinc/cp-zookeeper:5.3.2
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka1:
container_name: kafkajs-typescript-test-kafka1
hostname: kafka1
image: confluentinc/cp-kafka:5.3.2
depends_on:
- zookeeper
ports:
- "29092:29092"
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
kafka2:
container_name: kafkajs-typescript-test-kafka2
hostname: kafka2
image: confluentinc/cp-kafka:5.3.2
depends_on:
- zookeeper
ports:
- "29093:29093"
- "9093:9093"
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29093,PLAINTEXT_HOST://localhost:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
kafka3:
container_name: kafkajs-typescript-test-kafka3
hostname: kafka3
image: confluentinc/cp-kafka:5.3.2
depends_on:
- zookeeper
ports:
- "29094:29094"
- "9094:9094"
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29094,PLAINTEXT_HOST://localhost:9094
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
参考信息,在kafka集群少于3个的情况下,将会输出类似以下的错误提示。
[2020-03-21 07:43:52,872] ERROR [KafkaApi-1] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
在这种情况下,需要设置KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR为1(数字取决于集群数)。