我尝试使用Embulk将MySQL中的数据加载到ElasticSearch中
将存储在MySQL中的时间序列数据导入到elasticsearch中,并在kibana中进行可视化时的记录。
当在Elasticsearch MySQL进行搜索时,会出现使用JDBC river插件的相关文章,然而看起来这个river插件似乎已被弃用。
于是我尝试了使用Embulk,结果超出预期地快速完成了,所以做下了备忘录。
大致外观
像这样做的事情。
安装 Elasticsearch 和 Kibana
只需下载并运行程序脚本,这两个软件即可立即运行,详细信息请参考官方网站。Elasticsearch运行在Java上,因此需要安装Java。而Kibana在Node上运行,但自带了Node,所以不必担心。
下载网站
Elasticsearch: 弹性搜索
Kibana: 基本插件
Embulk的安装设置
Embulk是一種像你一樣的工具,可以在不同類型的數據存儲之間方便地導出和導入數據。
與Fluentd一樣,Embulk也有各種不同數據存儲的插件可供使用,通過利用這些插件,可以在多種不同組合的數據存儲之間移動數據。
装载数据
安装和设置参考了以下步骤。非常简单。
从CSV文件定期批量加载数据到Elasticsearch和Kibana 4。
# Embulkをグローバルにインストール
sudo wget http://dl.embulk.org/embulk-latest.jar -O /usr/local/bin/embulk
sudo chmod +x /usr/local/bin/embulk
在Embulk中安装插件。
在上述的情况下,需要安装用于处理mysql和elasticsearch的插件。Embulk本身有一个命令用于管理插件,可以使用它来安装插件。
# embulk pluginの確認
# embulk-input-mysql embulk-output-elasticsearchは最初は入ってない.
$ embulk gem list
2015-09-06 23:51:06.933 +0900: Embulk v0.7.4
*** LOCAL GEMS ***
jar-dependencies (0.1.15)
jruby-openssl (0.9.7 java)
json (1.8.0 java)
minitest (5.4.1)
power_assert (0.2.3)
psych (2.0.14.pre1 java)
rake (10.1.0)
rdoc (4.1.0)
test-unit (3.0.3)
# mysqlのinputプラグイン、elasticsearchのoutputプラグインをインストール.
$ embulk gem install embulk-input-mysql embulk-output-elasticsearch
...
# 確認
$ embulk gem list
2015-09-06 23:51:06.933 +0900: Embulk v0.7.4
*** LOCAL GEMS ***
embulk-input-mysql (0.6.0)
embulk-output-elasticsearch (0.1.8)
jar-dependencies (0.1.15)
jruby-openssl (0.9.7 java)
json (1.8.0 java)
minitest (5.4.1)
power_assert (0.2.3)
psych (2.0.14.pre1 java)
rake (10.1.0)
rdoc (4.1.0)
test-unit (3.0.3)
执行批量加载
执行Embulk需要编写Input插件和Output插件的配置文件。
编写以下类似的config文件。我们不需要对数据格式进行转换或更改列名,而是将所有数据直接导入elasticsearch中。
in:
type: mysql
host: $mysql_host
user: $user
password: $password
database: $database
query: |
SELECT * FROM $table
out:
type: elasticsearch
index: test
index_type: embulk
nodes:
- host: $elasticsearch_host
把这个文件加载到Embulk中并执行,就可以了。
# ロードされるデータの確認.
$ embulk preview config.yml
# load
$ embulk run config.yml
2015-09-07 00:07:17.785 +0900: Embulk v0.7.4
2015-09-07 00:07:21.932 +0900 [INFO] (transaction): Loaded plugin embulk-input-mysql (0.6.0)
2015-09-07 00:07:22.825 +0900 [INFO] (transaction): Loaded plugin embulk-output-elasticsearch (0.1.8)
2015-09-07 00:07:22.959 +0900 [INFO] (transaction): Fetch size is 10000. Using server-side prepared statement.
2015-09-07 00:07:23.846 +0900 [INFO] (transaction): [Ms. MODOK] loaded [], sites []
2015-09-07 00:07:25.364 +0900 [INFO] (transaction): {done: 0 / 1, running: 0}
2015-09-07 00:07:25.388 +0900 [INFO] (task-0000): [Arkady Rossovich] loaded [], sites []
2015-09-07 00:07:25.625 +0900 [INFO] (task-0000): Fetch size is 10000. Using server-side prepared statement.
2015-09-07 00:07:25.650 +0900 [INFO] (task-0000): SQL: SELECT * FROM test
2015-09-07 00:07:25.664 +0900 [INFO] (task-0000): > 0.01 seconds
2015-09-07 00:07:26.065 +0900 [INFO] (task-0000): Fetched 500 rows.
2015-09-07 00:07:26.174 +0900 [INFO] (task-0000): Fetched 1,000 rows.
2015-09-07 00:07:26.188 +0900 [INFO] (task-0000): Execute 1000 bulk actions
2015-09-07 00:07:26.241 +0900 [INFO] (task-0000): Execute 71 bulk actions
2015-09-07 00:07:26.379 +0900 [INFO] (elasticsearch[Arkady Rossovich][transport_client_worker][T#6]{New I/O worker #15}): 71 bulk actions succeeded
2015-09-07 00:07:26.841 +0900 [INFO] (elasticsearch[Arkady Rossovich][transport_client_worker][T#5]{New I/O worker #14}): 1000 bulk actions succeeded
2015-09-07 00:07:26.954 +0900 [INFO] (transaction): {done: 1 / 1, running: 0}
2015-09-07 00:07:27.047 +0900 [INFO] (main): Committed.
2015-09-07 00:07:27.048 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}
用以下的命令确认Elasticsearch已生成映射。
curl -XGET 'http://$elastisearch_host/test?pretty'
只需要一个选项:
接下来你只需要访问Kibana并进行某些操作,就可以在漂亮的图形上查看MySQL中的时间序列数据了!
记录:据说Elasticsearch插件可以自动设置Elasticsearch对象的ID(_id),所以如果执行上述的embulk run两次,两次都会成功,导致MySQL中的相同数据被重复导入。在批处理等执行时需要注意。