将数据流水线引入业务系统以确保一致性
总结
我将介绍将数据管道构建技术堆栈应用于业务系统构建的概念验证。
我不是数据基础技术专家,所以可能会有错误的描述。如果您注意到了,请指出,我将不胜感激。
数据管道可能给人留下了仅用于数据分析等用途的印象,但我认为它在业务系统中也可以有效解决提高多数据存储(如RDB和搜索引擎)的数据一致性以及提高故障韧性的问题。在解释这个问题之前,首先我将把本文所涉及的产品归类为代表数据基础设施技术概念ETL – Extract, Transform, Load的要素并进行介绍。
提取 – Debezium
Debezium 是由 Red Hat 大量贡献的变更数据捕获(Change Data Capture,以下简称CDC)产品。
CDC 是一种技术,用于提取关系型数据库(RDB)的变更。可以将ETL中的E(抽取)限定为仅提取变更的差异概念。
由于抽取目标被限定为差异,所以相比于完整数据的转储,自然而然地更轻量,有助于提高整个数据管道的效率。1, 2
特别是,Debezium 是通过监控用于RDB复制的二进制日志实现的低负载、低延迟的CDC,将RDB的更新转换为Apache Kafka的事件流。
转换 – ksqlDB
自然而然地,由于Extract是增量和实时的,Transform的下一步应该是流处理而不是全数据批处理。
ksqlDB是一个可以用类似SQL语法描述Kafka流处理的产品。它的特点是可以像声明性地实现ETL中的T,即Transform定义RDB视图。
加载 – Kafka Connect(接收器连接器)
最后一块拼图是ETL中的L,Load。
Kafka Connect是连接Kafka和其他数据源以实现实时传输的框架。4
从Kafka发送到其他地方的Connect称为Sink Connector,并且对于Elasticsearch、Amazon S3、Google BigQuery等各种数据源,已经发布了可以正常使用的实现。5
通过将这些产品组合在一起,可以使用非常少的代码量构建流处理数据管道。
一个关于游戏评论网站及其面临的问题。
我们以游戏评论网站作为导入情境。用户可以进行游戏评论的发表,并能够通过标题、评论内容、平均评分等进行搜索。
用户可以通过以下ER图中显示的表格设计的关系数据库(RDB)来提交评论。
RDB 在全文搜索像评论之类的文本或在数字项中搜索任意列的条件方面并不适合。
我们将使用以下带有索引的搜索引擎来实现搜索功能。
那么,在构建这个网站时有哪些挑战呢?
最初的挑战 – 一致性和耐障性 – 一致性,持久度
我們考慮以下的代碼:先將評論保存在RDB中,然後再保存在搜索引擎中。
def create
# 1) レビューを RDB に保存
review = Review.create(params)
# 2) レビューを検索エンジンに保存
review.update_search_engine
end
(1) 和 (2) 之间发生进程崩溃时,RDB和搜索引擎的数据将出现不一致。
(1) hé (2) zhī fā kuì shí, RDB hé suǒ de shù jù chū bù yí zhì.
实际上,由于同时同步处理RDB和搜索引擎往往会导致延迟增大,所以更新搜索引擎的例子通常会被实现为类似于延迟作业(Delayed Job)的异步处理。不过,要在本质上维持RDB和消息发布的一致性仍然是一个难题,目前还没有解决。(即使有人从RDB的CLI直接更新数据,一致性也会丧失,对吧……)
从RDB的数据中导出搜索引擎的数据解决方案
不采用同时保持关系数据库(RDB)和搜索引擎的一致性的分布式事务,而是定期检测并反映关系数据库和搜索引擎之间的差异,这种方式如何?
由于将关系数据库(RDB)数据作为”真实”(某一时刻)来更新搜索引擎,可以保证结果一致性收敛到相同的结果。(更新顺序可能会发生变化。)
在数据导向应用程序设计中,有一种将系统和数据分类为“记录系统(System of record)”和“派生数据”的思想。当将其应用到这个主题时,评论发布功能被归类为“记录系统”,而评论搜索功能被归类为“派生数据系统”。
新的挑戰 – 延遲 – 延迟
然而,由于关系数据库(RDB)更新到搜索引擎反映的延迟增大,出现了新的问题。
在本次主题中,当有大量(例如1,000个)评论被提交到一个游戏中时,差别检测可能需要花费很长时间。
能否将解决新问题的方案 – 将RDB更新确定为事件化?
如果能够实时检测RDB的更新,那么差异检测的延迟问题很可能会得到解决。是否可以在直接更新RDB时,捕捉到类似于Rails中ActiveRecord的after_save回调事件呢?
这正是引入CDC起点数据管道技术的动机。
概念验证
-
- 评论发布功能(记录系统)会更新MySQL的reviews表
-
- 使用Debezium将MySQL的更新事件转换为Kafka消息
-
- 通过ksqlDB处理Kafka消息并将其转换为搜索引擎索引
-
- 使用Kafka Connect将搜索引擎索引同步到Elasticsearch
- 评论搜索功能使用Elasticsearch进行搜索(派生数据系统)
实施概述
我們使用 Docker Compose 進行了實現。
程式碼可以在 weakboson/etl-applyed-app 中找到。只需進行 git 克隆和一些操作即可重現 PoC。
从终端1启动容器。
$ docker-compose up
当所有容器成功启动后,将会如下所示。
$ docker-compose ps
Name Command State Ports
---------------------------------------------------------------------------------------------------------
broker /etc/confluent/docker/run Up 0.0.0.0:29092->29092/tcp, 9092/tcp
elasticsearch /usr/local/bin/docker-entr ... Up 0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp
kibana /usr/local/bin/dumb-init - ... Up 0.0.0.0:5601->5601/tcp
ksqldb-cli /bin/sh Up
ksqldb-server /usr/bin/docker/run Up 0.0.0.0:8088->8088/tcp
mysql docker-entrypoint.sh mysqld Up 0.0.0.0:3306->3306/tcp, 33060/tcp
rails /bin/bash Up 0.0.0.0:3000->3000/tcp
schema-registry /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp
zookeeper /etc/confluent/docker/run Up 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
从终端2通过Rails执行MySQL的初始数据构建。
$ docker exec -it -u ${UID}:${GID} rails bash
# コンテナ内で
bundle exec rake db:create
bundle exec rake db:migrate
bundle exec rake db:reset
在PoC的说明图中,我们将ETL的每个过程进行分类和整理,但实际上,我们将Debezium和Kafka Connect以嵌入式模式安装在位于中心位置的ksqlDB容器上的配置中。
在开发阶段,ksqlDB可以通过ksqlDB的接口轻松管理Kafka Connect,这也是ksqlDB的一个优点之一。
接下来,将通过 ksqlDB 的命令行界面进行配置。
1. 提取一部分
从现在开始,几乎所有的操作都将通过 ksqlDB 的命令行界面(CLI)来完成。
在终端3上启动 ksqlDB 的命令行界面(CLI)。
$ docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
为了应对任何潜在的失败,我会将设定设为“重新开始就从最开始做起”。
ksql> SET 'auto.offset.reset' = 'earliest';
下面是一种可能的中文翻译:
接下来,我们将配置Debezium,将MySQL的更新事件传输到Kafka的消息中。
ksql> CREATE SOURCE CONNECTOR `my-service-reader` WITH (
'connector.class' = 'io.debezium.connector.mysql.MySqlConnector',
'database.hostname' = 'mysql',
'database.port' = '3306',
'database.user' = 'root',
'database.password' = 'my-root-pw',
'database.allowPublicKeyRetrieval' = 'true',
'database.server.id' = '1',
'database.server.name' = 'my-service-db',
'database.whitelist' = 'my-service',
'database.history.kafka.bootstrap.servers' = 'broker:9092',
'database.history.kafka.topic' = 'my-service',
'include.schema.changes' = 'false'
);
当MySQL表成功设置后,相应的Kafka主题将被生成,并且可以将记录作为初始状态的消息进行确认。
ksql> show topics;
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------------------------
default_ksql_processing_log | 1 | 1
ksql-connect-configs | 1 | 1
ksql-connect-offsets | 25 | 1
ksql-connect-statuses | 5 | 1
my-service | 1 | 1
my-service-db.my-service.ar_internal_metadata | 1 | 1
my-service-db.my-service.games | 1 | 1
my-service-db.my-service.reviews | 1 | 1
my-service-db.my-service.schema_migrations | 1 | 1
---------------------------------------------------------------------------------
ksqlDB CLI 可以直接查看Kafka主题的内容。
ksql> PRINT 'my-service-db.my-service.reviews' FROM BEGINNING;
1.5. 在介绍ksqlDB之前进行转换概述。
ksqlDB 的基本对象是流(STREAM)和表(TABLE)。
流是类似访问日志的不可变数据,而表则类似于可变数据的关系数据库表。这两者都可以从Kafka主题中创建。以下是定义流的语法。
ksql> CREATE STREAM s WITH (
kafka_topic = 'some-topic',
value_format = 'avro'
);
此外,作为Transform过程的核心,还可以从流中抽取或汇总部分内容,定义新的流或表格。
ksql> CREATE STREAM s2 AS
SELECT s1.id,
sum(s1.price)
FROM s1
GROUP BY s1.id;
将其通过 SQL 声明式地定义为 ksqlDB 的 Transform 实现。
在题材为游戏的网站上,希望能够根据每个游戏的评价生成可变的搜索索引,最终生成的结果将成为一个表格。
(这样的表格称为“Materialized View”。这意味着聚合查询的结果被缓存起来,即具有实体。)
顺便说一下,我曾经陷入困境并花费了一些时间才理解,根据本次的要求,因为正本的数据存储是 RDB,所以应该从 Kafka 主题创建类似于表的东西。然而,通过 CDC 转发到 Kafka 的是“RDB 更新事件流”,所以首先应创建流。
你可能会想“流中无法更新记录,这可能会带来麻烦……”,但通过以“最新值(LATEST)聚合 RDB 键的 CDC 事件流”为基础来定义表,可以解决这个问题。(我明白了。)
2. 转变一些东西
所以,我們將從定義評論和遊戲的流和表開始。由於有相似名稱的流和表出現,我們遵循從 s_ 開始命名流,從 t_ 開始命名表的約定。最終我們期望的物化視圖將以 v_ 開頭命名。
-
- 定义t_games和t_reviews
定义一个通过game_id对t_reviews进行汇总的t_review_summaries表。该表还没有游戏标题。
通过JOIN t_review_summaries和t_games表,定义可以搜索游戏标题的v_game_review_index。
t_games和t_reviews的定义。
首先,我们将Kafka的消息直接定义为流。
ksql> CREATE STREAM s_games WITH (
kafka_topic = 'my-service-db.my-service.games',
value_format = 'avro'
);
接下来,我们将以最新的 s_games 进行计算,来定义 t_games。
CDC 事件的结构是在名为 before 的父层次下有 RDB 列的结构,在更新之前称为 before,在更新之后称为 after,可以使用箭头运算符指定为 after->title。
ksql> CREATE TABLE t_games AS
SELECT after->id as game_id,
latest_by_offset(after->title) AS title
FROM s_games
GROUP BY after->id
EMIT CHANGES;
让我们先确认一下目前的实现是否正常运行。
对于ksqlDB的表,您可以执行两种查询:一种是“持续从服务器返回更新的行的推送查询”,另一种是“通过键指定一次性结束的拉取查询”。
首先是指定关键字的查询。
ksql> SELECT * FROM t_games WHERE game_id = 1;
+-----------------------------+-----------------------------------------+
|GAME_ID |TITLE |
+-----------------------------+-----------------------------------------+
|1 |DARK SOULS |
Query terminated
1件退回并结束。
当次为推送查询,待 t_games 的全部数据返回后,进入等待状态。
ksql> SELECT * FROM t_games EMIT CHANGES;
+-----------------------------+----------------------------------------------+
|GAME_ID |TITLE |
+-----------------------------+----------------------------------------------+
|1 |DARK SOULS |
|2 |Bloodborne |
|3 |SEKIRO: SHADOWS DIE TWICE | 隻狼 |
Press CTRL-C to interrupt
让我们打开一个新的终端,并在 Rails 容器中启动 Rails 控制台,尝试创建或更新任意喜欢的游戏评价。
在此时点上,即使不进行转换,也可以确认到以事件驱动的方式流动的 RDB 更新非常有趣。
要结束推送查询,请按下Ctrl + C。
与 t_games 相同,我们也定义了 t_reviews。
ksql> CREATE STREAM s_reviews WITH (
kafka_topic = 'my-service-db.my-service.reviews',
value_format = 'avro'
);
ksql> CREATE TABLE t_reviews AS
SELECT after->id AS review_id,
latest_by_offset(after->game_id) AS game_id,
latest_by_offset(after->score) AS score,
latest_by_offset(after->comment) AS comment
FROM s_reviews
GROUP BY after->id
EMIT CHANGES;
汇总结果定义为 t_review_summaries。
我会使用以下的t_reviews来定义t_review_summaries的汇总结果。
你会发现可以直观地写成类似以下的SQL代码。
ksql> CREATE TABLE t_review_summaries AS
SELECT game_id,
avg(score) AS avg_score,
count(review_id) AS cnt,
collect_list(comment) AS comments
FROM t_reviews
GROUP BY game_id
EMIT CHANGES;
定义一个包含游戏标题的 v_game_review_index
只需要加入即可。
ksql> CREATE TABLE v_game_review_index AS
SELECT r.game_id AS game_id,
g.title AS title,
r.cnt AS cnt,
r.avg_score AS avg_score,
r.comments AS comments
FROM t_review_summaries AS r
JOIN t_games AS g
ON r.game_id = g.game_id
EMIT CHANGES;
在这个实例视图上发出推送查询后,让我们在Rails控制台上尝试一些操作。
您应该能够确认更新结果是实时返回的。
ksql> SELECT * FROM v_game_review_index EMIT CHANGES;
+----------------+--------------------------------+------+--------------+--------------------------------------------------------+
|GAME_ID |TITLE |CNT |AVG_SCORE |COMMENTS |
+----------------+--------------------------------+------+--------------+--------------------------------------------------------+
|1 |DARK SOULS |3 |6.0 |[最高, Demons Souls よりもっさりしている, ヘルプミー] |
|2 |Bloodborne |3 |7.0 |[最高, 貞子, 血晶掘りが嫌だ] |
|3 |SEKIRO: SHADOWS DIE TWICE | 隻狼 |3 |6.0 |[最高, 難しすぎる……, 葦名一心が倒せない……] |
Press CTRL-C to interrupt
3. 装载部分
如果使用Kafka Connect(Sink Connector)将最后通过Transform定义的物化视图数据加载到Elasticsearch中,则任务完成。
下面的语句定义了将game_id作为Elasticsearch的_id进行同步的Kafka Connect。
ksql> CREATE SINK CONNECTOR `game-review-index-connect` WITH(
'connector.class'='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url'='http://elasticsearch:9200',
'topics'='V_GAME_REVIEW_INDEX',
'key.ignore' = 'false',
'type.name' = '_doc',
'write.method' = 'upsert',
'errors.tolerance' = 'all',
'errors.deadletterqueue.topic.name' = 'V_GAME_REVIEW_INDEX_DEAD',
'errors.deadletterqueue.topic.replication.factor' = '-1'
);
验证行动
让我们尝试进行连续的操作验证。
访问Kibana,然后操作Index Patterns → 创建索引模式,以创建索引。
从初始数据中创建的索引应该已经生成了。
我认为可以通过访问Rails中的Game和Review模型并进行创建、更新来确认搜索索引也会以低延迟更新。
最后揭晓
实际上,这个 PoC 设计与事务性消息传递的问题域非常相似,它采用了微服务的实现模式——Transactional outbox。迄今为止,我在这本名为《数据导向应用程序设计》的参考书中多次看到「在微服务的上下文中」这样的表述,故意进行了区分,但实现微服务化的方法论却吸收了多种范式并不断发展,似乎并没有太多必要设立边界。
顺便说一句,这个 PoC 存在一个无法避免的与在生产环境中扩展 Kafka 相关的分区洗牌问题。我原本想详细解释这个问题,但仅仅解释一下就可能将本文的内容增加一倍左右,所以我打算将解决方案的验证和详细说明推迟到以后公开。
明天将是@gggk先生的“进行Rails应用性能调查”的活动。敬请期待!
使用更改数据捕获来优化ETL过程的方法
ksqlDB-用于流处理的事件流数据库
Kafka Connect – Confluent文档
支持的连接器 – Confluent平台5.5.0
数据驱动的应用程序设计 – O’Reilly第3部分
ksqlDB与嵌入式连接
事务性outbox – microservices.io
数据驱动的应用程序设计 – O’Reilly 12.2.2.4流处理器和服务等多个部分
ksqlDB如何实时生成材料化视图