Apache Kafka的术语和命令

我正在学习Apache Kafka,我会总结下来。

环境

动物园管理员3.4.14(confluentinc / cp-zookeeper:5.3.2)
Kafka 2.3.1(confluentinc / cp-kafka:5.3.2)

我正在使用Docker映像。

对于 Kafka 的术语理解

在这里,我会简略地总结每个术语。

我本想制作一张图来总结,但是伊藤雅博的文章非常易懂,详细信息请参考这里。
Apache Kafka的概述与架构

名前役割Kafka ClusterKafkaが実行されているサーバ(Broker)をグループ化したものBrokerKafkaの単一サーバZookeeperKafkaを管理するサーバProducerKafkaへメッセージを送信するアプリケーションConsumerKafkaからメッセージを取得するアプリケーションTopicメッセージを整理するためのカテゴリーPartitionTopic内のメッセージはパーティションという単位で分散させていますReplica各Partitionは複数のBrokerに複製(Replica)されていますLeader複製されているReplicaのうち唯一読み書きが許可されているReplicaConsumer Group複数のConsumerを同一グループとして扱うためのもの。グループ化することで分散したConsumer間で同一メッセージを重複せずに読み込むことが可能ですOffsetPartition単位でメッセージをどこまで読んだか管理するためのもの

操作主题

使用kafka-topics命令来操作主题。

共通选项

オプション名役割zookeeperZookeeperを指定しますtopic操作対象のトピックを指定しますpartitionsメッセージを分割するPartition数を指定しますreplication-factorPartitionをいくつのBrokerに複製するか指定します

撰写主题

kafka-topics –create是用于创建主题的命令。

创建话题示例

如果要以test-topic作为主题名称进行保存,分区数为3,复制数为3.

$ kafka-topics --create --zookeeper zookeeper:2181 --topic test-topic --partitions 3 --replication-factor 3
Created topic test-topic.

确认主题

使用kafka-topics –describe命令来查看话题。

请提供一个根据主题内容确认的样本。

$  kafka-topics --describe --zookeeper zookeeper:2181 --topic test-topic
Topic:test-topic    PartitionCount:3    ReplicationFactor:3 Configs:
    Topic: test-topic   Partition: 0    Leader: 3   Replicas: 3,2,1 Isr: 3,2,1
    Topic: test-topic   Partition: 1    Leader: 1   Replicas: 1,3,2 Isr: 1,3,2
    Topic: test-topic   Partition: 2    Leader: 2   Replicas: 2,1,3 Isr: 2,1,3

这些输出结果分别如下。

オプション名役割PartitionCount対象のトピックがいくつのPartitionに分割されるかが記載されますReplicationFactor対象のトピックがいくつReplicaされるかが記載されています。ここにはLeaderも含まれますPartitionPartition番号が記載されますLeaderどこのBrokerに保存されているかが記載されますReplicasどこのBrokerに保存されているかが記載されます。今回はReplicationFactorが3つ指定されているため、3つのBrokerが記載されています。IsrIn Sync Replica(Isr)の略で、対象Partitionが全て同期されている状態のBrokerが記載されます

经纪人系统故障的情况

假设Kafka集群中有3个Broker,在其中一个Broker宕机的情况下,我会提供该状态的示例。

kafka-topics --describe --zookeeper zookeeper:2181 --topic test-topic
Topic:test-topic    PartitionCount:3    ReplicationFactor:3 Configs:
    Topic: test-topic   Partition: 0    Leader: 3   Replicas: 3,2,1 Isr: 3,1
    Topic: test-topic   Partition: 1    Leader: 1   Replicas: 1,3,2 Isr: 1,3
    Topic: test-topic   Partition: 2    Leader: 1   Replicas: 2,1,3 Isr: 1,3

最早,拥有Partition 1的Leader Broker使用了Broker 2,而现在已经切换回Broker 1。
此外,Isr状态仅列出了Broker 1和3,可以看出Broker 2并不属于Isr。

列出主题的列表

使用kafka-topics –list命令来检查主题。

确认主题内容的示例

$ kafka-topics --list --zookeeper zookeeper:2181
__confluent.support.metrics
__consumer_offsets
test-topic
test-topic1

删除主题

使用 Kafka-topics –delete 命令来删除主题。

删除话题示例

$ kafka-topics --delete --zookeeper zookeeper:2181 --topic test-topic
Topic test-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.


$ kafka-topics --describe --zookeeper zookeeper:2181 --topic test-topic
Error while executing topic command : Topic 'test-topic' does not exist as expected
[2020-02-09 13:03:11,678] ERROR java.lang.IllegalArgumentException: Topic 'test-topic' does not exist as expected
    at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:437)
    at kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:349)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)

主题的设定更改

在中国本地语言中,仅需要一个选项来改写如下内容:
使用kafka-topics –alter命令来更改主题设置。

主题更改的样本

$ kafka-topics --alter --zookeeper zookeeper:2181 --topic test-topic --partitions 6 
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!


$ kafka-topics --describe --zookeeper zookeeper:2181 --topic test-topic
Topic:test-topic    PartitionCount:6    ReplicationFactor:3 Configs:
    Topic: test-topic   Partition: 0    Leader: 1   Replicas: 1,3,2 Isr: 1,3,2
    Topic: test-topic   Partition: 1    Leader: 2   Replicas: 2,1,3 Isr: 2,1,3
    Topic: test-topic   Partition: 2    Leader: 3   Replicas: 3,2,1 Isr: 3,2,1
    Topic: test-topic   Partition: 3    Leader: 1   Replicas: 1,3,2 Isr: 1,3,2
    Topic: test-topic   Partition: 4    Leader: 2   Replicas: 2,1,3 Isr: 2,1,3
    Topic: test-topic   Partition: 5    Leader: 3   Replicas: 3,2,1 Isr: 3,2,1

顺便提一下,在警报选项中无法指定副本因子。
此外,并不能减少分区的数量。

制片人的操作

可以使用kafka-console-producer作为Producer进行操作。

发送讯息

kafka-console-producer --topic=test-topic --broker-list=kafka1:29092,kafka2:29093,kafka3:29094
>test1
>test2
>

当你执行kafka-console-producer,会显示输入提示符>,你可以输入要发送的字符并执行,即可发送。

在broker列表中,以逗号分隔指定存在于Kafka集群中的broker。本例中,有三台broker,因此指定了三个。

消费者操作

在Kafka控制台消费者中可以作为消费者进行操作。

收到消息

$ kafka-console-consumer --bootstrap-server=kafka1:29092 --topic=test-topic
test1
test2

各种选择

オプション名役割使い方サンプルfrom-beginningoffsetの最初から読み込みますkafka-console-consumer –bootstrap-server=kafka1:29092 –topic=test-topic –from-beginninggroupConsumer-groupを指定しますkafka-console-consumer –bootstrap-server=kafka1:29092 –topic=test-topic –group consumer-group

消费者组操作

您可以使用kafka-consumer-groups来操作ConsumerGroup。

展示更为详细的信息

$ kafka-consumer-groups --describe --bootstrap-server kafka1:29092 --group consumer-group
Consumer group 'consumer-group' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
consumer-group  test-topic      4          12              12              0               -               -               -
consumer-group  test-topic      5          10              10              0               -               -               -
consumer-group  test-topic      2          7               7               0               -               -               -
consumer-group  test-topic      3          9               9               0               -               -               -
consumer-group  test-topic      1          10              10              0               -               -               -
consumer-group  test-topic      0          20              20              0               -               -               -

重置ConsumerGroup的offset

利用reset-offsets重置。

kafka-consumer-groups --bootstrap-server kafka3:29094 --group consumer-group --reset-offsets --to-earliest --all-topics --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
consumer-group                 test-topic                     4          0              
consumer-group                 test-topic                     5          0              
consumer-group                 test-topic                     2          0              
consumer-group                 test-topic                     3          0              
consumer-group                 test-topic                     1          0              
consumer-group                 test-topic                     0          0  

请参考

请查阅参考文献。

Apache Kafka文档
Apache Kafka的概述和架构

使用docker-compose

这次使用的是docker-compose.yml文件。

version: "3"

services:
  zookeeper:
    container_name: kafkajs-typescript-test-zookeeper
    hostname: zookeeper
    image: confluentinc/cp-zookeeper:5.3.2
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka1:
    container_name: kafkajs-typescript-test-kafka1
    hostname: kafka1
    image: confluentinc/cp-kafka:5.3.2
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  kafka2:
    container_name: kafkajs-typescript-test-kafka2
    hostname: kafka2
    image: confluentinc/cp-kafka:5.3.2
    depends_on:
      - zookeeper
    ports:
      - "29093:29093"
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29093,PLAINTEXT_HOST://localhost:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  kafka3:
    container_name: kafkajs-typescript-test-kafka3
    hostname: kafka3
    image: confluentinc/cp-kafka:5.3.2
    depends_on:
      - zookeeper
    ports:
      - "29094:29094"
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29094,PLAINTEXT_HOST://localhost:9094
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

参考信息,在kafka集群少于3个的情况下,将会输出类似以下的错误提示。

[2020-03-21 07:43:52,872] ERROR [KafkaApi-1] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)

在这种情况下,需要设置KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR为1(数字取决于集群数)。

广告
将在 10 秒后关闭
bannerAds