Apache Camel和反应式编程
总结
-
- リアクティブプログラミングとは
-
- Camel + Reactive Streams
- Camel + Vert.x
响应式编程是指一种编程范式,其中数据流通过异步的方式流动,并且可以在不同的时间点被处理或操作。
-
- リアクティブシステムの特徴
即応性(Responsive)― 即時に応答を返す
耐障害性(Resilient)― 障害があっても応答する
弾力性(Elastic)― 高負荷であっても応答する
メッセージ駆動(Message Driven)― 疎結合な非同期通信に基づく
背压 .
如果一个组件不能跟上整体的进度,系统作为一个整体需要有一些处理手段。过载状态下的组件不能彻底崩溃或丧失消息的控制。如果处理无法跟上,并且不能崩溃,那么组件应该向上游组件通知自己处于过载状态并请求减轻负载。这种被称为反压(back-pressure)的机制是在过载情况下系统不会崩溃并能持续缓慢响应的重要反馈机制。反压机制可以传递给用户,这样处理响应性会降低,但系统的抗故障性在负载下得到保证。此外,系统可以利用这些信息来重新调配其他资源,促进负载均衡。
反应式流的示例
-
- JDK 9のjava.util.concurrent.Flowと1:1に対応
- back pressureのサポート
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
// Hello world
Flowable.just("Hello World").subscribe(System.out::println);
// Publisher/Subscriber example
Publisher<Integer> publisher = Flowable.just(1, 2, 3, 4, 5);
Flowable.fromPublisher(publisher)
.map(i -> i*i)
.doOnNext(System.out::println)
.subscribe();
骆驼 + 响应式流
-
- CamelルートとReactive Streamsフローとの双方向の連携を可能にする
reactive-streams:xxxxx[?options]
以下のリアクティブフレームワーク実装に対応
Reactor Core
RxJava
Back pressureにも対応
骆驼(酒吧)→ 响应式流(订阅者)
- Camelのプロデューサーエンドポイント(to)からReactive Streamsのフローへ
CamelContext camel = new DefaultCamelContext();
CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(camel);
camel.start();
// Camel (pub) → Reactive Streams (sub)
//Publisher<Integer> publisher = rsCamel.from("seda:input", Integer.class);
Publisher<Integer> publisher = rsCamel.fromStream("input", Integer.class);
Flowable.fromPublisher(publisher)
.map(i -> i*i)
.doOnNext(System.out::println)
.subscribe();
FluentProducerTemplate template = camel.createFluentProducerTemplate();
IntStream.rangeClosed(1, 5).forEach(i ->
//template.withBody(i).to("seda:input").send());
template.withBody(i).to("reactive-streams:input").send());
响应式流(发布)→ 骆驼(订阅)
- Reactive StreamsのフローからCamelのコンシューマーエンドポイント(from)へ
CamelContext camel = new DefaultCamelContext();
camel.addRoutes(new RouteBuilder() {
public void configure() {
from("reactive-streams:input")
.to("stream:out");
}
});
CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(camel);
camel.start();
// Reactive Streams (pub) → Camel (sub)
Flowable.just(1, 2, 3, 4, 5)
.map(i -> i*i)
.subscribe(rsCamel.streamSubscriber("input", Integer.class));
使用反应式流实现Camel路由
- ルートとフローのミックス実装もできる
CamelContext camel = new DefaultCamelContext();
camel.addRoutes(new RouteBuilder() {
public void configure() {
from("reactive-streams:input")
.to("stream:out");
from("direct:output")
.log("Done: ${body}");
}
});
CamelReactiveStreamsService rsCamel = CamelReactiveStreams.get(camel);
camel.start();
// Camel route & Reactive Streams flow
Flowable.fromPublisher(rsCamel.from("seda:input"))
.map(e -> {
int i = e.getMessage().getBody(Integer.class);
e.getMessage().setBody(i * i);
return e;
})
.doOnNext(e -> rsCamel.to("stream:out", e))
.subscribe(rsCamel.subscriber("direct:output"));
FluentProducerTemplate template = camel.createFluentProducerTemplate();
IntStream.rangeClosed(1, 5).forEach(i ->
template.withBody(i).to("seda:input").send());
駱駝的背壓支持
- 処理中(in-flight)エクスチェンジの数を制限することでback pressureを実現できる
制片人方
ThrottlingInflightRoutePolicyポリシー
スロットル中はルートが一時停止する
ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy();
policy.setMaxInflightExchanges(10);
from("jms:queue1")
.routePolicy(policy)
.to("reactive-streams:flow1");
backpressureStrategyオプション
BUFFER, LATEST, OLDEST
from("jms:queue2")
.to("reactive-streams:flow2?backpressureStrategy=LATEST");
消费者方
maxInflightExchangesオプション
from("reactive-streams:numbers?maxInflightExchanges=10")
.to("direct:endpoint");
骆驼 + Vert.x
-
- Vert.xのEventBusとイベントのやり取りを可能にする
vertx:xxxxx[?options]
Vert.x是什么
-
- JVM版のNode.jsのようなもの
Verticle(= Actor)とイベントバス
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
class HttpServer {
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new LoggingVerticle());
EventBus eventBus = vertx.eventBus();
vertx.createHttpServer()
.requestHandler(req -> {
eventBus.send("logging", "Got request: " + req.method() + " " + req.uri());
req.response().end("Hello!");
})
.listen(8888);
// ... クローズ処理
}
}
class LoggingVerticle extends AbstractVerticle {
public void start() {
vertx.eventBus().<String>consumer("logging",
message -> log.info(message.body()));
}
}
骆驼 Vert.x
- CamelからVert.xのイベントバス、Vert.xイベントバスからCamelへの双方向のメッセージングが可能
class CamelHttpServer {
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new LoggingVerticle());
EventBus eventBus = vertx.eventBus();
vertx.createHttpServer()
.requestHandler(req -> {
eventBus.send("camel-logging", "Got request: " + req.method() + " " + req.uri());
req.response().end("Hello!");
})
.listen(8888);
CamelContext camel = new DefaultCamelContext();
camel.getComponent("vertx", VertxComponent.class).setVertx(vertx);
camel.addRoutes(new RouteBuilder() {
public void configure() {
// Vert.x → Camel
from("vertx:camel-logging")
.log("body = ${body}")
.to("direct:logging");
// Camel → Vert.x
from("direct:logging")
.to("vertx:logging");
}
});
camel.start();
// ... クローズ処理
}
}
class LoggingVerticle extends AbstractVerticle {
public void start() {
vertx.eventBus().<String>consumer("logging",
message -> log.info(message.body()));
}
}
Vert.x相关组件
コンポーネント説明camel-vertx-httpVert.x Web ClientによるHTTPエンドポイント実装(Producerのみ)camel-vertx-kafkaVert.x Kafka ClientによるKafkaとの接続camel-vertx-websocketVert.x WebSocket機能によるWebSocketエンドポイントの実装Vert.x Camel BridgeCamelでなくVert.x側からのCamelへのブリッジ(ちなみに作者はCamelのClaus)
文獻參考
Camel in Action, 2nd Edition — Chapter 20: Reactive Camel