【卡桑德拉之旅】Cassandra响应式编程风格~高级主题②查询结果的缓存
首先
东京的卡桑德拉日
今年,中国也将于2023年6月1日举办Cassandra Day。Cassandra Day去年曾在柏林、伦敦、阿姆斯特丹、河内、雅加达、休斯敦、圣塔克拉拉、西雅图和新加坡举办。
为了本次在东京的举办,我们将陆续发布关于Apache Cassandra的文章。
关于Apache Cassandra
如果用一句话来说,Apache Cassandra 是一个开源分布式数据库管理系统。
和其他分布式数据库管理系统一样,使用多个通用服务器来构建一个数据库(也可以仅使用一个服务器进行构建,用于开发等目的)。
在这里,我们将省略详细说明,将介绍感兴趣的人的任务交给官方网站和维基百科。
Cassandra 反应式编程风格
这篇稿件的内容是根据以下文件编写的。
请参阅本文作者对于Cassandra响应式编程风格的介绍和基本用法的相关文章。
深入的话题
查询结果的缓存
只能订阅一次ReactiveResultSet。这是出于设计上的故意决定。否则,当用户订阅同一查询的第二次时,可能会触发与用户意图不符的不同执行,与第一次订阅同一查询的执行不同。
我想计算表格列所有值的平均值和总和。最简单的方法是创建两个流,同时订阅它们。
由于试图避免进行两次查询的意图,以下代码无法正常工作。
// DON'T DO THIS
ReactiveResultSet rs = session.executeReactive("SELECT n FROM ...");
double avg = Flux.from(rs)
.map(row -> row.getLong(0))
.reduce(0d, (a, b) -> (a + b / 2.0))
.block();
// will fail with IllegalStateException
long sum = Flux.from(rs)
.map(row -> row.getLong(0))
.reduce(0L, (a, b) -> a + b)
.block();
因为 rs 已经被订阅,所以上述的第二个 Flux 在 onError 信号中立刻结束,并封装了 IllegalStateException。
避免两次查询表格,并且简单的方法是使用大多数响应式库提供的缓存来避免此限制。
Flux<Long> rs = Flux.from(session.executeReactive("SELECT n FROM ..."))
.map(row -> row.getLong(0))
.cache();
double avg = rs
.reduce(0d, (a, b) -> (a + b / 2.0))
.block();
long sum = rs
.reduce(0L, (a, b) -> a + b)
.block();
以上的代码可以正常运行。
操作员cache只会对ReactiveResultSet进行一次订阅,并缓存结果,然后将缓存的结果提供给下游订阅者。
只有当结果集较小并且可以装入内存时,才可能实现。
如果无法采用缓存策略,大多数反应式库还提供了一种在运行时将上游订阅多个订阅者的操作符进行多播的方式。
以下的例子可以用另一种方法进行重新写作。
Flux<Long> rs = Flux.from(session.executeReactive("SELECT n FROM ..."))
.map(row -> row.getLong(0))
.publish() // multicast upstream to all downstream subscribers
.autoConnect(2); // wait until two subscribers subscribe
long sum = rs
.reduce(0L, (a, b) -> a + b)
.block();
double avg = rs
.reduce(0d, (a, b) -> (a + b / 2.0))
.block();
在上面的示例中,publish操作符将所有的onNext信号广播给所有的订阅者。autoConnect(2)操作符告诉publish在订阅上游源(并触发实际的查询执行)之前等待获取两个订阅。
这种方法适用于大规模结果集,因为它不需要将结果缓存到内存中。