在Apache Camel应用程序中,将Apache Ignite用作分布式高速缓存服务器
首先
Apache Ignite是一种开源软件,可实现内存数据网格。具体而言,它可以充当内存数据库、键值存储、分布式消息系统和分布式缓存服务器等角色。
在本次情景中,我想将Apache Ignite作为分布式缓存服务器,从Apache Camel应用程序中使用。
在Apache Camel中,有以下四个组件用于访问缓存服务器。
※可能还有其他组件,但我所了解的只有这四个,并且我曾使用过Redis和Ignite这两个。
-
- Hazelcast
-
- Infinispan
-
- Redis
- Apache Ignite
在这其中,选择在这篇文章中选择Apache Ignite,是因为之前使用过,并且希望从Camel中使用它。
为了在Apache Camel中连接到Ignite,需要使用Camel Ignite组件。
本次将使用Ignite v2.6.0版本。有关环境搭建方法,请参考以下文章。
- インメモリデータグリッド Apache IgniteをJavaのクライアントからアクセスする
创建应用程序的描述 de
本次所制作的应用程序会按照下图的方式,每隔1秒将日期和时间的消息PUSH到Ignite的缓存中,并从Ignite获取相同的消息。另外,还会从Ignite的缓存中删除指定键的缓存。
那么,让我们创建一个将Ignite用作缓存服务器的应用程序。
添加一个用于使用Ignite的库。
对于Maven的情况,需要在pom.xml中添加以下的库。
camel-ignite是用于在Camel中处理Ignite的组件,${camel.version}用来指定所使用的Camel版本。
ignite-core是针对Ignite的客户端库。在这个设置中,我们将使用Spring XML,因此还需要添加ignite-spring。
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-ignite</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spring</artifactId>
<version>2.6.0</version>
</dependency>
组件的URI
使用camel-ignite组件作为缓存服务器的URI如下所示。
点火缓存:缓存名称
在上下文路径(cacheName)中指定目标缓存的名称。
创建应用程序
首先,进行Ignite的连接设置。
在Ignite中,使用IpFinder来指定连接目标Ignite节点。下面的TcpDiscoveryVmIpFinder使用固定的IP地址来指定连接目标的IpFinder,设置了两个服务器ignite1和ignite2。
IgniteConfiguration config = new IgniteConfiguration();
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
Set<String> nodes = new HashSet<String>();
nodes.add("ignite1:47500..47509");
nodes.add("ignite2:47500..47509");
ipFinder.setAddresses(nodes);
config.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));
接下来,我们将创建一个用于将字符串PUT到Ignite的test01缓存的路由。
使用Java DSL编写的路由如下所示。
-
- キャッシュのキーは、IgniteConstants.IGNITE_CACHE_KEYヘッダにsetHeaderメソッドで指定します。
-
- キャッシュの操作は、URIのoperationオプションに”PUT”, “GET”, “REMOVE”などを指定します。
- PUT, GETするデータはメッセージのBODYに、setBodyメソッドで指定します。
from("timer:triggerPut?period=1000") // 1000ミリ秒毎に実行
.routeId("ignite_put_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
.to("ignite-cache:test01?operation=PUT") // キャッシュtest01へPUT
.log("KEY=testkey01, PUT = ${body}"); // 取得したキャッシュの内容をログに表示
从Ignite的test01缓存中创建一个进行GET请求的路由。
from("timer:triggerGet?period=1000") // 1000ミリ秒毎に実行
.routeId("ignite_get_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
.to("ignite-cache:test01?operation=GET") // キャッシュtest01からGET
.log("KEY=testkey01, GET = ${body}"); // 取得したキャッシュの内容をログに表示
from("timer:triggerRemove?repeatCount=1") // 1回だけ実行
.routeId("ignite_remove_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey02")) // キャッシュのキーをtestkey02に指定
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
.to("ignite-cache:test01?operation=PUT") // キャッシュtest02へPUT
.to("ignite-cache:test01?operation=GET")
.log("KEY=testkey02, GET = ${body}") // 取得したキャッシュの内容をログに表示
.to("ignite-cache:test01?operation=REMOVE") // キャッシュtest02からREMOVE
.to("ignite-cache:test01?operation=GET")
.log("KEY=testkey02, GET = ${body}"); // 取得したキャッシュの内容をログに表示(REMOVE後なので空になっている)
接下来是连接到Ignite的配置、将数据放入缓存、从缓存中删除、创建从缓存中获取数据的路由。以下是创建主要函数等的完整源代码。
public static void main(String[] args) {
try {
CamelContext context = new DefaultCamelContext();
IgniteConfiguration config = new IgniteConfiguration();
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
Set<String> nodes = new HashSet<String>();
nodes.add("ignite1:47500..47509");
nodes.add("ignite2:47500..47509");
ipFinder.setAddresses(nodes);// 接続先のノードを指定
config.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));
IgniteCacheComponent ignite = new IgniteCacheComponent();
ignite.setIgniteConfiguration(config);
context.addComponent("ignite-cache", ignite);
context.addRoutes(new RouteBuilder() {
public void configure() {
from("timer:triggerPut?period=1000") // 1000ミリ秒毎に実行
.routeId("ignite_put_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
.to("ignite-cache:test01?operation=PUT") // キャッシュtest01へPUT
.log("KEY=testkey01, PUT = ${body}"); // 取得したキャッシュの内容をログに表示
from("timer:triggerGet?period=1000") // 1000ミリ秒毎に実行
.routeId("ignite_get_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
.to("ignite-cache:test01?operation=GET") // キャッシュtest01からGET
.log("KEY=testkey01, GET = ${body}"); // 取得したキャッシュの内容をログに表示
from("timer:triggerRemove?repeatCount=1") // 1回だけ実行
.routeId("ignite_remove_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey02")) // キャッシュのキーをtestkey02に指定
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
.to("ignite-cache:test01?operation=PUT") // キャッシュtest02へPUT
.to("ignite-cache:test01?operation=GET")
.log("KEY=testkey02, GET = ${body}") // 取得したキャッシュの内容をログに表示
.to("ignite-cache:test01?operation=REMOVE") // キャッシュtest02からREMOVE
.to("ignite-cache:test01?operation=GET")
.log("KEY=testkey02, GET = ${body}"); // 取得したキャッシュの内容をログに表示(REMOVE後なので空になっている)
}
});
context.start();
Thread.sleep(10000);
context.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
运行已创建的Camel应用程序。
我将解释一下运行此应用程序时生成的日志。
以下的日志是由PUT和GET路由输出的日志。
PUT和GET都是每秒输出一次,且都输出日期时间字符串。
由于在不同的路由上执行,所以GET和PUT的顺序可能会交错。
[2019-02-01 21:41:04.258], [INFO ], ignite_producer_route, Camel (camel-1) thread #1 - timer://triggerPut, ignite_producer_route, KEY=testkey01, PUT = 2019-02-01 21:41:04
[2019-02-01 21:41:05.230], [INFO ], ignite_producer_route, Camel (camel-1) thread #1 - timer://triggerPut, ignite_producer_route, KEY=testkey01, PUT = 2019-02-01 21:41:05
[2019-02-01 21:41:05.230], [INFO ], ignite_get_route, Camel (camel-1) thread #2 - timer://triggerGet, ignite_get_route, KEY=testkey01, GET = 2019-02-01 21:41:05
[2019-02-01 21:41:06.229], [INFO ], ignite_producer_route, Camel (camel-1) thread #1 - timer://triggerPut, ignite_producer_route, KEY=testkey01, PUT = 2019-02-01 21:41:06
[2019-02-01 21:41:06.229], [INFO ], ignite_get_route, Camel (camel-1) thread #2 - timer://triggerGet, ignite_get_route, KEY=testkey01, GET = 2019-02-01 21:41:06
以下是 REMOVE 功能的路由输出的日志。
在第一个 GET 请求中,我们可以看到有时间戳的显示,但是在执行 REMOVE 后的第二个 GET 请求中,我们可以看到时间戳变为空。
[2019-02-01 21:41:04.263], [INFO ], ignite_remove_route, Camel (camel-1) thread #3 - timer://triggerRemove, ignite_remove_route, KEY=testkey02, GET = 2019-02-01 21:41:04
[2019-02-01 21:41:04.272], [INFO ], ignite_remove_route, Camel (camel-1) thread #3 - timer://triggerRemove, ignite_remove_route, KEY=testkey02, GET =
可以通过设置IgniteConstants.IGNITE_CACHE_OPERATION消息头来为每条消息设置不同的缓存操作,尽管在本次源代码中,我们设置了对缓存的操作使用Endpoint URI。因此,以下的源代码将得到相同的结果。
// (1)エンドポイントURIでキャッシュへの操作を設定した場合
.to("ignite-cache:test01?operation=PUT")
.to("ignite-cache:test01?operation=GET")
// (2)IgniteConstants.IGNITE_CACHE_OPERATIONメッセージヘッダーに設定した場合
.setHeader(IgniteConstants.IGNITE_CACHE_OPERATION, simple("PUT"))
.to("ignite-cache:test01")
.setHeader(IgniteConstants.IGNITE_CACHE_OPERATION, simple("GET"))
.to("ignite-cache:test01")
Ignite服务器与应用程序之间的关系
这次连接了两台 Ignite 缓存服务器:ignite1 和 ignite2。
Camel 应用程序看起来像是客户端,但实际上它是以缓存服务器的形式运行。
换句话说,它加入了两台缓存服务器的集群,并且自身也是一个缓存服务器。这种配置在将应用程序和缓存服务器在同一台服务器上运行时非常高效。
当缓存服务器的规模变大时,可能需要将应用服务器和缓存服务器配置为不同的服务器。在这种情况下,将需要以客户端模式启动应用程序。
接下来,我们将尝试创建一个以客户端模式启动的应用程序。
创建一个应用程序作为Ignite服务器的客户端。
只需要添加以下一行代码,即可在客户端模式下启动应用程序。这样,根据本次配置,应用程序将作为两台缓存服务器的客户端进行启动。
Ignition.setClientMode(true);
因為這樣還不夠有趣,所以我想將設定改成Spring XML。在Ignite中,可以將設定檔案編寫如下所示。該檔案名稱為「default-config-cluster2server.xml」。這個檔案只是將先前創建的程式的資訊寫在XML中。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!--
Alter configuration below as needed.
-->
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<value>ignite1:47500..47509</value>
<value>ignite2:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
要读取生成的XML文件,可以按照以下方式指定。
“default-config-cluster2server.xml”指定了文件路径。
“grid.cfg”是在XML中指定的BEAN的ID。
IgniteConfiguration config = Ignition.loadSpringBean("default-config-cluster2server.xml", "grid.cfg");
如果在客户端模式下设置,并且将XML用作配置的情况下,整个源代码如下所示。路由定义与前述源代码相同。
public static void main(String[] args) {
try {
CamelContext context = new DefaultCamelContext();
IgniteConfiguration config = Ignition.loadSpringBean("default-config-cluster2server.xml", "grid.cfg");
IgniteCacheComponent ignite = IgniteCacheComponent.fromConfiguration(config);
Ignition.setClientMode(true);
context.addComponent("ignite-cache", ignite);
context.addRoutes(new RouteBuilder() {
public void configure() {
from("timer:triggerPut?period=1000") // 1000ミリ秒毎に実行
.routeId("ignite_put_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
.to("ignite-cache:test01?operation=PUT") // キャッシュtest01へPUT
.log("KEY=testkey01, PUT = ${body}"); // 取得したキャッシュの内容をログに表示
from("timer:triggerGet?period=1000") // 1000ミリ秒毎に実行
.routeId("ignite_get_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey01")) // キャッシュのキーをtestkey01に指定
.to("ignite-cache:test01?operation=GET") // キャッシュtest01からGET
.log("KEY=testkey01, GET = ${body}"); // 取得したキャッシュの内容をログに表示
from("timer:triggerRemove?repeatCount=1") // 1回だけ実行
.routeId("ignite_remove_route")
.setHeader(IgniteConstants.IGNITE_CACHE_KEY, simple("testkey02")) // キャッシュのキーをtestkey02に指定
.setBody(simple("${date:now:yyyy-MM-dd HH:mm:ss}")) // メッセージのBODYに現在の日時を設定
.to("ignite-cache:test01?operation=PUT") // キャッシュtest02へPUT
.to("ignite-cache:test01?operation=GET")
.log("KEY=testkey02, GET = ${body}") // 取得したキャッシュの内容をログに表示
.to("ignite-cache:test01?operation=REMOVE") // キャッシュtest02からREMOVE
.to("ignite-cache:test01?operation=GET")
.log("KEY=testkey02, GET = ${body}"); // 取得したキャッシュの内容をログに表示(REMOVE後なので空になっている)
}
});
context.start();
Thread.sleep(10000);
context.stop();
} catch (Exception e) {
e.printStackTrace();
}
在IgniteVisor中,使用top命令来检查集群的状态。
visor> top
Hosts: 3
+=================================================================================================================================+
| Int./Ext. IPs | Node ID8(@) | Node Type | OS | CPUs | MACs | CPU Load |
+=================================================================================================================================+
| 0:0:0:0:0:0:0:1%lo | 1: 0D2C0B3D(@n0) | Server | Linux amd64 3.10.0-693.2.2.el7.x86_64 | 1 | 08:00:27:76:F3:CF | 1.00 % |
| 127.0.0.1 | | | | | | |
| 192.168.20.72 | | | | | | |
+--------------------+------------------+-----------+---------------------------------------+------+-------------------+----------+
| 192.168.100.254 | 1: 8A498968(@n1) | Server | Linux amd64 3.10.0-693.2.2.el7.x86_64 | 1 | 02:42:2F:F1:6A:FD | 0.67 % |
| 192.168.20.71 | | | | | 08:00:27:E3:4A:44 | |
| 127.0.0.1 | | | | | | |
+--------------------+------------------+-----------+---------------------------------------+------+-------------------+----------+
| 0:0:0:0:0:0:0:1 | 1: F751C80C(@n2) | Client | Windows 10 amd64 10.0 | 4 | 00:1B:DC:F5:B4:36 | 0.00 % |
| 192.168.20.197 | | | | | 0A:00:27:00:00:0F | |
| 127.0.0.1 | | | | | 0A:00:27:00:00:0B | |
+---------------------------------------------------------------------------------------------------------------------------------+
Summary:
+--------------------------------------+
| Active | true |
| Total hosts | 3 |
| Total nodes | 3 |
| Total CPUs | 6 |
| Avg. CPU load | 0.56 % |
| Avg. free heap | 80.00 % |
| Avg. Up time | 09:44:38 |
| Snapshot time | 2019-02-02 02:51:35 |
+--------------------------------------+
然后,您可以看到上述节点的”Node Type”是”Client”。这是我们创建的应用程序,表示它在客户端模式下运行。
如果在客户端模式下运行,将会参与到集群中,但本地不会缓存数据。
camel-ignite组件的主要属性
最后我们会对camel-ignite组件的主要属性进行说明。
除了下表之外,我们还有其他的属性,请参考官方网页获取更详细的信息。
- Igniteコンポーネント – 公式サイト
IgniteCacheOperation
请参考
-
- Igniteコンポーネント – 公式サイト
-
- IgniteCacheコンポーネント – 公式サイト
-
- Camel Igniteコンポーネント – 公式サイト(旧)
- Ignite – 公式サイト