使用Kafka将Elasticsearch和Neo4j进行连接

你为什么写这篇文章?

您是否在使用Elasticsearch/Neo4j进行利用?

虽然两者都是著名的数据库(DB),但它们的特点和用途是不同的。
Elasticsearch在文本信息搜索方面很强大,而Neo4j则常用于快速查询关联性的场景。
我的观感是,有时候也希望在Neo4j中根据数据属性进行查询,或者经常需要将数据与Elasticsearch中的数据进行关联。
但是,如果一味地这样做,会导致吞吐量降低,或者需要持续的实现调整,非常值得考虑。
因此,我开始考虑在重新审查数据结构的同时,是否能够做到取长补短。

通过将Neo4j和Elasticsearch进行连接,

    • Elasticsearchに投入したデータを元にNeo4jでデータの関連性を可視化する.

 

    Neo4jに投入した関連性をもつデータをElasticsearchで全文検索する.

本文的撰写动机是实现这样的应用场景。

关于用于协作的插件,可以在Neo4j的官方网页上找到相关说明。
使用该插件进行协作似乎也是可能的,但可能存在无法与最新版本的Neo4j兼容,以及GitHub存储库长时间未更新等问题,这可能会导致在使用上不安全。
在本文中,我们希望通过在两者之间添加Kafka来实现协作和通信控制。

接続.PNG
本文适用于以下人群:
「我有Linux环境,但从未进行过任何设置」
「我知道WSL存在,但从未使用过」为了使这些人能够再现操作步骤,本文尽可能详细地描述了工作过程。

在WSL2的早期版本中,由于主机和本地IP不同,造成了相当大的不便。但是现在已经解决了这个问题,所以在启动服务后,可以直接在浏览器中连接 http://localhost。

各项服务的简单介绍

弹性搜索

image.png

Elasticsearch是一个分布式的免费且开放的搜索和分析引擎。它支持各种类型的数据,包括文本、数字和地理空间信息,同时还支持结构化数据和非结构化数据。

Elasticsearch 是一款由 Elastic 公司提供的全文搜索数据库,是 Elastic Stack 的核心产品。它能够利用形态分析等技术实现丰富的全文搜索功能。

Neo4j

image.png

凭借经过验证的数万亿实体性能,开发人员、数据科学家和企业将Neo4j作为高性能、可扩展分析、智能应用开发和高级AI / ML流程的首选。

Neo4j是一款在图形数据库领域中非常受欢迎和广泛使用的产品。由于有很多详细的图形数据库的说明资料,所以我在这里不再详细介绍。但是Neo4j的特点是它对于关系搜索和可视化的需求比关系型数据库更擅长。

阿帕奇卡夫卡

image.png

Apache Kafka 是一个开源的分布式事件流平台,被数千家公司用于高性能的数据管道、流式分析、数据集成和关键应用程序。

Apache Kafka是一个开源项目,可以中介服务间的通信并实现分布式存储和流式处理。虽然内部的细节非常复杂,我就不多说了,但我特别喜欢这个开源项目。
Confluent是一家美国的公司,专注于利用Kafka,并提供了以下介绍的连接器等独特的社区支持。

Kafka-DB服务之间的协作配置

使用Kafka Connectors来连接Kafka和数据库。
Connectors是具有将每个服务的API封装并整理数据格式进行交互的功能的jar文件。
针对每个服务,我们设置Sink(发送数据)/ Source(接收数据)。

弹性搜索接收器连接器

Elasticsearch连接器允许将数据从Kafka移动到Elasticsearch的2.x、5.x、6.x和7.x版本。它将Kafka主题中的数据写入到Elasticsearch索引,并且一个主题的所有数据具有相同的类型。

ElasticSearch Sink Connector是用于将Kafka消息注入Elasticsearch的连接器。它在GitHub上公开,并可在Confluent社区许可下使用。除非明确指定,否则Elasticsearch将自动从注入的数据中推断出映射,但对于小数和时间戳等类型可能无法正确推测。ElasticSearch Sink Connector还具有从Kafka消息模式中推断映射并进行注入的功能。

支持Elasticsearch 2.x、5.x、6.x和7.x版本。

Elasticsearch 源链接器

这是一个用来将数据从Elasticsearch导出到Apache Kafka的连接器。

Elasticsearch源连接器是一个用于从Elasticsearch传递消息到Kafka的连接器。该连接器已在GitHub上发布,并可在Apache许可证第2.0版下使用。

该软件适用于Elasticsearch 6.x和7.x版本。

Neo4j 连接器

这是一个基本的Apache Kafka Connect Neo4j连接器,可以通过Cypher模板查询将数据从Kafka主题移动到Neo4j,反之亦然。

Neo4j Connector是一种支持Sink和Source的连接器,也有Neo4j官方用户指南。这也是在GitHub上公开的,采用知识共享署名-非商业性使用-相同方式共享4.0国际许可协议进行管理。

支持Neo4j版本从5.x到目前最新版。

验证操作环境

名称バージョンWindows 1021H2Ubuntu (WSL2)20.04.5OpenJDK11Elasticsearch7.17.8Kibana7.17.8Neo4j4.4.16-communityKafka (Apache)3.3.1Gradle7.4.2ElasticSearch Sink Connector14.0.3Elasticsearch Source Connector1.5.2Neo4j Connector5.0.0

本文介绍了由于Elasticsearch的请求/响应获取变得方便,我们选择使用Kibana,但并非必需。我们在Elasticsearch和Kafka中使用Java。对于Kafka的构建,我们使用Gradle,但无需进行特殊的安装步骤。

环境准备的内容

每个服务的安装方法都按照官方网站提供的步骤进行。

    • Java

 

    • Elasticsearch

 

    • Kibana

 

    • Neo4j

 

    Kafka
由于省略了署名的检查,请根据需要适时添加。
# Java
## tar(gzip)を取得
curl -OL https://download.java.net/java/ga/jdk11/openjdk-11_linux-x64_bin.tar.gz
## (どこでも良いが)ユーザディレクトリ下に展開
sudo tar xfz ./openjdk-11_linux-x64_bin.tar.gz --directory $HOME
## JAVA環境変数を登録
echo '# OpenJDK-11' >> ~/.bashrc
echo 'export JAVA_HOME=$HOME/jdk-11' >> ~/.bashrc
echo 'export CLASSPATH=$JAVA_HOME/lib' >> ~/.bashrc
echo 'export PATH=$PATH:$JAVA_HOME/bin' >> ~/.bashrc
source ~/.bashrc

# Elasticsearch
## tar(gzip)を取得
curl -OL https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.8-linux-x86_64.tar.gz
## (どこでも良いが)ユーザディレクトリ下に展開
tar -xzf ./elasticsearch-7.17.8-linux-x86_64.tar.gz --directory $HOME

# Kibana
## tar(gzip)を取得
curl -OL https://artifacts.elastic.co/downloads/kibana/kibana-7.17.8-linux-x86_64.tar.gz
## (どこでも良いが)ユーザディレクトリ下に展開
tar -xzf ./kibana-7.17.8-linux-x86_64.tar.gz --directory $HOME

# Neo4j
## tar(gzip)を取得
curl -L -o neo4j-community-4.4.16-unix.tar.gz https://neo4j.com/artifact.php?name=neo4j-community-4.4.16-unix.tar.gz
## (どこでも良いが)ユーザディレクトリ下に展開
tar -xzf ./neo4j-community-4.4.16-unix.tar.gz --directory $HOME

# Kafka
## tar(gzip)を取得
curl -OL https://downloads.apache.org/kafka/3.3.1/kafka-3.3.1-src.tgz
## (どこでも良いが)ユーザディレクトリ下に展開
tar -xzf ./kafka-3.3.1-src.tgz --directory $HOME

首先,我們將進行啟動確認。
我們將先從Elasticsearch/Kibana開始。

# Elasticsearchを起動
cd $HOME/elasticsearch-7.17.8
./bin/elasticsearch

# Kibanaを起動
cd $HOME/kibana-7.17.8
./bin/kibana
image.png

我們接下來啟動Neo4j。

cd $HOME/neo4j-community-4.4.16
# Neo4jの起動設定,Javaヒープサイズとリッスンするアドレスのコメントを外しておく
vi ./conf/neo4j.conf
> dbms.memory.heap.initial_size=512m
> dbms.memory.heap.max_size=512m
> dbms.default_listen_address=0.0.0.0

# Neo4jを起動
./bin/neo4j console
> 2022-12-22 05:55:21.669+0000 INFO  Started.
image.png

最后启动 Kafka。

cd $HOME/kafka-3.3.1-src
# Gradleを使ってビルド
./gradlew jar -PscalaVersion=2.13.8
> BUILD SUCCESSFUL
# Zookeeperの起動
./bin/zookeeper-server-start.sh config/zookeeper.properties
> (大きな文字でZookeeperと表示される)
# "Kafka"(Broker)の起動
./bin/kafka-server-start.sh config/server.properties
> (Startingログが沢山表示される)

# Kafka Topicを作成,Brokerはlocalhost:9092で立っている.
./bin/kafka-topics.sh --create --topic kafka-elastic-topic --bootstrap-server localhost:9092
# Kafka Topicにデータを入れてみる.Ctrl+cで終了
./bin/kafka-console-producer.sh --topic kafka-elastic-topic --bootstrap-server localhost:9092
> {"name":"ab cde", "comment": "Hello"}
> {"name":"fgh ij", "comment": "Bonjour"}
# Topicに入っていることを確かめる.Ctrl+cで終了
./bin/kafka-console-consumer.sh --topic kafka-elastic-topic --bootstrap-server localhost:9092 --from-beginning
> {"name":"ab cde", "comment": "Hello"}
> {"name":"fgh ij", "comment": "Bonjour"}
> Processed a total of 2 messages

只要确认以上内容就好。

Kafka Connect 的配置设置

弹性搜索接口连接器

从官方网页下载zip文件并解压,然后设置Kafka Connect的路径。配置文件位于Kafka目录下的./config/connect-distributed.properties。

# unzipしてコピー
mkdir -p $HOME/kafka-3.3.1-src/connectors
unzip -d $HOME/kafka-3.3.1-src/connectors  confluentinc-kafka-connect-elasticsearch-14.0.3.zip

# jarのパスを通す&JsonConverterを設定する
vi $HOME/kafka-3.3.1-src/config/connect-distributed.properties
(追記)
> internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> internal.key.converter.schemas.enable=false
> internal.value.converter.schemas.enable=false
(変更)
> key.converter.schemas.enable=false
> value.converter.schemas.enable=false
> plugin.path=$HOME/kafka-3.3.1-src/connectors

# Connectを起動
cd $HOME/kafka-3.3.1-src
./bin/connect-distributed.sh config/connect-distributed.properties
# ログからLoading Pluginを探してもいいが大変なので,以下を叩いてConnectorを認識していることを確認
curl -s localhost:8083/connector-plugins | jq '.[].class'
> "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"

使用curl进行Connect的配置。
虽然本文未指定,但在此处还可以进行重试次数、超时、代理等配置。
配置完成后,之前进行测试输入的消息应该会被投入Elasticsearch。

# Connector経由でElasticsearchに投入する設定をする,Connectはlocalhost:8083で立っている.
curl -i -X POST -H "Content-Type: application/json" -H "Accept: application/json" http://localhost:8083/connectors/ \
-d '{
  "name": "elasticsearch-connector-test",
  "config":{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "kafka-elastic-topic",
    "tasks.max": "1",
    "connection.url": "http://localhost:9200",
    "type.name": "_doc",
    "name": "elasticsearch-connector-test",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}'
> HTTP/1.1 201 Created
> Date: Thu, 22 Dec 2022 12:37:49 GMT
> Location: http://localhost:8083/connectors/elasticsearch-connector
> Content-Type: application/json
> Content-Length: 335
> Server: Jetty(9.4.48.v20220622)
image.png

弹性搜索源连接器

请按照与 ElasticSearch Sink Connector 相类似的方式进行设置。
从官方网页下载并解压ZIP文件,添加路径后启动 Kafka Connect。
本文中与设置 ElasticSearch Sink Connector 不同的是以下内容。

    • configファイルを自前で作成する.

 

    Elasticsearchに@timestampのPipelineを設定する.
# unzipしてコピー, jarのパスはElasticsearch Sink Connectorの設定で既に通っている
unzip -d $HOME/kafka-3.3.1-src/connectors ariobalinzo-kafka-connect-elasticsearch-source-1.5.2.zip

# configファイルを作成する
vi $HOME/kafka-3.3.1-src/es-source-config.json
> {
>   "name":"elastic-source",
>   "config":{
>     "connector.class":"com.github.dariobalinzo.ElasticSourceConnector",
>     "tasks.max": "1",
>     "es.host" : "localhost",
>     "es.port" : "9200",
>     "poll.interval.ms": "5000",
>     "index.prefix" : "elastic-kafka-topic",
>     "topic.prefix" : "source_conf_",
>     "incrementing.field.name" : "timestamp",
>     "behavior.on.null.values": "ignore",
>     "behavior.on.malformed.documents": "ignore",
>     "drop.invalid.message": "true"
>   }
> }
image.png
# Connectを起動
cd $HOME/kafka-3.3.1-src
./bin/connect-distributed.sh config/connect-distributed.properties
# ログからLoading Pluginを探してもいいが大変なので,以下を叩いてConnectorを認識していることを確認
curl -s localhost:8083/connector-plugins | jq '.[].class'
> "com.github.dariobalinzo.ElasticSourceConnector"

# Connectにconfig設定を送る
curl -X POST -H "Content-Type: application/json" --data @es-source-config.json http://localhost:8083/connectors | jq
image.png
# Elasticsearchに投入されたデータがKafkaに送られているか確認,Topic名はes-source-config.jsonの設定に基づく
./bin/kafka-console-consumer.sh --topic source_conf_elastic-kafka-topic --bootstrap-server localhost:9092 --from-beginning
> {"name":"klm nop","comment":"ciao","esindex":"elastic-kafka-topic","esid":"15","timestamp":"2022-12-23T02:14:13.454334Z"}
> Processed a total of 1 messages

这个也进得很顺利呢。

Neo4j 连接器

从官方网页上下载zip文件并解压,设置环境变量后启动Kafka Connect。
另外,需要下载guava-20.0.jar并放置好。
开始配置Sink。

# unzip してコピー, jarのパスはElasticsearch Sink Connectorの設定で既に通っている
unzip -d $HOME/kafka-3.3.1-src/connectors neo4j-kafka-connect-neo4j-5.0.0.zip
# guava.jar をコピー
cp guava-20.0.jar $HOME/kafka-3.3.1-src/connect/runtime/build/dependant-libs

# Kafka Topicを作成&データを追加
cd $HOME/kafka-3.3.1-src
./bin/kafka-topics.sh --create --topic kafka-neo4j-topic --bootstrap-server localhost:9092
./bin/kafka-console-producer.sh --topic kafka-neo4j-topic --bootstrap-server localhost:9092
> {"id": "1234", "name": "hoge"}
> {"id": "5678", "name": "fuga"}
> {"id": "9100", "name": "poyo"}

# Connectを起動
./bin/connect-distributed.sh config/connect-distributed.properties
# ログからLoading Pluginを探してもいいが大変なので,以下を叩いてConnectorを認識していることを確認
curl -s localhost:8083/connector-plugins | jq '.[].class'
> "streams.kafka.connect.sink.Neo4jSinkConnector"

# Connector経由でNeo4jに投入する設定をする
curl -i -X POST -H 'Content-Type: application/json' -H 'Accept: application/json' http://localhost:8083/connectors \
-d '{
  "name": "Neo4jSinkConnector",
  "config": {
    "topics": "kafka-neo4j-topic",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://localhost:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cypher.kafka-neo4j-topic": "CREATE (n:Test1:Test2 {id: event.id , name: event.name })"
  }
}'
> HTTP/1.1 201 Created
> Date: Thu, 22 Dec 2022 16:51:36 GMT
> Location: http://localhost:8083/connectors/Neo4jSinkConnector
> Content-Type: application/json
> Content-Length: 765
> Server: Jetty(9.4.48.v20220622)
image.png

接下来,设置源。与Elasticsearch类似,我们将创建一个配置文件来设置源。

# Connectを起動
cd $HOME/kafka-3.3.1-src
./bin/connect-distributed.sh config/connect-distributed.properties

# configファイルを作成する
vi $HOME/kafka-3.3.1-src/neo4j-source-config.json
> {
>   "name": "neo4j-source",
>   "config": {
>     "topic": "neo4j-kafka-topic",
>     "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
>     "neo4j.server.uri": "bolt://localhost:7687",
>     "neo4j.authentication.basic.username": "neo4j",
>     "neo4j.authentication.basic.password": "password",
>     "neo4j.encryption.enabled": false,
>     "neo4j.streaming.poll.interval.msecs": 1000,
>     "neo4j.streaming.property": "timestamp",
>     "neo4j.streaming.from": "NOW",
>     "neo4j.enforce.schema": true,
>     "neo4j.source.query": "MATCH (n:Test3) WHERE n.timestamp > $lastCheck RETURN n.name AS name, n.timestamp AS timestamp",
>     "errors.tolerance": "none",
>     "errors.deadletterqueue.topic.name": "neo4j-kafka-topic",
>     "errors.deadletterqueue.context.headers.enable":  true
>  }
> }

# Connectにconfig設定を送る
curl -X POST -H "Content-Type: application/json" --data @neo4j-source-config.json http://localhost:8083/connectors | jq
image.png
# Neo4jに投入されたデータがKafkaに送られているか確認,Topic名はneo4j-source-config.jsonの設定に基づく
./bin/kafka-console-consumer.sh --topic neo4j-kafka-topic --bootstrap-server localhost:9092 --from-beginning
> {"name":"piyo","timestamp":1671733082653}
> Processed a total of 1 messages

卡夫卡的消息已经反映出来了。

利用Elasticsearch导入的数据,在Neo4j中可视化数据的关联性。

使用例.PNG
{
	"name": "elastic-source-to-neo4j-node",
	"config": {
		"connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
		"tasks.max": "1",
		"es.host": "localhost",
		"es.port": "9200",
		"poll.interval.ms": "5000",
		"index.prefix": "elastic-kafka-node-topic",
		"topic.prefix": "source_conf_",
		"incrementing.field.name": "timestamp",
		"behavior.on.null.values": "ignore",
		"behavior.on.malformed.documents": "ignore",
		"drop.invalid.message": "true"
	}
}
{
	"name": "elastic-source-to-neo4j-relation",
	"config": {
		"connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
		"tasks.max": "1",
		"es.host": "localhost",
		"es.port": "9200",
		"poll.interval.ms": "5000",
		"index.prefix": "elastic-kafka-relation-topic",
		"topic.prefix": "source_conf_",
		"incrementing.field.name": "timestamp",
		"behavior.on.null.values": "ignore",
		"behavior.on.malformed.documents": "ignore",
		"drop.invalid.message": "true"
	}
}

上述的数据输入到 elastic-kafka-node-topic 索引中,应该会被存储在 source_conf_elastic-kafka-node-topic 主题中,而输入到 elastic-kafka-relation-topic 索引中的数据也会存储在 source_conf_elastic-kafka-node-topic 主题中。

然后,启动Kafka Connect,并发送Elastic Source Connector的配置,然后进行设置Neo4j Sink Connector的配置。

# Connectを起動
cd $HOME/kafka-3.3.1-src
./bin/connect-distributed.sh config/connect-distributed.properties
# ConnectにElasticsearch Source Connectorのconfig設定を送る
## ノード
curl -X POST -H "Content-Type: application/json" --data @es-source-config-test-node.json http://localhost:8083/connectors | jq
## リレーション
curl -X POST -H "Content-Type: application/json" --data @es-source-config-test-relation.json http://localhost:8083/connectors | jq
# ConnectにNeo4j Sink Connectorのconfig設定を送る
## ノード
curl -i -X POST -H 'Content-Type: application/json' -H 'Accept: application/json' http://localhost:8083/connectors \
-d '{
  "name": "Neo4jSinkConnector-Node",
  "config": {
    "topics": "source_conf_elastic-kafka-node-topic",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://localhost:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cypher.source_conf_elastic-kafka-node-topic": "CREATE (n:Pokemon {id: event.id , name: event.name})"
  }
}'
## リレーション
curl -i -X POST -H 'Content-Type: application/json' -H 'Accept: application/json' http://localhost:8083/connectors \
-d '{
  "name": "Neo4jSinkConnector-Relation4",
  "config": {
    "topics": "source_conf_elastic-kafka-relation-topic",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://localhost:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cypher.source_conf_elastic-kafka-relation-topic": "MATCH (n:Pokemon {id: event.fromid}), (m:Pokemon {id: event.toid}) CREATE (n)-[:Evole]->(m)"
  }
}'
image.png

我确认Neo4j已经无事到达了!

由于Neo4j无法建立不存在的节点关系,所以需要正确控制节点的插入顺序。

关于原本隐藏的陷阱

首先,由于现有的合作插件不支持最新版本的 Neo4j 或 GitHub 仓库未进行更新,因此在本文中我们提到了利用 Kafka 进行协同。虽然那些通常使用 Elastic 产品的人可能已经注意到了,但实际上,我们这次使用的连接器还不支持最新版本的 Elasticsearch。

在最新版本(8.x)中,可以使用在Elasticsearch启动时显示的密码和协作代码来开始连接。默认情况下,使用HTTPS协议。

# Elasticsearchを起動
cd $HOME/elasticsearch-8.5.3
./bin/elasticsearch
> ℹ️  Password for the elastic user (reset with `bin/elasticsearch-reset-password -u elastic`): hogeeee
> ...
> ℹ️  Configure Kibana to use this cluster:
> • Run Kibana and click the configuration link in the terminal when Kibana starts.
> • Copy the following enrollment token and paste it into Kibana in your browser (valid for the next 30 minutes): fugaaaa
> ...

# Kibanaを起動
cd $HOME/kibana-8.5.3
./bin/kibana
> ℹ️ Kibana has not been configured.
> Go to http://localhost:5601/?code=986486 to get started.
> ...
> (以下はElasticsearchとKibanaの連携作業を始めると現れる)
> Your verification code is:  xxx xxx

在此验证中,Kibana界面显示未启用Elasticsearch内置的安全功能。没有认证,您的集群可能对任何人都可访问。或者在与Elasticsearch进行curl时使用HTTP。
事实上,在7.x和8.x中,默认的安全功能已经发生了变化,如果按照本文进行操作,将无法实现Elasticsearch-Kafka的协作。

因此,根据我的调查,似乎有一种像是可以禁用安全功能的设置。因此,我想最后要对此进行验证。

名称バージョンElasticsearch8.5.3Kafka (Apache)3.3.1
# elasticsearch.yml で xpack.security.enabled を無効化
vi $HOME/elasticsearch-8.5.3/config/elasticsearch.yml
> xpack.security.enabled: false
# Elasticsearchを起動
cd $HOME/elasticsearch-8.5.3
./bin/elasticsearch

确实,密码信息不会被显示,可以通过HTTP连接。

# Zookeeperの起動
cd $HOME/kafka-3.3.1-src
./bin/zookeeper-server-start.sh config/zookeeper.properties
# Broker/Connectを含む"Kafka"の起動
./bin/kafka-server-start.sh config/server.properties

# Kafka Topicを作成
./bin/kafka-topics.sh --create --topic new-elastic-topic --bootstrap-server localhost:9092
# Kafka Topicにデータを入れる.Ctrl+cで終了
./bin/kafka-console-producer.sh --topic new-elastic-topic --bootstrap-server localhost:9092
> {"name":"8.5.3", "comment": "New Version"}
# Topicに入っていることを確かめる.Ctrl+cで終了
./bin/kafka-console-consumer.sh --topic new-elastic-topic --bootstrap-server localhost:9092 --from-beginning
> {"name":"8.5.3", "comment": "New Version"}
> Processed a total of 1 messages

# Connectを起動
./bin/connect-distributed.sh config/connect-distributed.properties
# Connector経由でElasticsearchに投入する設定をする
curl -i -X POST -H "Content-Type: application/json" -H "Accept: application/json" http://localhost:8083/connectors/ \
-d '{
  "name": "elasticsearch-connector-test-new",
  "config":{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "new-elastic-topic",
    "tasks.max": "1",
    "connection.url": "http://localhost:9200",
    "type.name": "_doc",
    "name": "elasticsearch-connector-test-new",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}'
> HTTP/1.1 201 Created
> Date: Fri, 23 Dec 2022 06:26:43 GMT
> Location: http://localhost:8083/connectors/elasticsearch-connector-test-new
> Content-Type: application/json
> Content-Length: 351
> Server: Jetty(9.4.48.v20220622)

# Elasticsearch に問い合わせてみる
curl -sS -XGET 'localhost:9200/new-elastic-topic/_search?pretty'
> {
>   "took" : 13,
>   "timed_out" : false,
>   "_shards" : {
>     "total" : 1,
>     "successful" : 1,
>     "skipped" : 0,
>     "failed" : 0
>   },
>   "hits" : {
>     "total" : {
>       "value" : 1,
>       "relation" : "eq"
>     },
>     "max_score" : 1.0,
>     "hits" : [
>       {
>         "_index" : "new-elastic-topic",
>         "_id" : "new-elastic-topic+0+0",
>         "_score" : 1.0,
>         "_source" : {
>           "name" : "8.5.3",
>           "comment" : "New Version"
>         }
>       }
>     ]
>   }
> }

数据已经成功地从 Kafka Topic 写入到 Elasticsearch 中!如果禁用安全功能的话,看起来可以在相同的配置下进行通信呢。

如果选择使用集装箱进行建造的话

在开始撰写本文时,最初我使用了Confluent公司的Docker镜像来构建。
我认为在这里介绍的方法中,需要移动很多文件夹,有点困难,但使用容器编排可以更加简洁。
而且,使用容器不需要构建Kafka,说实话,容器的准备工作更容易。
那为什么我没有介绍这种容器编排呢?因为网络配置很困难。
在我使用的kafka-3.3.1-src.tgz中,通过在同一目录下运行Shell命令进行多个端口之间的通讯。
这样可能不容易理解,但实际上涉及了Zookeeper、Broker和Connect等多个服务。
(对于了解Kafka的人可能会说“为什么现在才介绍…”)
另一方面,使用容器编排(类似于微服务),我们需要将它们分开并编写docker-compose.yml文件。
当然要建立docker network,同时还需适当设置LISTENERS和ADVERTISED_LISTENERS,如果开始解释这些,可能就无法讲清楚了(也有一种想要更深入理解后再说明的想法)所以就不在这里解释了。
如果有需要的话,我可能会在以后的追加内容中写明。
在本文中,我只会提供参考信息。

    • Confluent社が出している all-in-one な docker-compose.yml の見本がある

Kafkaコンテナの立て方ガイドもある

環境変数に設定する値はよく確認する

all-in-one に含まれるサービスのライセンスはバラバラなので必ず確認する(例えば,Schema Registry は Confluent Community ライセンス)

例如,它不允许作为在线服务提供与Confluent SaaS产品竞争的Confluent ksqlDB、Confluent Schema Registry、Confluent REST Proxy或其他受Confluent社区许可证约束的软件。

    • Confluent社のKafkaコンテナイメージ (7.3.0) のOS:Red Hat Enterprise Linux 8

Elasticsearch, Kibana, Neo4j それぞれにも docker を使った立て方ガイドがある
初めてコンテナで Elastic Stack をたてる際は bootstrap checks failed のエラーが出てしまうことがあるので,vm.max_map_count の設定を行う

以下是原文的中文翻译选项:1. [链接1](https://qiita.com/gyojir/items/ed16030559ab2c6469a7)
2. [链接2](https://www.conduktor.io/kafka/how-to-install-apache-kafka-on-windows)
3. [链接3](https://forest.watch.impress.co.jp/docs/news/1198651.html)
4. [链接4](https://db-engines.com/en/ranking/graph+dbms)
5. [链接5](https://github.com/DarioBalinzo/kafka-connect-elasticsearch-source/issues/70)
6. [链接6](https://qiita.com/s_nakamura/items/e94f1a11f6e7398b4c8f)
7. [链接7](https://mag.osdn.jp/22/02/14/171200)
8. [链接8](https://discuss.elastic.co/t/disable-security/66380)
9. [链接9](https://www.confluent.io/ja-jp/blog/kafka-listeners-explained/)

广告
将在 10 秒后关闭
bannerAds