请提供Apache Kafka的Producer/Broker/Consumer的工作原理和配置列表
最初版本于2018年10月5日发布。
作者为伊藤雅博,所属公司为株式会社日立制作所。
首先
在这篇帖子中,我们将介绍我们在2017年的开源大会上发表的演讲内容:“目标!成为Kafka大师~如何在Apache Kafka中实现最佳性能~”。我们将分享我们在验证过程中进行的调查内容(总共8个部分)。帖子的内容基于2017年6月发布的Kafka 0.11.0。
这次是第二次,我们将介绍Kafka的组件Broker、Producer和Consumer的处理流程和参数设置。
投稿一覧:
1. Apache Kafka的概述和架构
2. Apache Kafka生产者/代理/消费者的工作原理和配置列表(本投稿)
3. Apache Kafka的推荐架构和性能估算方法
4. Apache Kafka性能验证(1):验证环境和参数调优内容
5. Apache Kafka性能验证(2):生产者调优结果
6. Apache Kafka性能验证(3):代理调优结果
7. Apache Kafka性能验证(4):生产者再调优和消费者调优结果
8. Apache Kafka性能验证(5):系统整体延迟情况
制片人 (zhì
Producer是一个用于向主题中写入消息的客户端库。用户应用程序通过使用Producer将消息写入到由Broker集群配置的主题的每个分区中。
制作人的机制 de
生产者会定期从任一代理商中获取元数据,例如每个代理商的主机名、连接端口以及每个主题分区的领导副本位置等。生产者根据这些信息决定将哪些消息发送到哪个代理商。
以下是Producer在获取元数据之后实际发送消息时的处理流程。
请参见以下对于制片人每个步骤的详细说明。
添加要发送的记录
Kafka的消息是以键值对的形式存在,并且被称为Record。用户应用程序通过Producer的Send API添加想要发送的Record。因为Producer的Send API是线程安全的,所以推荐将一个Producer实例共享给多个用户线程(参考)。这样可以将来自多个用户应用程序的数据,包括来自多个主题的数据一起发送给Broker,从而减少处理的开销。
将数据分类并进行缓冲,存入记录批处理。
登録的记录会根据Partition单位分配到对应的Record Batch中进行缓存。由于记录是以键值对形式存在的,所以也可以将与特定键对应的记录收集到特定的Partition(的Record Batch)中。这样一来,例如具有特定用户ID的记录可以由负责该Partition的一个消费者一起处理。
另外,将请求的并发发送数(如后文所述的max.in.flight.requests.per.connection)设置为1,可以确保在Partition内保证Record的顺序。这样可以保持特定用户ID的Record的时间顺序。如果需要压缩Record,则会按Record Batch单位进行压缩。至此,整个处理过程将在调用Send API的用户应用程序的线程中执行。
3. 发送请求
记录批次会由生产者的网络线程按照经纪人单位一起发送(称为生产请求)。通过将多个主题的记录批次在一次请求中一起发送,可以减少通信开销。请求将在以下某个时机发送。
-
- リクエストの全Record Batchが上限サイズ(後述の batch.size)に達したとき
-
- リクエストが上限サイズ(後述の max.request.size)に達したとき
- ネットワークスレッドのデータ蓄積待機時間(後述の linger.ms)が経過したとき
将记录批次存储到分区中
经纪人会将收到的包含在Produce请求中的每个记录批次存储到相应的分区中。记录批次在经纪人中不会被解压,而且在经纪人之间的复制和发送到消费者也是以记录批次为单位进行的。因此,Kafka通过以记录批次为单位处理大量记录,减少处理开销并提高吞吐量。
5. 收到请求完成通知
经纪人处理完Produce请求后,会向生产者发送请求完成通知。这个通知会在以下任何一个时机进行,这取决于生产者在请求的acks参数中指定的值。通过这个方式,生产者可以选择响应等待时间和数据传输的可靠性以及数据持久性之间的权衡。
例えば最小ISR数が2の場合、Leader Replica と1個以上のFollower Replicaに複製されたことを確認した時点でレスポンスを返す
制作人的主要参数。
以下将展示Producer的主要参数。详细信息请参考官方文档。
bootstrap.servers
デフォルト値:なし
説明:Brokerの接続先リスト。いずれかのBrokerにアクセスして初期接続を確立する。
send.buffer.bytes
デフォルト値:128 KB
説明:TCP送信ソケットバッファのサイズ。-1に設定するとOSのデフォルト値が使用される。
receive.buffer.bytes
デフォルト値:32 KB
説明:TCP受信ソケットバッファのサイズ。-1に設定するとOSのデフォルト値が使用される。
batch.size
デフォルト値:16 KB
説明:1 Record Batchの最大サイズ。Record Batchがこのサイズに達するとリクエストが送信される。
max.request.size
デフォルト値:1 MB
説明:1 リクエストの最大サイズ。リクエストがこのサイズに達するとリクエストが送信される。
linger.ms
デフォルト値:0 ms
説明:ネットワークスレッドが送信前にデータ蓄積を待つ時間。Record Batchサイズやリクエストサイズが上限に達していなくても、この時間が経過したら送信される。ネットワークスレッドはユーザスレッドから独立したシングルスレッドであり、各Brokerに対して順番にリクエストを送信する。ネットワークスレッドがあるRecord Batch群をリクエストとして送信している間にも、ユーザスレッドは別のRecord BatchにRecordを追加し続ける。そのためRecord Batchがデータ蓄積を待つ時間は、 linger.ms + ネットワークスレッドが別のリクエスト群を送信処理している時間、となる。よってこのパラメータを0msに設定しても、追加したRecord(のRecord Batch)が即時送信されるわけではないことに注意。
buffer.memory
デフォルト値:32 MB
説明:ProducerがRecord Batchのバッファリングに使用できるメモリ量。この値を増やすとバッファリングできるRecord数が増え、送信待ちRecord数が増加した際にも対応できるようになる。Producerはこの他にデータ圧縮や送信処理に別途メモリを使用する。
acks
デフォルト値:1
説明:Producerからの送信リクエストにBrokerが書き込み完了通知(ack)を返すタイミング。
設定値:0 (即時)、1(Leader Replicaへの書き込み完了時)、all(Topicの最小ISR数まで複製完了時)
retries
デフォルト値:0
説明:リクエストの送信失敗時にリトライする回数。acks=1またはallの場合のみ有効。
max.in.flight.requests.per.connection
デフォルト値:5
説明:各Brokerとのコネクション内で並列送信できるリクエスト(送信済みだがackが返ってきていないリクエスト)の上限数。2以上に設定した場合、送信失敗時のリトライによってメッセージの順序が変わる可能性がある(次のリクエストが成功した後に、前のリクエストを再試行する可能性がある)。そのため、送信処理したRecordの順序を保証したい場合は1に設定すること。
compression.type
デフォルト値:none (圧縮なし)
説明:ProducerはRecord Batch単位でデータを圧縮する。Record Batchサイズが大きいほど圧縮のスループットは高くなる。
設定値:none, gzip, snappy, z4
max.block.ms
デフォルト値:60秒
説明:ProducerへのRecord登録(send()メソッド呼出)時にバッファリングするメモリが足りない場合や、Brokerからのメタデータ取得(partitionsFor()メソッド呼出)における、処理の最大ブロック時間。この時間ブロックされても処理が完了しなかった場合は例外がスローされる。
经纪人 jì
经纪人在多台机器上构建集群,并形成名为Topic的分布式消息队列。
经纪人的机制
Broker会接收来自Producer、Consumer和其他Broker的各种请求。在这里,我们将解释当收到Produce请求将消息写入Topic和收到Fetch请求从Topic读取消息时的操作。以下是Broker在接收到Produce/Fetch请求时的处理流程。
以下是经纪人处理的每个细节。
1. 收到请求
生产者会接收到从生产者发来的将消息写入主题的生产请求,而消费者会接收到从主题中读取消息的获取请求。
2. 将请求加入队列
套接字服务器将收到的请求存储在请求通道的请求队列中。请求通道是一个队列,用于在处理请求的套接字服务器和请求处理程序之间传递请求/响应。通过使用这个队列,可以防止套接字服务器和请求处理程序的线程被阻塞。
3. 将请求的数据写入/读取到每个Leader副本的Segment文件中。
请求处理器从请求队列中取出请求并进行处理。对于生产请求,首先将数据写入本地的Leader副本的段文件中。如果Acks=1,则此时将ack作为响应发送回生产者(进入5)。对于提取请求,从本地的Leader副本中读取数据。如果目标数据可读取,则立即作为响应发送给消费者(进入5)。
4. 等待请求处理完成
如果Produce请求的acks参数设置为all,那么需要等待数据被复制到其他Broker上。如果待抓取的数据在Replica中不存在,那么在等待数据积累之前,会在返回响应之前等待一段时间。为了防止Request Handler线程被阻塞,请求会暂时存储在一个名为Purgatory的地方。每个Broker的Replica Fetcher线程会定期发送Fetch请求,将其他Broker的Leader Replica的数据复制到自己持有的Follower Replica上。一旦确认存储在Leader Replica上的数据已经复制到了最小ISR数,那么存储在Purgatory中的Produce请求将被视为已完成。
5. 将响应排入队列中(将响应加入队列)
当请求处理完成后,将结果存储到请求通道的响应队列中作为响应。
6. 回复响应
Socket Server从响应队列中取出响应,向Producer/Consumer/(其他Broker的) Replica Fetcher发送回复。
经纪人的主要参数
以下是Broker的主要参数。有关所有参数的详细信息,请参考官方文档。
日志设置
在Kafka中,我们将写入到Topic的数据实体称为日志。下面列出了与日志保存和刷新相关的主要参数。
log.dirs
デフォルト値:なし
説明:Logディレクトリのリスト。Brokerの各ディスク上に作成したディレクトリを指定する。
log.retention.hours
デフォルト値:168時間(7日間)
説明:Log保存期間(時単位)。log.retention.minutes または log.retention.ms の設定が優先される。
log.retention.minutes
デフォルト値:なし
説明:Log保存期間(分単位)。log.retention.ms の設定が優先される。
log.retention.ms
デフォルト値:なし
説明:Log保存期間(ミリ秒単位)。
log.retention.bytes
デフォルト値:なし
説明:Logの最大保存サイズ。Logがこのサイズに達したら、Log保存期間に関わらず古いものから削除される。
log.segment.bytes
デフォルト値:1 GB
説明:Segmentファイルの最大サイズ
log.roll.hours
デフォルト値:168時間 (7日間)
説明:Segmentファイルのロールアウト間隔(時単位)。この時間が経過すると、現在使用しているSegmentファイルのサイズが最大値(log.segment.bytes)に達していなくても、そのファイルを閉じて新しいSegmentファイルを作成する。log.roll.ms の設定が優先される。
log.roll.ms
デフォルト値:なし
説明:Segmentファイルのロールアウト間隔(ミリ秒単位)。
log.cleaner.threads
デフォルト値:1
説明:Log圧縮に使用するスレッド数。
background.threads
デフォルト値:10
説明:ファイル削除などバックグラウンドで実行する様々なタスク用のスレッド数。
num.recovery.threads.per.data.dir
デフォルト値:1
説明:起動時のLog復旧と、シャットダウン時のフラッシュに使用する、データディレクトリあたりのスレッド数
log.flush.interval.messages
デフォルト値:Long.maxvalue
説明:Logのディスクへの強制フラッシュ間隔(Partition単位のメッセージ数)。デフォルトではLong型の最大値が設定されており、Kafkaは明示的なフラッシュを行わず、代わりにOSのバックグラウンドフラッシュ機能に任せている。
log.flush.interval.ms
デフォルト値:なし
説明:Logのディスクへの強制フラッシュ間隔(ミリ秒)。設定されていない場合、log.flush.scheduler.interval.ms の値が使用される。
log.flush.scheduler.interval.ms
デフォルト値:Long.maxvalue
説明:Logをディスクにフラッシュする必要があるかどうかを確認する間隔(ミリ秒)。デフォルトではLong型の最大値が設定されており、Kafkaは明示的なフラッシュを行わず、代わりにOSのバックグラウンドフラッシュ機能に任せている。
主题设定
以下是有关主要主题设置的参数。
num.partitions
デフォルト値:2
説明:TopicのPartition数。Topic単位でも個別に指定可能。
default.replication.factor
デフォルト値:1
説明:Replicaの複製数(Replication Factor)。PartitionのLeader ReplicaとFollower Replicaの合計数。min.insync.replicasより大きくすること。Topic単位でも個別に指定可能。
min.insync.replicas
デフォルト値:1
説明:In Sync Replica (ISR) の最小数。Replicaがこの個数まで同期していないPartition(のLeader Replica)は書き込みを受け付けなくなる。また、Recordはこの個数まで複製された時点でコミットされたものとみなされる。Topic単位でも個別に指定可能。
offsets.topic.num.partitions
デフォルト値:50
説明:Consumer offset保存用TopicのPartition数。ConsumerはRecordをどこまで読みだしたのかを示すOffsetを、このTopicに格納して永続化する(詳細は後述)
offsets.topic.replication.factor
デフォルト値:1
説明:Consumer offset保存用TopicのPartitionレプリカ数。min.insync.replicasより大きくすること。
接收和处理请求
下面是有关Socket服务器接收/返回请求、请求通道中请求排队以及请求处理处理程序的参数。
num.network.threads
デフォルト値:3
説明:Socket Serverがリクエストの受信とレスポンスの送信に使用するスレッド数。
message.max.bytes
デフォルト値:976 KB
説明:Socket Serverが受け付ける最大Record Batchサイズ。
socket.request.max.bytes
デフォルト値:100 MB
説明:Socket Serverが受け付ける最大リクエストサイズ。
socket.receive.buffer.bytes
デフォルト値:100 KB
説明:Socket Server のTCP受信ソケットバッファサイズ。-1に設定するとOSのデフォルト値が使用される。
socket.send.buffer.bytes
デフォルト値:100 KB
説明:Socket Server のTCP送信ソケットバッファサイズ。-1に設定するとOSのデフォルト値が使用される。
queued.max.requests
デフォルト値:500
説明:Request Channelのリクエストキューのサイズ。リクエストがこの数までキューに詰まれると、Socket Serverは新規リクエストの受付を停止する。
num.io.threads
デフォルト値:8
説明:Request Handlerのリクエスト処理用スレッド数。この処理はディスクI / Oを含む。
複製品
以下是有关Replica复制的参数。
num.replica.fetchers
デフォルト値:1
説明:Replica Fetcher用スレッド数。このスレッドがLeader Replica を持つBrokerにFetchリクエストを送信して、取得したRecordをローカルのFollower Replicaに書き込む。
replica.socket.receive.buffer.bytes
デフォルト値:64 KB
説明:Replica FetcherのTCP受信ソケットバッファのサイズ。-1に設定するとOSのデフォルト値が使用される。
replica.lag.time.max.ms
デフォルト値:10秒
説明:Replica複製の滞留・遅延の判定時間。下記の状態がこの判定時間以上続いたFollower Replicaは、同期していないとみなされてISRから除外される。
Leader Replicaに対してFetchリクエストを送信していない
Replicaの最新Offsetまで取得が追い付いていない
集群管理
以下是有关集群管理的参数。请注意,一旦与ZooKeeper的连接中断,经纪人将被排除在集群之外,直到重新连接。
broker.id
デフォルト値:0
説明:クラスタ内でBrokerを一意に識別するID。0から始まる数値を使用する。
zookeeper.connect
デフォルト値:なし
説明:ZooKeeperのホスト名のリスト
zookeeper.connection.timeout.ms
デフォルト値:なし
説明:Zookeeperコネクションのタイムアウト。BrokerがZooKeeperとの接続を確立する際の最大待機時間。設定されていない場合は、zookeeper.session.timeout.ms の値が使用される。
zookeeper.session.timeout.ms
デフォルト値:6秒
説明:Zookeeperセッションのタイムアウト。この期間内にBrokerからZooKeeperへのハートビートがない場合、ZooKeeperはそのBrokerが消えたと判断し、Brokerの情報をZooKeeper上から削除したうえで他のBrokerに通知する。
消费者 zhě)
Consumer是用于从主题读取消息的客户端库。用户应用程序使用Consumer从由Broker集群组成的主题的每个分区中读取消息。
消费者的机制
消费者和生产者一样,定期从任何一个代理中获取元数据,以了解每个代理的主机名、连接端口和每个主题分区的领导副本位置等信息。消费者根据这些信息决定从哪个代理获取记录。
以下是Consumer获取元数据后实际获取消息时的处理流程。
用户应用程序调用消费者的Poll API以获取记录。如果消费者内部的队列中没有要获取的记录,则会向代理发送抓取请求以获取记录。抓取请求需要指定要获取的主题、分区列表以及要获取的偏移量范围(记录编号)。但实际获取的单位是记录批次。从代理获取的记录批次将存储在消费者内部的队列中。从这个队列中取出记录批次后解压缩,并将记录返回给调用Poll的用户应用程序。
消费者在管理记录有多远被读取的偏移量方面负责,而代理服务器不做排他控制。因此,即使消费者数量增加,代理服务器的负担也能够减少。消费者在应用程序重新启动时需要将自己的偏移量持久化,以便能够从自己的偏移量继续进行。因此,消费者会将自己的偏移量保存在Kafka上的偏移量主题或任意数据存储中。
关于消费者群体的事项
用户应用程序可以通过组成一个或多个Consumer的Consumer Group来实现对一个Topic数据的分布式读取。如下图所示,Topic的每个分区只能由Consumer Group内特定的一个Consumer读取。这样就可以并行且无重复地读取Topic的消息。由于可以进行数据的分布式读取,所以Kafka与Hadoop/Spark/Storm等并行分布式处理框架兼容性良好。另外,即使Topic的数据被读取,也不会立即被删除,因此可以从多个Consumer Group读取相同Topic的数据。
消费者的主要参数
下面是Consumer的主要参数。有关所有参数的详细信息,请参阅官方文档。
bootstrap.servers
デフォルト値:なし
説明:Brokerの接続先リスト。いずれかのBrokerにアクセスして初期接続を確立する。
group.id
デフォルト値:なし
説明:Consumer Group名
fetch.min.bytes
デフォルト値:1byte
説明:Fetchリクエストで取得する最小データサイズ。
fetch.max.bytes
デフォルト値:1MB
説明:Fetch リクエストで取得する1Partitionあたりの最大データ量。
max.partition.fetch.bytes
デフォルト値:50MB
説明:Fetch リクエストで取得する最大データ量。
fetch.max.wait.ms
デフォルト値:500ms
説明:FetchリクエストがPurgatoryで待機する最大時間。Fetch リクエストで取得するデータ量がfetch.min.bytesに満たない場合、FetchリクエストはBrokerにデータが蓄積するまでPurgatoryで待機する。この値は replica.lag.time.max.ms より小さくする。
max.poll.records
デフォルト値:500
説明:1回のpoll() 呼出で取得する最大レコード数。
receive.buffer.bytes
デフォルト値:64KB
説明:TCP受信ソケットバッファのサイズ。-1に設定するとOSのデフォルト値が使用される。
enable.auto.commit
デフォルト値:True
説明:OffsetをバックグラウンドでKafkaに自動コミットするか否か。
auto.commit.interval.ms
デフォルト値:5秒
説明:OffsetをKafkaに自動コミットする頻度(ミリ秒単位)。
最后
在这篇投稿中,我们介绍了Kafka的Producer、Broker和Consumer的机制和配置列表。下一篇投稿中,我们将介绍Kafka的推荐配置和性能估计方法。
第3篇:Apache Kafka的推荐配置和性能估计方法