如何选择Kafka集群中的主题/分区数量?
在 Kafka 集群中如何选择主题/分区的数量?- 根据 Confluent 的笔记,这篇文章是在 2015/03/12 发表的,稍微有点陈旧。
增加分区会提高吞吐量
-
- 最初に理解しておくべきなのは topic の partition は Kafka における並列処理の単位であるということ
producer と broker の両側で、異なる partition への書き込みは完全に並列して行うことができる
よって、圧縮のようなコストの高い操作は、より多くのハードウェア・リソースを利用しうる
consumer の側では、Kafka はいつでも単一の partition のデータを consumer のスレッドに送る
それゆえ、(consumer group 内での) consumer における並列性の度合いは、consume される partition の数によって規定される
それゆえ、一般的には Kafka クラスタ内の partition の数が多ければ多いほど、高いスループットを実現することができる
partition の数を選ぶためのラフな公式は、スループットに基づいたものとなる
1 partition あたりにどれだけの production (以下 p) と consumption (以下 c) のスループットが実現できるかを測定してください
達成したいスループットを t としよう
そのとき少なくとも max(t/p, t/c) 個の partition が必要となる
producer 側で実現できる partition ごとのスループットは、バッチサイズ・圧縮コーデック・acknowledgement の種類、レプリケーションの因数に依存する
しかしながら、一般的にはこのベンチで示された通り、ただ 1 つの partition で数 10 MB/sec のスループットで produce することができる
consumer のスループットは、たいていアプリケーションに依存する、というのは consumer のロジックがどのくらいのスピードで各メッセージを処理できるかに対応しているからだ
よって、本当に計測が必要なのだ
徐々に partition の数を増やすことは可能だが、メッセージがキーつきで produce されている場合には注意が必要だ
キーつきのメッセージを発行するとき、Kafka はキーのハッシュにもとづきメッセージを partition にマッピングする
これによって同じキーがついたメッセージは必ず同じ partition に転送されるという保証が提供される
この保証はアプリケーションの種類によっては重要で、というのは 1 つのパーティション内のメッセージは必ず順番に consumer へ配送されるからだ
もし partition の数が変わると、そのような保証は持続されなくなってしまうだろう
この問題を回避するための、一般的なプラクティスとしては少しだけ余分に partition を用意しておくことだ
基本的に、未来 (たとえば 1, 2 年後) に目標とするスループットに基づき、partition の数を決めなさい
時がたてば、クラスタにより多くの broker を追加し、比例的に既存の partition の一部を新しい broker に移動させることができる (活性で可能)
このようにすれば、キーが利用されているときでも、アプリケーションのセマンティクスを壊さずに、スループットの増加に追いつくことができる
如果要增加分区,则需要打开更多的文件句柄。
-
- 各 partition は broker のファイルシステム上のディレクトリに対応している
そのログ・ディレクトリの中で、ログ・セグメントごとに 2 つのファイルが存在するだろう (1 つはインデクスで、もう 1 つは実際のデータ)
現在、kafka では、各 broker は各ログ・セグメントについてインデクス・ファイルとデータ・ファイル両方のファイルハンドルをオープンする
よって、partition が増えれば増えるほど、下層 OS で設定すべきオープンできるファイルハンドルの上限が大きくなる
これはたいていは単なる設定の問題だ
われわれは本番環境の Kafka クラスタが broker ごとに 3 万のファイルハンドルを開いた上体で動いているのを見たことがある
增加分区可能会降低可用性。
-
- Kafka はクラスタ内レプリをサポートしており、これにより高可用性と持続性を提供している
1 つの partition は複数のレプリカをもつことができ、それぞれのレプリカは異なる broker に保存される
レプリカのうちの 1 つが leader に任命され、残りのレプリカが follower となる
内部的に、Kafka はそのようなレプリカのすべてを自動的に管理し、レプリカ間で同期された状態であることを保証する
producer と consumer の両方からの partition に対するリクエストは、leader が処理する
1 人の broker が死ぬと、その broker 上にあった leader である partition 群は一時的に利用不可となる
Kafka は自動的にそれらの利用できない partition の leader をいくつかの他のレプリカに移動させ、クライアントからのリクエストに対する処理を継続する
このプロセスは、controller に任命された Kafka の broker のうちの 1 人によって実行される
このプロセスには、Zookeeper 内で影響のある partition それぞれについていくらかのメタデータを読み書きすることが含まれる
現在、Zookeeper に対する操作は controller の中で直列的に実行される
一般的に broker がクリーンにシャットダウンされた場合には、controller は先を見越してシャットダウンしようとしている broker から leader を 1 つずつ移動させる
1 つの leader の移動だけなら 2,3 ミリ秒で終わる
だから、クライアントから見ると、クリーンな broker のシャットダウン中に、利用不可となるのはほんの少しの期間だけだ
しかしながら、broker が kill -9 等でアンクリーンにシャットダウンされた場合、観察される利用不可な期間は、partition の数に比例する
broker が合計 2000 個の partition を持ち、それぞれの partition が 2 つのレプリカを持っているとしよう
概算で、この broker はだいたい 1000 個の partition の leader となる
この broker がアンクリーンに落ちたとき、それら 1000 個の partition のすべてがまったく同時に利用不可となる
1 つの partition につき新しい leader を選出するのに 5 ミリ秒がかかるとしよう
1000 個全部の partition に対して新しい leader を選出するには最大 5 秒がかかることになる
なので、いくつかの partition にとっては観察される利用不可な期間は、5 秒プラス障害検出にかかった時間ということになるだろう
もし不運だった場合には、落ちた broker が controller だったかもしれない
この場合には、新しい leader の選出のプロセスは、controller が新しい broker へフェイルオーバーしてからでないとはじまらない
controller のフェイルオーバーは自動的に発生するが、初期化の間に ZooKeeper から各 partition についてのいくつかのメタデータを読む必要がある
たとえば、もし Kafka クラスタ内に 10,000 個の partition があり、1 partition あたり Zookeeper からのメタデータ初期化に 2 ミリ秒かかるとしたら、利用不可の期間がさらに 20 秒増えることになるだろう
一般的に、アンクリーンに落ちるのはまれである
しかし、このようなレアケースでの可用性を気にかけるのだとしたら、おそらく broker ごとの partition の数は 2~4 千程度、クラスタ内の全 partition 数は数万程度に制限したほうがよい
如果增加分区,端到端的延迟会增加。
-
- Kafka における end-to-end のレイテンシというものは、メッセージが producer によって発行されてから consumer によって読まれるまでの時間と定義される
Kafka はメッセージがコミットされたあと、つまり同期中のレプリカすべてに複製されたあとで、そのメッセージを consumer に露出する
よってメッセージをコミットする時間が end-to-end のレイテンシのうちで大きな割合を占める
デフォルトでは、Kafka の broker は、ほかの broker からデータを複製するのに 1 つのスレッドしか使わない (2 つの broker の間でレプリカが共有されるすべての partition について 1 スレッドしか使わない)
われわれの実験では、1 人の broker から別の broker へ 1000 個の partition を複製するのに、20 ミリ秒のレイテンシが追加でかかり、これは end-to-end のレイテンシが少なくとも 20 ミリ秒であることを示唆している
これはいくつかのリアルタイム・アプリケーションにとっては高すぎるかもしれない
この問題はクラスタをより大きくすることで回避することができることに注意してほしい
たとえば、同じ Kafka クラスタの中で、1 つのブローカー上に 1000 partition leader がいて、10 人のほかの broker がいるとしよう
残りの 10 ブローカーのそれぞれは平均すると最初の broker から 100 partition を取ってくればいいだけだ
それゆえ、メッセージをコミットすることによる追加レイテンシは数 10 ミリ秒ではなく数ミリ秒となる
大体の目安で言うと、もしレイテンシを心配しているなら、broker ごとの partition の数を 100 * b * r に制限するのがよいだろう (b が Kafka クラスタ内の broker の数、r がレプリケーション因数)
增加分区可能会导致客户端需要更多的内存
-
- Confluent Platform 1.0 も搭載したもっとも最近のリリースである 0.8.2 で、われわれはより効率的な Java producer を開発した
新しい producer のよい機能の 1 つは、入ってくるメッセージをバッファする際に使われるメモリの量についてユーザーが上限を設定できるようになっていることだ
内部的には、producer は partition ごとにメッセージをバッファする
十分なデータが蓄積するか十分な時間が経過すると、蓄積されたメッセージはバッファから削除され broker へと送信される
もし partition の数を増やすなら、メッセージは producer の中のより多くの partition に蓄積されるだろう
集約された利用メモリの量は設定したメモリの上限を超えてしまうかもしれない
これが起きるとき、producer はブロックするかどんな新しいメッセージも捨てなければならず、そのどちらも理想的ではない
これが起こるのを防ぐために、 producer のメモリサイズをより大きく再設定する必要があるだろう
大まかに見積もると、よいスループットを実現するには、producer の中で produce されようとしている partition ごとに少なくとも数 10 KB を割り当て、もし partition の数が非常にたくさん増えるなら、合計のメモリ量を調整する必要がある
似たような問題が consumer にも同様に存在する
consumer は partition ごとにメッセージのバッチを取りに行く
consumer が consume する partition が増えれば増えるほど、より多くのメモリが必要となる
しかしながら、これはリアルタイムでない consumer のみに存在する問題にすぎない
总结
-
- 一般的に、Kafka クラスタ内で partition を増やすとより高いスループットが得られる
しかしながら、全体または broker ごとの partition 数を増やしすぎると、可用性やレイテンシのような項目に影響を与える可能性があることに気をつけなければならない
将来的には、これらの制約のいくつかを改善し、partition の数という点で Kafka をよりスケーラブルにしようと計画している