有关Apache Kafka消息发送和接收(使用REST-PROXY)的内容

概述

以下是有关在Apache Kafka(Confluent Platform环境)中创建简单主题并发送接收消息的方法。这篇文章是为初学者编写的。

前提条件所处环境

我之前在Docker中创建了一个简单的Confluent(Kafka)开发环境,并在Confluent开发环境中进行了说明。
所有的文件和命令都是在Docker主机上的bash中进行操作。
关于Confluent,我也在《关于Confluent Platform的吸引力》一文中进行了解释。

解释构成和术语。

    メッセージとは

这是关于在Kafka中交换的数据记录。
消息的内容没有特定限制,不仅限于文本,还可以包含图像文件等二进制数据,并且内容没有限制。
每个消息的大小默认上限为1MB,但可以进行扩展,因此可以放入大小较大的视频等内容。
通过使用JSON或AVRO等格式,可以将多个项目的信息(如地址+姓名)放入单个消息中。

※在Kafka中,提供了键(key)和值(value)这两个字段,可以自由地存储二进制数据。
为了简化问题,本次描述中没有区分键/值。

    ブローカーとは

这是关于消息传递服务器的事情。可以将接收到的消息进行临时存储,还可以提取已存储的消息。
* 在生产环境中,通常会使用三台或更多服务器组成集群来使用。而在本次环境中,我们只使用了一台。

    プロヂューサーとは

消息传递程序是指将消息发送给经纪人的程序。

    コンシューマーとは

这指的是从代理商那里收到消息的程序。

    トピックとは
スライド1.PNG

※ 在同一个程序中,可以同时具备消费者和生产者的功能,可以接收消息 => 处理 => 发送另一条消息的结果。在这种情况下,需要区分接收主题和发送主题。

    REST-PROXYについて

这是由Confluent公司开发的PROXY服务器,在使用JSON格式的REST-API时,可以进行生产者、消费者和主题配置。由于REST-API可以在几乎所有的编程语言中使用,因此它是一个无需选择特定编程语言即可使用的服务器。

如何使用REST-PROXY

利用下列三种HTTP方法来使用API:
– GET方法:获取信息
– POST方法:发送信息,并获取发送信息的结果
– DELETE方法:删除内容

一起使用方法和HTTP的Accept标题进行访问。

Accept: application/vnd.kafka.v2+json

如果不加上”Accept”可能无法正确工作。

这本书通过在Shell上使用以下两个命令来进行操作。为了使POST方法的发送内容更加清晰,采用将内容写入文件的方法。
* curl命令:用于访问REST API
* jq命令:将Json结果呈现清晰明了。

发送消息方式(制作人)

这次我们将向“demo.topic1”这个主题发送任意字符串。

我将创建用于发送REST请求的内容。

{
  "records": [
          {
                  "value": "初めてのKafka!!"
          }
  ]
}

将要发送的内容以JSON格式进行说明。在标有”records”的部分,用数组记录消息。这次只有一条消息,所以数组只有一个元素。在该元素中,将文本字符串放入”value”中。

※ value的内容可以以json格式进行描述,但这次我们只是简单地将它放在了易于理解的普通字符串中。

通过curl命令发送已创建的内容。

$ curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
      -H "Accept: application/vnd.kafka.v2+json" \
      -d @demo.topic1.msg0.json "http://localhost:8082/topics/demo.topic1" | jq

如果以json格式发送,请使用以下内容的ContentType。如果不正确则会报错。
目标地址为REST-PROXY服务器的8082端口,使用HTTP进行发送。路径中的topics是发送消息作为生产者的固定字符串,其后的’demo.topic1’是主题名称。如果主题不存在,则会自动创建主题(默认设置)。

如果您正确发送消息,将收到以下响应。

{
  "offsets": [
    {
      "partition": 0,
      "offset": 0,
      "error_code": null,
      "error": null
    }
  ],
  "key_schema_id": null,
  "value_schema_id": null
}

分区:这次一定是0。关于分区功能,我们计划在另一个机会写文章,所以这次省略。
偏移量:保存在Kafka代理中的消息编号。从0开始自动递增为1、2、3…
错误码:如果有错误,返回错误码编号;如果没有错误,则返回null。
错误消息:如果有错误,则返回错误消息;如果没有错误,则返回null。
键模式ID:如果使用Avro功能,将显示ID编号。由于本次没有使用,所以为null。
值模式ID:如果使用Avro功能,将显示ID编号。由于本次没有使用,所以为null。※ 关于Avro,我们计划在另一个机会写文章。

消费者(接收端程序)

从消费者中获取发送的消息。
消费者比较复杂,首先要创建一个能够获取消息的组和实例。然后,获取与该实例对应的消息。
※ 关于组的概念将在后文中提及。
※ 实例与分区有关,将在另一个机会中撰写文章讨论。请记住,本次先创建任意实例再接收消息。

将组和实例进行初始化

我将创建一个帖子内容的文件。

{
  "name": "instance0", 
  "format": "json", 
  "auto.offset.reset": "earliest"
}

名称:以字符串形式指定实例名称。
格式:选择输出的格式。在本例中,选择了将数据保存为json格式相同的json格式。还可以选择avro二进制格式。
auto.offset.reset:指定在创建消费者组时要读取的消息位置。如果选择了earliest,则从最开始的消息开始读取。如果选择了latest,则将显示来自最新接收的消息的主题内消息。

将创建的JSON内容通过REST PROXY进行POST,并初始化实例。

$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data @demo.topic1.consumer_init.json \
      http://localhost:8082/consumers/cgroup1 | jq

在使用消费者时,路径的”consumers”是一个固定的字符串。
“cgroup1″是一个消费者组的名称,可以使用任意的字符串。

当成功时,会显示如下结果。

{
  "instance_id": "instance0",
  "base_uri": "http://rest-proxy:8082/consumers/cgroup1/instances/instance0"
}

实例ID:显示实例的名称
基本URI:显示用于访问实例的URI。
(注意)主机名是rest-proxy。如果从Docker主机使用,请将其替换为localhost并使用。

主题的选择 de

我会创建一个帖子内容的文件。

{
  "topics": ["topic1"]
}

要以数组的形式列出想要接收消息的主题名称列表。

将生成的JSON内容通过REST PROXY进行POST请求,以初始化实例。

$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data @demo.topic1.consumer.settopic.json \
 http://localhost:8082/consumers/cgroup1/instances/instance0/subscription -i

由于内容未能输出,现在添加了一个选项-i来显示HTTP响应头以进行确认。
路径“instances”是在执行与实例相关的操作时使用的固定字符串。
instance0是实例名称,可以任意命名。
subscription在关联主题时使用。可以进行添加(POST)、查看(GET)和删除(DELETE)操作。

确认回应

HTTP/1.1 204 No Content
Date: Wed, 16 Oct 2019 06:52:07 GMT
Server: Jetty(9.4.18.v20190429)

当命令成功执行后将返回204响应。
※ 可通过GET http://localhost:8082/consumers/cgroup1/instances/instance0/subscription查看已选择的主题列表。

(Note: The provided URL might require modification based on the actual server address.)

读取消息

通过GET方法获取消息。

curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
       http://localhost:8082/consumers/cgroup1/instances/instance0/records | jq

当读取记录时,路径中的”records”是一个固定的字符串。

确认回应 。

[
  {
    "topic": "topic1",
    "key": null,
    "value": "初めてのKafka!!",
    "partition": 0,
    "offset": 0
  }
]

主题:消息主题名称
关键字:键的内容(此次未使用,因此为空)
值:消息值的内容
分区:分区号
偏移量:偏移量编号

如果在这次的流程中只有一条消息, 那只会显示一条消息, 但如果存在多条消息, 就会显示相应数量的消息。

删除实例

请使用DELETE方法删除已经使用完毕的实例。
无论是在程序结束时还是其他时机,请务必删除实例。如果不进行删除,可能会导致无法正确获取消息或无法正常保存偏移信息而反复读取相同记录等问题。
※ 在本次环境中,如果在实例上没有超过5分钟的API访问,则会自动删除。

curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
      http://localhost:8082/consumers/cgroup1/instances/instance0 -i

消费者群体的角色(消费者的重新介绍)

消费者组是管理保存在相同主题下的消息序列的组。通过分组,可以从各个组中读取相同主题上的相同消息。如果组相同,则变为互斥,并且不再读取相同的消息。

スライド2.PNG

消费者组是独立管理消息偏移量信息的,并且根据每个组来管理已提取的消息偏移量(存储在zookeeper中)。通过这种方式,可以通过分组来管理每个程序读取到的位置。

スライド3.PNG

※ 在REST-PROXY中,我们会在从recodes中提取消息后,再次对实例进行API访问以更新偏移量。如果API访问超时或失败,recodes将被视为失败,并从同一偏移量重新发送消息。

(附註)消費者群體的確認方法(命令)

展示消费者列表

# docker-compose exec broker kafka-consumer-groups --bootstrap-server broker:9092 --list

展示当前的偏移信息

# docker-compose exec broker kafka-consumer-groups --bootstrap-server broker:9092 --describe --group cgroup1

删除消费者组

# docker-compose exec broker kafka-consumer-groups --bootstrap-server broker:9092 --delete --group cgroup1

请参考以下资料。

Confluent REST代理API参考文档

广告
将在 10 秒后关闭
bannerAds