我尝试使用Embulk将MySQL中的数据加载到ElasticSearch中

将存储在MySQL中的时间序列数据导入到elasticsearch中,并在kibana中进行可视化时的记录。

当在Elasticsearch MySQL进行搜索时,会出现使用JDBC river插件的相关文章,然而看起来这个river插件似乎已被弃用。

于是我尝试了使用Embulk,结果超出预期地快速完成了,所以做下了备忘录。

大致外观

system.png

像这样做的事情。

安装 Elasticsearch 和 Kibana

只需下载并运行程序脚本,这两个软件即可立即运行,详细信息请参考官方网站。Elasticsearch运行在Java上,因此需要安装Java。而Kibana在Node上运行,但自带了Node,所以不必担心。

下载网站

Elasticsearch: 弹性搜索
Kibana: 基本插件

Embulk的安装设置

Embulk是一種像你一樣的工具,可以在不同類型的數據存儲之間方便地導出和導入數據。
與Fluentd一樣,Embulk也有各種不同數據存儲的插件可供使用,通過利用這些插件,可以在多種不同組合的數據存儲之間移動數據。

装载数据

安装和设置参考了以下步骤。非常简单。

从CSV文件定期批量加载数据到Elasticsearch和Kibana 4。

# Embulkをグローバルにインストール
sudo wget http://dl.embulk.org/embulk-latest.jar -O /usr/local/bin/embulk
sudo chmod +x /usr/local/bin/embulk

在Embulk中安装插件。

在上述的情况下,需要安装用于处理mysql和elasticsearch的插件。Embulk本身有一个命令用于管理插件,可以使用它来安装插件。


# embulk pluginの確認
# embulk-input-mysql embulk-output-elasticsearchは最初は入ってない.
$ embulk gem list
2015-09-06 23:51:06.933 +0900: Embulk v0.7.4

*** LOCAL GEMS ***
jar-dependencies (0.1.15)
jruby-openssl (0.9.7 java)
json (1.8.0 java)
minitest (5.4.1)
power_assert (0.2.3)
psych (2.0.14.pre1 java)
rake (10.1.0)
rdoc (4.1.0)
test-unit (3.0.3)

# mysqlのinputプラグイン、elasticsearchのoutputプラグインをインストール.
$ embulk gem install embulk-input-mysql embulk-output-elasticsearch  
...

# 確認
$ embulk gem list
2015-09-06 23:51:06.933 +0900: Embulk v0.7.4

*** LOCAL GEMS ***

embulk-input-mysql (0.6.0)
embulk-output-elasticsearch (0.1.8)
jar-dependencies (0.1.15)
jruby-openssl (0.9.7 java)
json (1.8.0 java)
minitest (5.4.1)
power_assert (0.2.3)
psych (2.0.14.pre1 java)
rake (10.1.0)
rdoc (4.1.0)
test-unit (3.0.3)

执行批量加载

执行Embulk需要编写Input插件和Output插件的配置文件。
编写以下类似的config文件。我们不需要对数据格式进行转换或更改列名,而是将所有数据直接导入elasticsearch中。

in:
    type: mysql
    host: $mysql_host
    user: $user
    password: $password
    database: $database
    query: |
        SELECT * FROM $table

out:
    type: elasticsearch
    index: test
    index_type: embulk
    nodes:
    - host: $elasticsearch_host

把这个文件加载到Embulk中并执行,就可以了。


# ロードされるデータの確認.
$ embulk preview config.yml

# load 
$ embulk run config.yml 
2015-09-07 00:07:17.785 +0900: Embulk v0.7.4
2015-09-07 00:07:21.932 +0900 [INFO] (transaction): Loaded plugin embulk-input-mysql (0.6.0)
2015-09-07 00:07:22.825 +0900 [INFO] (transaction): Loaded plugin embulk-output-elasticsearch (0.1.8)
2015-09-07 00:07:22.959 +0900 [INFO] (transaction): Fetch size is 10000. Using server-side prepared statement.
2015-09-07 00:07:23.846 +0900 [INFO] (transaction): [Ms. MODOK] loaded [], sites []
2015-09-07 00:07:25.364 +0900 [INFO] (transaction): {done:  0 / 1, running: 0}
2015-09-07 00:07:25.388 +0900 [INFO] (task-0000): [Arkady Rossovich] loaded [], sites []
2015-09-07 00:07:25.625 +0900 [INFO] (task-0000): Fetch size is 10000. Using server-side prepared statement.
2015-09-07 00:07:25.650 +0900 [INFO] (task-0000): SQL: SELECT * FROM test

2015-09-07 00:07:25.664 +0900 [INFO] (task-0000): > 0.01 seconds
2015-09-07 00:07:26.065 +0900 [INFO] (task-0000): Fetched 500 rows.
2015-09-07 00:07:26.174 +0900 [INFO] (task-0000): Fetched 1,000 rows.
2015-09-07 00:07:26.188 +0900 [INFO] (task-0000): Execute 1000 bulk actions
2015-09-07 00:07:26.241 +0900 [INFO] (task-0000): Execute 71 bulk actions
2015-09-07 00:07:26.379 +0900 [INFO] (elasticsearch[Arkady Rossovich][transport_client_worker][T#6]{New I/O worker #15}): 71 bulk actions succeeded
2015-09-07 00:07:26.841 +0900 [INFO] (elasticsearch[Arkady Rossovich][transport_client_worker][T#5]{New I/O worker #14}): 1000 bulk actions succeeded
2015-09-07 00:07:26.954 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-09-07 00:07:27.047 +0900 [INFO] (main): Committed.
2015-09-07 00:07:27.048 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}

用以下的命令确认Elasticsearch已生成映射。

curl -XGET 'http://$elastisearch_host/test?pretty'

只需要一个选项:
接下来你只需要访问Kibana并进行某些操作,就可以在漂亮的图形上查看MySQL中的时间序列数据了!

Visualize_-_Kibana_4.png

记录:据说Elasticsearch插件可以自动设置Elasticsearch对象的ID(_id),所以如果执行上述的embulk run两次,两次都会成功,导致MySQL中的相同数据被重复导入。在批处理等执行时需要注意。

广告
将在 10 秒后关闭
bannerAds