Embulk-output-elasticsearch和Elasticsearch的注意事项
我是佐藤(@hiroysato),我正在关注Fluentd的批处理版本Embulk(エンバルク)的进展情况,担心它能否在下周的Meetup前达到100个。
这篇文章是第11天的Embulk Advent Calendar的文章。
2017年6月29日 补充
embulk-output-elasticsearch 4ではメモリリークがあるらしく、大きなデータをロードするとOutOfMemoryになる模様(参考)
2017/5/19 添加附注
目前情况
-
- Amazon Elasticsearch Serviceを使う場合embulk-output-elasticsearch_using_url を利用 参考
elasticsearch1~5を利用する場合: embulk-output-elasticsearch 4.0以降を利用
AzureやGCPは情報をおまちしております。
2017年3月29日 更新
- elasticsearch5に対応した、embulk-output-elasticsearch0.4.0がリリースされました。
2017年1月30日补充
似乎可以使用下列插件来连接到elasticsearch 5.0。
-
- embulk-output-elasticsearch_ruby
embulk-output-elasticsearch5
2016/11/17更新
embulk-output-elasticsearch v0.2.X不支持elasticsearch-5.0.0。
似乎可以使用embulk-output-elasticsearch_ruby作为替代。
2016年1月27日 补充
请使用与您使用的elasticsearch版本相匹配的适当插件,于2016年1月26日发布了embulk-output-elasticsearch v0.2.0。
1. 在Java升级到2.0时,由于API发生了重大变化,因此与1.X版本不兼容,失去了向下兼容性。
2016/03/08 补充加入
embulk-output-elasticsearch对AWS ES不支持。
如@nora96o @matetsu所说,embulk-output-elasticsearch插件使用的是Transport Client,但是无法在AWS ES上使用,所以无法连接。
以下是一份古老的摘要。
总结
-
- embulk-output-elasticsearch 0.1.8はElasticsearch 2.X系に接続できません。
elasticsearchは1.Xを使いましょう
もし諸般の事情でelasticsearch v2.Xを使いたい場合は下記の方法を試してください。
リリースされたらこのページの情報が変わります。ストックして変更を追跡してください。
首先
您是否遇到了使用Embulk将数据导入Elasticsearch时出现问题的困扰?可能是因为您使用的是Elasticsearch v2.X版本,导致无法成功投入数据。
到2015年12月11日,embulk-output-elasticsearch 0.1.8尚不支持elasticsearch v2.X。
如果使用embulk-output-elasticsearch,请使用Elasticsearch 1.7.3。
希望能有更少的人说“我试过使用Embulk,但它不工作”这样的话。
连接到elasticsearch v2.X时出现的错误。
如果使用的elasticsearch服务器是v2.X版本,在执行embulk run命令时会出现以下错误。
由于embulk-output-elasticsearch的当前版本仍未支持v2.X,所以这就是原因。
org.elasticsearch.transport.RemoteTransportException: Failed to deserialize exception response from stream
Caused by: org.elasticsearch.transport.TransportSerializationException: Failed to deserialize exception response from stream
at org.elasticsearch.transport.netty.MessageChannelHandler.handlerResponseError(MessageChannelHandler.java:176) ~[na:na]
at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:128) ~[na:na]
at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[na:na]
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[na:na]
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[na:na]
at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[na:na]
at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) ~[na:na]
at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) ~[na:na]
at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) ~[na:na]
at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[na:na]
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[na:na]
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) ~[na:na]
at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:268) ~[na:na]
at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:255) ~[na:na]
at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) ~[na:na]
at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) ~[na:na]
at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) ~[na:na]
at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) ~[na:na]
at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[na:na]
at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) ~[na:na]
at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) ~[na:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_31]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_31]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_31]
Caused by: java.io.StreamCorruptedException: Unsupported version: 1
at org.elasticsearch.common.io.ThrowableObjectInputStream.readStreamHeader(ThrowableObjectInputStream.java:46) ~[na:na]
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299) ~[na:1.8.0_31]
at org.elasticsearch.common.io.ThrowableObjectInputStream.<init>(ThrowableObjectInputStream.java:38) ~[na:na]
at org.elasticsearch.transport.netty.MessageChannelHandler.handlerResponseError(MessageChannelHandler.java:173) ~[na:na]
... 23 common frames omitted
使用独立构建的embulk-output-elasticsearch
如果您一定要尝试elasticsearch v2.0版本,那么请尝试使用在github上提供的embulk-output-elasticsearch。根据github上的源码看,它似乎支持v2.0版本。(但可能存在一些问题,如果遇到任何问题,请在Isshue上报告)
你可以编译并试用这个东西。(需要安装JDK。)
你可以使用下面的命令来创建一个支持elasticsearch v2.X的embulk-output-elasticsearch。
git clone https://github.com/muga/embulk-output-elasticsearch.git
cd embulk-output-elasticsearch
./gradlew gem
验证
将elasticsearch引入系统中。
将elasticsearch引入。
实施elasticsearch。
导入elasticsearch。
wget https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/zip/elasticsearch/2.1.0/elasticsearch-2.1.0.zip
unzip elasticsearch-2.1.0.zip
cd elasticsearch-2.1.0
./bin/elasticsearch
创建样本数据
embulk example sample
embulk guess sample/example.yml -o config.yml
修改设置文件
更改「out」的部分。
in:
type: file
path_prefix: /private/tmp/sample/csv/sample_
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
trim_if_not_quoted: false
skip_header_lines: 1
allow_extra_columns: false
allow_optional_columns: false
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
#out: {type: stdout}
out:
type: elasticsearch
index: embulk
index_type: embulk
nodes:
- host: localhost
预览
embulk preview config.yml
+---------+--------------+-------------------------+-------------------------+----------------------------+
| id:long | account:long | time:timestamp | purchase:timestamp | comment:string |
+---------+--------------+-------------------------+-------------------------+----------------------------+
| 1 | 32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC | embulk |
| 2 | 14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC | embulk jruby |
| 3 | 27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin |
| 4 | 11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC | NULL |
+---------+--------------+-------------------------+-------------------------+----------------------------+
数据输入
指定独自构建的embulk-output-elasticsearch路径并运行embulk run。
embulk run -I /path/to/embulk-output-elasticsearch/lib config.yml
如果要安装已经构建的软件包,您可以进行以下操作。
可以使用下述方法安装(请注意,版本将变为0.1.8)。
./gradlew gem
embulk gem install -l pkg/embulk-output-elasticsearch-0.1.8.gem