Embulk-output-elasticsearch和Elasticsearch的注意事项

我是佐藤(@hiroysato),我正在关注Fluentd的批处理版本Embulk(エンバルク)的进展情况,担心它能否在下周的Meetup前达到100个。

这篇文章是第11天的Embulk Advent Calendar的文章。

2017年6月29日 补充

embulk-output-elasticsearch 4ではメモリリークがあるらしく、大きなデータをロードするとOutOfMemoryになる模様(参考)

2017/5/19 添加附注

目前情况

    • Amazon Elasticsearch Serviceを使う場合embulk-output-elasticsearch_using_url を利用 参考

elasticsearch1~5を利用する場合: embulk-output-elasticsearch 4.0以降を利用
AzureやGCPは情報をおまちしております。

2017年3月29日 更新

    elasticsearch5に対応した、embulk-output-elasticsearch0.4.0がリリースされました。

2017年1月30日补充

似乎可以使用下列插件来连接到elasticsearch 5.0。

    • embulk-output-elasticsearch_ruby

embulk-output-elasticsearch5

2016/11/17更新

embulk-output-elasticsearch v0.2.X不支持elasticsearch-5.0.0。
似乎可以使用embulk-output-elasticsearch_ruby作为替代。

2016年1月27日 补充

请使用与您使用的elasticsearch版本相匹配的适当插件,于2016年1月26日发布了embulk-output-elasticsearch v0.2.0。

Elasticsearchembulkプラグイン2.Xv0.2.0以降1.Xv0.1.8

1. 在Java升级到2.0时,由于API发生了重大变化,因此与1.X版本不兼容,失去了向下兼容性。

2016/03/08 补充加入

embulk-output-elasticsearch对AWS ES不支持。

如@nora96o @matetsu所说,embulk-output-elasticsearch插件使用的是Transport Client,但是无法在AWS ES上使用,所以无法连接。

以下是一份古老的摘要。

总结

    • embulk-output-elasticsearch 0.1.8はElasticsearch 2.X系に接続できません。

elasticsearchは1.Xを使いましょう
もし諸般の事情でelasticsearch v2.Xを使いたい場合は下記の方法を試してください。
リリースされたらこのページの情報が変わります。ストックして変更を追跡してください。

首先

您是否遇到了使用Embulk将数据导入Elasticsearch时出现问题的困扰?可能是因为您使用的是Elasticsearch v2.X版本,导致无法成功投入数据。

到2015年12月11日,embulk-output-elasticsearch 0.1.8尚不支持elasticsearch v2.X。

如果使用embulk-output-elasticsearch,请使用Elasticsearch 1.7.3。

希望能有更少的人说“我试过使用Embulk,但它不工作”这样的话。

连接到elasticsearch v2.X时出现的错误。

如果使用的elasticsearch服务器是v2.X版本,在执行embulk run命令时会出现以下错误。

由于embulk-output-elasticsearch的当前版本仍未支持v2.X,所以这就是原因。

org.elasticsearch.transport.RemoteTransportException: Failed to deserialize exception response from stream
Caused by: org.elasticsearch.transport.TransportSerializationException: Failed to deserialize exception response from stream
    at org.elasticsearch.transport.netty.MessageChannelHandler.handlerResponseError(MessageChannelHandler.java:176) ~[na:na]
    at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:128) ~[na:na]
    at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[na:na]
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[na:na]
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[na:na]
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[na:na]
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) ~[na:na]
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) ~[na:na]
    at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) ~[na:na]
    at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[na:na]
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[na:na]
    at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) ~[na:na]
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:268) ~[na:na]
    at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:255) ~[na:na]
    at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) ~[na:na]
    at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) ~[na:na]
    at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) ~[na:na]
    at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) ~[na:na]
    at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[na:na]
    at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) ~[na:na]
    at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) ~[na:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_31]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_31]
    at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_31]
Caused by: java.io.StreamCorruptedException: Unsupported version: 1
    at org.elasticsearch.common.io.ThrowableObjectInputStream.readStreamHeader(ThrowableObjectInputStream.java:46) ~[na:na]
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299) ~[na:1.8.0_31]
    at org.elasticsearch.common.io.ThrowableObjectInputStream.<init>(ThrowableObjectInputStream.java:38) ~[na:na]
    at org.elasticsearch.transport.netty.MessageChannelHandler.handlerResponseError(MessageChannelHandler.java:173) ~[na:na]
    ... 23 common frames omitted

使用独立构建的embulk-output-elasticsearch

如果您一定要尝试elasticsearch v2.0版本,那么请尝试使用在github上提供的embulk-output-elasticsearch。根据github上的源码看,它似乎支持v2.0版本。(但可能存在一些问题,如果遇到任何问题,请在Isshue上报告)

你可以编译并试用这个东西。(需要安装JDK。)

你可以使用下面的命令来创建一个支持elasticsearch v2.X的embulk-output-elasticsearch。

git clone https://github.com/muga/embulk-output-elasticsearch.git
cd embulk-output-elasticsearch
./gradlew gem

验证

将elasticsearch引入系统中。
将elasticsearch引入。
实施elasticsearch。
导入elasticsearch。

wget https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/zip/elasticsearch/2.1.0/elasticsearch-2.1.0.zip
unzip elasticsearch-2.1.0.zip 
cd elasticsearch-2.1.0
./bin/elasticsearch

创建样本数据

embulk example sample
embulk guess sample/example.yml -o config.yml

修改设置文件

更改「out」的部分。

in:
  type: file
  path_prefix: /private/tmp/sample/csv/sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
#out: {type: stdout}
out:
  type: elasticsearch
  index: embulk
  index_type: embulk
  nodes:
    - host: localhost

预览

embulk preview config.yml
+---------+--------------+-------------------------+-------------------------+----------------------------+
| id:long | account:long |          time:timestamp |      purchase:timestamp |             comment:string |
+---------+--------------+-------------------------+-------------------------+----------------------------+
|       1 |       32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC |                     embulk |
|       2 |       14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC |               embulk jruby |
|       3 |       27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin |
|       4 |       11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC |                       NULL |
+---------+--------------+-------------------------+-------------------------+----------------------------+

数据输入

指定独自构建的embulk-output-elasticsearch路径并运行embulk run。

embulk run -I /path/to/embulk-output-elasticsearch/lib config.yml

如果要安装已经构建的软件包,您可以进行以下操作。

可以使用下述方法安装(请注意,版本将变为0.1.8)。

./gradlew gem
embulk gem install -l pkg/embulk-output-elasticsearch-0.1.8.gem
广告
将在 10 秒后关闭
bannerAds