【卡桑德拉之路】Cassandra 响应式编程风格~高级主题③ 从查询失败到重试

首先

东京的卡桑德拉日

今年2023年6月1日,Cassandra Day将在日本举办。去年,Cassandra Day在柏林、伦敦、阿姆斯特丹、河内、雅加达、休斯顿、圣塔克拉拉、西雅图和新加坡也举办过。

我們將在東京舉辦的活動前發表有關Apache Cassandra的文章。

image.png

关于Apache Cassandra

Apache Cassandra 是一个开源的分布式数据库管理系统,简而言之。

和其他分布式数据库管理系统一样,可以使用多个通用服务器来构建一个数据库(也可以仅使用一个服务器来满足开发等目的)。

在这里,我们将跳过详细的解释,将介绍的任务交给对此感兴趣的人,可以参考官方网站或维基百科。

 

Cassandra反应式编程风格

本文的内容是基于以下文件。

 

请参阅本文作者所写的以下文章,介绍了Cassandra反应式编程的风格和基本用法。

 

高级话题

从查询失败中重新尝试

在响应式编程中,如果查询失败,会触发onError信号,订阅会立即终止,并且后续的查询可能不会执行。

如果这种行为不被期望,你可以模仿故障安全系统的运作。通常情况下,使用onErrorReturn或者onErrorResume等运算符来实现。

每当执行失败时,将堆栈跟踪输出到标准错误。由于存在onErrorResume运算符,错误将被忽略,流程将会继续执行。

Flux<Statement<?>> stmts = ...;
stmts.flatMap(
    statement ->
        Flux.from(session.executeReactive(statement))
            .doOnError(Throwable::printStackTrace)
            .onErrorResume(error -> Mono.empty()))
    .blockLast();

看一眼就能明白,上述的实现过于简单了。

以下的例子是前一個例子的擴展。每次失敗的執行,如果錯誤是UnavailableException,將進行最多3次的重試,如果重試後查詢仍未成功,則將訊息記錄在日誌中。最後,將收集所有錯誤,並將失敗的查詢總數輸出到控制台。

Flux<Statement<?>> statements = ...;
long failed = statements.flatMap(
    stmt ->
        Flux.defer(() -> session.executeReactive(stmt))
            // retry at most 3 times on Unavailable
            .retry(3, UnavailableException.class::isInstance)
            // handle errors
            .doOnError(
                error -> {
                  System.err.println("Statement failed: " + stmt);
                  error.printStackTrace();
                })
            // Collect errors and discard all returned rows
            .ignoreElements()
            .cast(Long.class)
            .onErrorReturn(1L))
    .sum()
    .block();
System.out.println("Total failed queries: " + failed);

在上述的例子中,我们使用Flux.defer()来包装对session.executeReactive()的调用。
这是因为驱动程序始终需要创建只有单个订阅的发布者,所以这是必要的。

如果不使用defer运算符,支持仅限于这种单个订阅的发布者将与retry等运算符不兼容。这是因为这些运算符可能会多次向上游发布者订阅,并且在驱动程序抛出异常的情况下。这正是defer运算符设计的目的。对defer运算符的每个订阅将触发对session.executeReactive()的单独调用,并在会话重新执行和重试查询时返回新的发布者。

广告
将在 10 秒后关闭
bannerAds