【卡桑德拉之路】Cassandra 响应式编程风格~高级主题③ 从查询失败到重试
首先
东京的卡桑德拉日
今年2023年6月1日,Cassandra Day将在日本举办。去年,Cassandra Day在柏林、伦敦、阿姆斯特丹、河内、雅加达、休斯顿、圣塔克拉拉、西雅图和新加坡也举办过。
我們將在東京舉辦的活動前發表有關Apache Cassandra的文章。
关于Apache Cassandra
Apache Cassandra 是一个开源的分布式数据库管理系统,简而言之。
和其他分布式数据库管理系统一样,可以使用多个通用服务器来构建一个数据库(也可以仅使用一个服务器来满足开发等目的)。
在这里,我们将跳过详细的解释,将介绍的任务交给对此感兴趣的人,可以参考官方网站或维基百科。
Cassandra反应式编程风格
本文的内容是基于以下文件。
请参阅本文作者所写的以下文章,介绍了Cassandra反应式编程的风格和基本用法。
高级话题
从查询失败中重新尝试
在响应式编程中,如果查询失败,会触发onError信号,订阅会立即终止,并且后续的查询可能不会执行。
如果这种行为不被期望,你可以模仿故障安全系统的运作。通常情况下,使用onErrorReturn或者onErrorResume等运算符来实现。
每当执行失败时,将堆栈跟踪输出到标准错误。由于存在onErrorResume运算符,错误将被忽略,流程将会继续执行。
Flux<Statement<?>> stmts = ...;
stmts.flatMap(
statement ->
Flux.from(session.executeReactive(statement))
.doOnError(Throwable::printStackTrace)
.onErrorResume(error -> Mono.empty()))
.blockLast();
看一眼就能明白,上述的实现过于简单了。
以下的例子是前一個例子的擴展。每次失敗的執行,如果錯誤是UnavailableException,將進行最多3次的重試,如果重試後查詢仍未成功,則將訊息記錄在日誌中。最後,將收集所有錯誤,並將失敗的查詢總數輸出到控制台。
Flux<Statement<?>> statements = ...;
long failed = statements.flatMap(
stmt ->
Flux.defer(() -> session.executeReactive(stmt))
// retry at most 3 times on Unavailable
.retry(3, UnavailableException.class::isInstance)
// handle errors
.doOnError(
error -> {
System.err.println("Statement failed: " + stmt);
error.printStackTrace();
})
// Collect errors and discard all returned rows
.ignoreElements()
.cast(Long.class)
.onErrorReturn(1L))
.sum()
.block();
System.out.println("Total failed queries: " + failed);
在上述的例子中,我们使用Flux.defer()来包装对session.executeReactive()的调用。
这是因为驱动程序始终需要创建只有单个订阅的发布者,所以这是必要的。
如果不使用defer运算符,支持仅限于这种单个订阅的发布者将与retry等运算符不兼容。这是因为这些运算符可能会多次向上游发布者订阅,并且在驱动程序抛出异常的情况下。这正是defer运算符设计的目的。对defer运算符的每个订阅将触发对session.executeReactive()的单独调用,并在会话重新执行和重试查询时返回新的发布者。