思考串流处理
根据《Designing Data-Intensive Applications: The Big Ideas Behind Reliable, Scalable, and Maintainable Systems》进行了整理,以便思考关于重新进行流处理的问题。
活动流
-
- stream processingではrecordのことをeventと言ったりします
eventはタイムスタンプを含む場合が多い
eventの作成者はproducer, publisher, senderなどと呼ばれる
eventの使用者はconsumers, subscribers, recipentsなどと呼ばれる
あるeventは1人の作成者によって作られ、複数の使用者によって使用される
良くないevent streamの例
producerは一般的なデータベースにeventを挿入
consumersは定期的にポーリングして挿入されたデータを取得する
一般的なデータベースは差分データだけ取得するのに有効な仕組みではない。
アプリケーションは前回どこまでデータを取得したのか覚えてないといけない。
インデックスがない、効かない場合はポーリングのたびにフルスキャンが走る
即时通讯系统
-
- eventが作られたらconsumersに通知がされる仕組みが良い。このような仕組みをmessaging systemと呼ぶ。
-
- eventを含むデータをmessageと呼んだりします
-
- Unix pipeやTCPコネクションは通知の仕組みを有するが、これらは1対1の通信であり、1対多の通信は不可能
-
- messaging systemでは以下のことを気をつけないといけない
producerの方がcomsumersより早いスピードでeventを生成するときはmessageを破棄するか、バッファするか、バックプレッシャーするかしないといけない
何かがクラッシュした時にメッセージロストを許容できるのか、それともロストは許されないのかでアーキテクチャは変わる
生产者和消费者直接进行消息传递的方式
-
- 中間ノードは用意せずproducerが直接それぞれのconsumersと通信をする方式
-
- UDPのマルチキャストが良く使われている
UDPだけだとunreliableなのでアプリケーションプロトコルでパケットロスを救う仕組みが必要
ZeroMQはこの方式を採用している
この方式は以下の点でdisadvantageがある
パケットロスを救う仕組みを作りこむのが大変
consumerが死んだ場合にproducerにリトライの仕組みを入れないといけない
producerが死んだ場合はconsumersへ未送信のmessageをロストさせない仕組みを作りこむのが大変
使用消息队列的方法
-
- メッセージキュー(メッセージブローカーとも呼ぶ)はmessage streamを扱うのに最適化されたデータストア
-
- producersはメッセージキューにmessageを書き込み、consumersはメッセージキューからmessageを読む
-
- この方式にするとメッセージキューをdurableにしておけば耐障害性を担保できる
- この方式ではproducersとconsumersは非同期で処理が行われることになる
消息队列与数据库的区别
-
- いくつかのメッセージキューは2phase commitを使ったりなどデータベースと同じ技術を使っていたりもするが違いは色々ある
メッセージキューはconsumersにmessageが伝達されたらそのmessageは削除される。データベースはexplicitに削除しない限りデータは保存される
データベースはセカンダリインデックスの付与や多様な検索が可能だがメッセージキューはパターンマッチングでmessageをfilterできるくらい
データベースの検索はpoint-in-timeのsnapshotを見せる。メッセージキューは追加されたmessageを見せる
多个消费者
-
- 複数のconsumerが同じtopicを読むときに代表的なパターン
Load balancing
複数のconsumerにmessageを振り分ける方式。messageの量が多くてconsumerの処理が追いつかないときに負荷分散目的で使う
Fan-out
同じmessageをそれぞれのconsumerに割り当てる方式。同じmessageを使うが処理は別々の時に使ったりする
当消费者崩溃时
-
- メッセージロストを防ぐためにconsumersは明示的にmessageを受け取ったことをメッセージキューに伝える。メッセージキューに伝えることをacknowledgmentsという
-
- メッセージキューは全てのconsumersからacknowledgmentsを受け取っていないときはmessageを削除しない
-
- Load balancing方式であれば別のconsumerに再送する。ただし再送が行われる場合にはproducerがmessageを送った順序とずれる場合がある。(再送の間に後続のmessageがすでに送られている可能性もあるため)
順序がずれるとまずい場合はLoad balancing方式は使ってはダメ
消息队列的实现
AMQP/JMS-style
メッセージキューはconsumerからacknowlegmentsを受け取った後、messageを削除する方式
古いメッセージを読む必要のない場合などに利用される
Log-based
append-onlyでdiskにmessageを書いていく方式
consumerはlogの最後の位置まできたらnotificationが来るまで待つ
より高いスループットを実現するためにはlogをパーティショニングする
異なるパーティションは異なるノードに置くことも可能
それぞれのパーティションでは順序保証のためにそれぞれのmessageにシーケンスナンバーを割り振る
異なるパーティション間ではシーケンスナンバーは共有しないので、パーティション間の順序保証はない
Apache KafkaやAmazon Kinesis Streamsはこのような実装をしている
ロードバランシングするときは各consumerに担当するパーティションを割り当てるのが通常だがパーティション数以上にconsumerを増やせないデメリットはある
メッセージキューではデータベースレプリケーションと同じようにmessageにlog sequence numberを割り振って、consumerとの接続が切れ、再接続する時にデータロストなく読める仕組みがある
磁盘管理
-
- logはある一定の単位(セグメントと呼ぶ)で分割されディスクに保存される。古いセグメントは定期的に削除またはアーカイブストレージへ移動させる
-
- 非常に遅いconsumerがいる場合、セグメントを基本的に削除しないがディスクが一杯になりそうであれば削除を行う。これはcircular buffer, ring bufferと呼ばれる技術で
- 実装を行う
慢的消费者策略
- consumerがどれだけ遅れているのかを監視しておき、遅れが顕著な場合はアラートをあげる。それを受けてオペレータが手動でconsumerの遅れ対策を行う。
数据库中的数据流
-
- 現在は様々な種類のデータストレージが使われており(データベース、DWH、メッセージキューなど)それらを連携してシステムを作ることが多い。その時に必要になるのがデータ連携。一般的にデータ連携ではETLが使われる
-
- ETLでは時としてデータベースからデータを取得する時にデータのフルコピーを取ることがある。しかしこれは更新データ以外も含むため明らかに非効率。
- 回避する手段としてdual writesという方法が使われることがある。これはアプリケーションが2つのシステムに書き込む方法。ただ、この方法はrace conditionが発生する可能性があることに注意しないといけない。また片側のシステムに書き込みは成功したが、もう一つには失敗した場合の制御などをアプリケーション側で行うのは非常に厳しい。
变更数据捕获(CDC)
-
- これらの問題を解決する方法としてChange Data Captureが出てきた。これはデータベースのトランザクションログをデータ連携する技術。
-
- トランザクションログは今まで仕様が公開されていなかったが、APIを使ってトランザクションログを取得できるようにしたのがChange Data Capture。
-
- 更新データを取得する方法としてはトリガーを定義してデータベースへの全ての更新操作を特定のテーブル(changelogテーブルと言ったりする)に反映するやり方もあるが、これはクエリを2倍実行することになるのでパフォーマンスオーバーヘッドが大きい
-
- 初期の構築としてはデータベースのスナップショットをとり、コピー先のシステムに反映させる。スナップショットはログのオフセットが記録されているので、CDCでデータを反映させる時にどのログから反映させれば良いかわかる。
- あるプライマリキーを持つレコードが複数回更新された時に最新のログだけを反映すればデータの整合性は保たれるため、最新のログだけを持つようにcompactionが行われる
事件溯源
-
- Event Sourcingはeventという形で更新操作を追加していくシステムデザインアーキテクチャ
- アプリケーション側で行うので、更新操作を追加していくという点ではCDCと同じだが実現するレイヤが異なる
处理数据流
-
- ストリーム処理では何ができるか。バッチとの違いはストリーム処理は終わりがないこと。そのため、ソートなど終わりがないデータに対してはできない
-
- CEP(Complex Event Processing)では複雑なパターンマッチングをすることができる
-
- 特定のタイムスパン(windowと呼ぶ)で集計を行う
時刻はノードのローカル時刻を使うことが多い
しかし障害などでイベントの発生時刻とそのイベントが処理されるノードのローカル時刻が大きく異なる場面が出ることがある
イベントの発生時刻でwindowすることもあるが、それだと全てのイベントがいつ到着するかは保証することができない
そのため特定の時間だけ待った後にwindowを完了させる。完了した後にイベントがきた場合は以下の2つのどちらかを選択する
イベントを捨てる
更新されたデータにイベントを付加する
windowのタイプ
Tumbling Window
固定の時間のwindowで特定のイベントは必ず1つのwindowに入る。例えば5分間隔で09:00 – 09:04, 09:05 – 09:09, 09:10 – 09:14のようにwindow処理が行われる
Hopping Window
固定の時間のwindowだがTumbling Windowとは違いwindowがオーバーラップする。例えば5分のwindowで1分間隔の場合、09:00 – 09:04, 09:01 – 09:05, 09:02 – 09:06のようにwindow処理が行われる
Sliding Window
イベントの発生を契機にイベントの発生時刻からX分前の間をwindowとして処理する。Apache beamでいうSliding WindowはここでのHopping Windowのことです
Session Window
ユーザセッションごとにwindowを定義する。そのため、windowの時間間隔は事前には決まっておらず、ユーザセッションの長さに依存する
流加入
-
- stream-stream joins(window join)
window同士をjoinする方式
例えばCTRを見たい時に使う。検索のログと検索結果から何をクリックしたかのログをjoinする。
stream-table join
イベントと固定のテーブルをjoinする方式
例えばイベントに含まれているユーザIDを使ってユーザのマスタ表からユーザ名などを取得する
ストリームエンジンからデータベースに接続するやり方もあるが、これだとネットワーク通信によるオーバーヘッド、データベースの負荷上昇が起こる可能性がある。そのため、場合によってはこのテーブルをストリームエンジン内のメモリにハッシュ表として持たせるなどした方が良い
table-table join
固定のテーブル同士をjoinする方式
何かのイベントが発生した時にキャッシュの更新などメンテナンス目的で使用したりする
time-dependence join
例えばtax rateテーブル(tax rateが変わるたびに更新される)とイベントをjoinする時にどの時点のtax rateテーブルなのかで結果が変わる。このような時刻に依存するjoinがある
joinをする時刻によって結果が変わるため非決定的であり好ましくない。解決する手順としてtax rateテーブルをhistoricalにし、各レコードにidentifierを割り当てる。このようにすればどの時点のデータがjoinされたかがわかる。
容错性
-
- 処理が失敗してもexactly onceを保証するにはどうしたら良いか。バッチ処理に関しては失敗した場合はリトライすれば良いだけなので簡単。ストリーム処理に関してはいくつかの方法がある。
microbatching
バッチ処理を細かいタイミング(1秒間隔など)で実行しストリーム処理に見せる方式。この方式であればバッチ処理と同じようにリトライの概念を適用できる。
microbatchingは必然的にtumbling windowになる
spark streamingはmicrobatchingである
checkpointing
あるタイミングで状態を保存するチェックポイントをストレージに保存する方式。バリア命令をメッセージストリームに流し、そのバリア命令をトリガにしてチェックポイントを行う。
Apache Flinkはcheckpointingを行う
atomic commit
downstreamに状態を渡す時にatomic commit(two-phase commitとか)を行い状態を確実に確定させる方式
Google Cloud Dataflow, VoltDBはatomic commitを行う
冪等性
インクリメント処理は冪等ではない。冪等にするためにApache Kafkaではmessageのオフセットをもつ。ストリーム処理の中でオフセットを確認しすでに処理したmessageなのかどうかを判断して冪等性を維持する。
復旧
Apache FlinkではスナップショットをHDFSなどの共有ストレージに保存する。node failureが起きた時に別のnodeがスナップショットを読み込み処理を再開する。
Samza, Kafka streamsは状態をKafkaのtopicに保存する
VoltDBは状態を別のノードにレプリケートする