验证使用Kafka的微服务Saga模式
我是日立制作株式会社研发集团服务计算研究部的栗原。
我最近用Kafka构建了一个Saga模式的演示(PostgreSQL+Debezium+Kafka),并进行了操作验证,所以想要写一篇文章。
1.阿帕奇卡夫卡
Kafka是一种开源的分布式事件流平台。它具有高可扩展性和高可靠性,并且有许多使用案例和应用领域。在软件选择的时候,我认为它是一个经常被选中的软件之一。
Apache Kafka:https://kafka.apache.org/
2.微服务中的Saga模式。
使用微服务时,跨多个服务更新数据的设计变得更加困难。即使一个服务中的处理正常进行并且事务已经提交,如果在另一个服务中发生异常,整个处理都需要被撤销。
由于在一个事务中执行整个处理是很困难的,因此通过结果一致性来解决。实现这种解决方案的主要架构有TCC(尝试-确认/取消)模式和Saga模式这两种代表性方法。
TCC模式类似于RDB的两阶段提交,在首先确认每个服务是否可进行更新(试验)之后,在所有服务都可以更新的情况下进行更新(确认),否则取消(撤消)的方法。
而Saga模式是一种先在每个服务中确认处理,并在异常情况下通过补偿处理来撤销结果的方法。
我在这里产生了疑问,那就是关于在某项服务发生异常时如何保证结果的一致性。
TCC模式在获得所有相关服务的可更新响应之前,处理是不确定的,所以即使某个服务发生异常,我认为其缺点很小。
在一种Saga模式中,即使在中途的服务中发生异常,其他服务的处理也不会停止,因此需要确保数据更新和通知其他服务具有结果一致性。
此处可作为参考。
-
- Saga Orchestration for Microservices Using the Outbox Pattern: https://www.infoq.com/articles/saga-orchestration-outbox/
Chris Richardson (2019), Microservices Patterns, Manning Publications
RDB加上Debezium再加上Kafka。
使用微服务跨多个服务更新数据时,需要在每个服务中进行数据源(RDB)的更新和通知其他服务的处理。
为了确保数据更新和通知到其他服务的结果一致性,一种方法是使用分布式平台Debezium来捕获更新数据并构建Transactional Outbox模式。
Debezium:https://debezium.io/
在Transactional Outbox模式中,我们准备了一个用于业务数据的关系数据库表和一个用于通知的outbox表,并通过一个事务将两个表同时进行原子更新。Debezium会监测outbox表的变化,并将其发送和通知到Kafka的Topic。换句话说,应用程序只更新关系数据库表,消息发送由Debezium捕捉关系数据库的变化来完成,从而保证了结果的一致性。
4. 构建样本
我們將實際構建一個使用PostgreSQL+Debezium+Kafka的Saga模式的樣例並進行驗證。
幸運的是,由於這種配置的建立範例已在GitHub上公開,因此我們將使用它。
由於驗證Saga模式是目的,所以我們將省略解釋,但這個樣例是一個編排器類型的例子。
GitHub連結:https://github.com/debezium/debezium-examples/tree/master/saga
简单地解释一下样本的处理内容,前端部分是Order服务接受信用卡支付订单,后端部分包括Customer服务和Payment服务分别进行用户信息更新和付款信息更新。
環境構築的前提條件是已安裝了Maven和Docker Compose,並確認使用的Docker Compose版本為1.29.2。
在这个例子中,有两个重要的元素。
一个是用Java实现的业务逻辑库,另一个是Debezium的Kafka Connector。
(1) 德贝齐姆-夸克斯外发库
在使用Java实现的业务逻辑中,我们使用了名为debezium-quarkus-outbox的库。该库提供了向其他服务发送通知和接收通知的功能。
作为向其他服务发送通知的功能,当监视目标表(业务数据表)进行CUD(创建/更新/删除)操作时,我们会将该操作内容同时写入指定的outboxevent表中,该表是由Java程序指定的。
此外,还实现了通过Kafka接收其他服务的通知功能。在此示例中,我们分别从后端服务(模拟支付信息记录的Payment服务和模拟客户信用信息记录的Credit服务)接收了响应。
Debezium文档>集成>Outbox Quarkus扩展:
https://debezium.io/documentation/reference/integrations/outbox.html
这个库的配置文件是order-service/src/main/resources/application.properties。你可以在下面的链接中确认选项内容。
https://debezium.io/documentation/reference/integrations/outbox.html#_configuration
quarkus.datasource.db-kind=postgresql
quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/orderdb
quarkus.datasource.username=orderuser
quarkus.datasource.password=orderpw
quarkus.hibernate-orm.log.sql=true
quarkus.hibernate-orm.database.generation=drop-and-create
# quarkus.hibernate-orm.database.generation=update
quarkus.hibernate-orm.database.default-schema=purchaseorder
quarkus.log.level=INFO
quarkus.log.min-level=INFO
quarkus.log.console.enable=true
quarkus.log.console.format=%d{HH:mm:ss} %-5p [%c] %s%e%n
mp.messaging.incoming.paymentresponse.connector=smallrye-kafka
mp.messaging.incoming.paymentresponse.topic=payment.response
mp.messaging.incoming.paymentresponse.bootstrap.servers=localhost:9092
mp.messaging.incoming.paymentresponse.group.id=order-service
mp.messaging.incoming.paymentresponse.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.paymentresponse.value.deserializer=io.debezium.examples.saga.order.facade.serdes.PaymentDeserializer
mp.messaging.incoming.paymentresponse.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor
mp.messaging.incoming.creditresponse.connector=smallrye-kafka
mp.messaging.incoming.creditresponse.topic=credit-approval.response
mp.messaging.incoming.creditresponse.bootstrap.servers=localhost:9092
mp.messaging.incoming.creditresponse.group.id=order-service
mp.messaging.incoming.creditresponse.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.creditresponse.value.deserializer=io.debezium.examples.saga.order.facade.serdes.CreditDeserializer
mp.messaging.incoming.creditresponse.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor
# Tracing configuration
quarkus.jaeger.service-name=order-service
为了查看outbox的数据,我在INSERT到outbox之后添加了以下内容,以确保数据不会被删除。
quarkus.debezium-outbox.remove-after-insert=false
Debezium连接器 (Debezium Connector)
我们将检查Debezium的Kafka Connector是如何工作的。由于这个配置与三个服务类似,我们只需要查看适用于Order服务的配置文件register-order-connector.json,如下所示。
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "order-db",
"database.port": "5432",
"database.user": "orderuser",
"database.password": "orderpw",
"database.dbname" : "orderdb",
"database.server.name": "dbserver1",
"schema.include.list": "purchaseorder",
"table.include.list" : "purchaseorder.outboxevent",
"tombstones.on.delete" : "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms" : "saga",
"transforms.saga.type" : "io.debezium.transforms.outbox.EventRouter",
"transforms.saga.route.topic.replacement" : "${routedByValue}.request",
"poll.interval.ms": "100"
}
以下是关于进行数据库轮询访问的信息和Kafka连接器的配置信息。值得注意的是,目标Kafka主题是${routedByValue},它根据outbox表的aggregateType变量指定,并根据插入到outbox的数据内容来决定要产生到哪个Kafka主题的消息。
5.验证
在验证过程中,将通过以下3个方面进行确认,以确保在实际案例中的应用运营中不存在任何问题。
(1) 查看外部可见的数据库的更新是否会被发送到Kafka中。
(2) 程序内部的运作方式。
(3) 监控行为的追踪机制。
(1) 外部可见的数据库更新动作会被发布到Kafka。
我们将验证在执行正常处理请求时的行为。
$ http POST http://localhost:8080/orders < requests/place-order.json
可以通过以下命令来查找发送到后端服务之一的Customer服务的主题的消息。
$ docker run --tty --rm --network saga-network debezium/tooling:1.1 kafkacat -b kafka:9092 -C -o beginning -q -f "{\"key\":%k, \"headers\":\"%h\"}\n%s\n" -t credit-approval.request
结果变成了这样。
{"key":57974d15-a2f5-49a7-b18a-71c636193bdd, "headers":"uber-trace-id=76d506f0a064b52d:edc3f04e96eac235:5c944ec803ae2b47:1,id=f9a78388-883a-45d7-b4f6-7d47f8c89187,uber-trace-id=76d506f0a064b52d:4f0ef2e07d4263d4:edc3f04e96eac235:1"}
{"order-id":3,"customer-id":456,"payment-due":4999,"credit-card-no":"xxxx-yyyy-dddd-aaaa","type":"REQUEST"}
可以确认”order-id”、”customer-id”、”payment-due”、”credit-card-no”、”type”等数据已经被生产到了Kafka中。
同时,我们可以使用类似的命令来查询被生产到Payment服务的话题中的消息。
$ docker run --tty --rm \
> --network saga-network \
> debezium/tooling:1.1 \
> kafkacat -b kafka:9092 -C -o beginning -q \
> -f "{\"key\":%k, \"headers\":\"%h\"}\n%s\n" \
> -t payment.request
结果如下。
{"key":57974d15-a2f5-49a7-b18a-71c636193bdd, "headers":"uber-trace-id=76d506f0a064b52d:59e20a8698815bdb:221d613d33da9539:1,id=89b3769c-2c57-4da7-ac1e-69972a273fed,uber-trace-id=76d506f0a064b52d:13b0ea2fef3c4c2d:59e20a8698815bdb:1"}
{"type":"REQUEST","order-id":3,"customer-id":456,"payment-due":4999,"credit-card-no":"xxxx-yyyy-dddd-aaaa"}
可以确认之前与 刚才相同的“order-id”、“customer-id”、“payment-due”、“credit-card-no”、“type”数据已经被生产到了Kafka。
接下来,我们将确认已经从默认设置更改并且已被设置为不会自动删除的Outbox表的状态如何。
orderuser@order-db:orderdb> select * from purchaseorder.outboxevent;
+--------------------------------------+-----------------+--------------------------------------+---------+----------------------------+------------------------------------
--------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------+
| id | aggregatetype | aggregateid | type | timestamp | payload
| tracingspancontext |
|--------------------------------------+-----------------+--------------------------------------+---------+----------------------------+------------------------------------
--------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------|
| f9a78388-883a-45d7-b4f6-7d47f8c89187 | credit-approval | 57974d15-a2f5-49a7-b18a-71c636193bdd | REQUEST | 2021-08-16 07:12:42.605371 | {"order-id":3,"customer-id":456,"pa
yment-due":4999,"credit-card-no":"xxxx-yyyy-dddd-aaaa","type":"REQUEST"} | #Mon Aug 16 07:12:42 GMT 2021 |
| | | | | |
| uber-trace-id=76d506f0a064b52d\:a7c822eda5e59186\:76d506f0a064b52d\:1 |
| 89b3769c-2c57-4da7-ac1e-69972a273fed | payment | 57974d15-a2f5-49a7-b18a-71c636193bdd | REQUEST | 2021-08-16 07:12:42.956002 | {"type":"REQUEST","order-id":3,"cus
tomer-id":456,"payment-due":4999,"credit-card-no":"xxxx-yyyy-dddd-aaaa"} | #Mon Aug 16 07:12:42 GMT 2021 |
| | | | | |
| uber-trace-id=76d506f0a064b52d\:ecbf39660c99535e\:a4e5fb89cc4291a6\:1 |
+--------------------------------------+-----------------+--------------------------------------+---------+----------------------------+------------------------------------
--------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------+
SELECT 6
Time: 0.018s
我们可以看到刚才Kafka中产生的消息内容已经被记录为负载。
进一步查看其他列的信息可以发现,有两个记录几乎同时被插入,并且aggregateType分别为”credit-approval”和”payment”。
通过这些数据,我们可以知道消息被产生并分别存入了credit-approval.request和payment.request这两个主题。
(2) 程序的内部运作
根据Java程序中的以下注释信息,可以追踪到确认有两种记录是如何在order-service/src/main/java/io/debezium/examples/saga/order/saga/OrderPlacementSaga.java中进行INSERT操作的。
@Saga(type="order-placement", stepIds = {CREDIT_APPROVAL, PAYMENT})
在`order-service/src/main/java/io/debezium/examples/saga/framework/SagaBase.java`中,根据注解中指定的”stepIds”,按顺序执行每个步骤的处理。根据这些信息,可以按顺序执行正常处理以及发生异常时的补偿事务的执行。
从文件夹的结构来看,这个framework文件夹下的处理似乎是独立于业务逻辑的实现。通过在执行CUD操作的同时使用这些类,可以实现在与业务数据相同的事务中执行对outboxevent表的INSERT操作的规范。
如果考虑使用这个库在服务之间发送请求和响应,按照框架下的文件夹中的处理内容来执行,我认为可以减少实施Saga架构所需的工作量。另一方面,如果不统一整个服务使用这个库,那么就需要对无法统一的实现部分进行与该库兼容的实施。此外,使用这个库时需要在业务逻辑中实现库的使用,这会增加学习成本和维护成本以适应库的规范变更。
(3) 监控行为的追踪机制
在这个样本中,我们集成了一个名为 Jaeger 的开源软件,可以用于分布式系统的追踪。通过它,我们可以追踪微服务之间的事务进度。
使用 Jaeger 查看追踪内容时,我们可以确认在 Order 服务接收订单后,Customer 服务和 Payment 服务对其进行处理,最终在两者都成功后,Order 服务的订单状态才会变为完成状态。我们可以将整个过程视为一个序列并进行确认。
由于中途记录了 Kafka Connector 的处理过程,因此即使组件增加,我们也可以轻松地调查故障发生的位置。
6. 总结和将来的验证
这次演示中,我们介绍了使用Kafka构建微服务的Saga模式的系统,并介绍了其操作验证结果。
若要在Saga模式中始终执行数据源更新和通知其他服务的操作,可以通过Transactional Outbox模式来解决,其中一种实现方法是使用RDB+Debezium+Kafka的组合。
我试着使用debezium-quarkus-outbox库构建了一个系统作为实现的示例,并确认了Saga模式的实现方式。
因此,我们发现尽管Java代码中需要为该库进行实现,但由于设置文件的存在,可以实现的范围很广,并且有可能减少工作量。
在使用这个库的同时,我觉得在通信的服务之间需要进行统一或至少是兼容的实现。我希望能够通过使用这个例子,进一步研究服务恢复操作和异常处理时的行为。