使用Kafka将Elasticsearch和Neo4j进行连接
你为什么写这篇文章?
您是否在使用Elasticsearch/Neo4j进行利用?
虽然两者都是著名的数据库(DB),但它们的特点和用途是不同的。
Elasticsearch在文本信息搜索方面很强大,而Neo4j则常用于快速查询关联性的场景。
我的观感是,有时候也希望在Neo4j中根据数据属性进行查询,或者经常需要将数据与Elasticsearch中的数据进行关联。
但是,如果一味地这样做,会导致吞吐量降低,或者需要持续的实现调整,非常值得考虑。
因此,我开始考虑在重新审查数据结构的同时,是否能够做到取长补短。
通过将Neo4j和Elasticsearch进行连接,
-
- Elasticsearchに投入したデータを元にNeo4jでデータの関連性を可視化する.
- Neo4jに投入した関連性をもつデータをElasticsearchで全文検索する.
本文的撰写动机是实现这样的应用场景。
关于用于协作的插件,可以在Neo4j的官方网页上找到相关说明。
使用该插件进行协作似乎也是可能的,但可能存在无法与最新版本的Neo4j兼容,以及GitHub存储库长时间未更新等问题,这可能会导致在使用上不安全。
在本文中,我们希望通过在两者之间添加Kafka来实现协作和通信控制。
「我有Linux环境,但从未进行过任何设置」
「我知道WSL存在,但从未使用过」为了使这些人能够再现操作步骤,本文尽可能详细地描述了工作过程。
各项服务的简单介绍
弹性搜索
Elasticsearch是一个分布式的免费且开放的搜索和分析引擎。它支持各种类型的数据,包括文本、数字和地理空间信息,同时还支持结构化数据和非结构化数据。
Elasticsearch 是一款由 Elastic 公司提供的全文搜索数据库,是 Elastic Stack 的核心产品。它能够利用形态分析等技术实现丰富的全文搜索功能。
Neo4j
凭借经过验证的数万亿实体性能,开发人员、数据科学家和企业将Neo4j作为高性能、可扩展分析、智能应用开发和高级AI / ML流程的首选。
Neo4j是一款在图形数据库领域中非常受欢迎和广泛使用的产品。由于有很多详细的图形数据库的说明资料,所以我在这里不再详细介绍。但是Neo4j的特点是它对于关系搜索和可视化的需求比关系型数据库更擅长。
阿帕奇卡夫卡
Apache Kafka 是一个开源的分布式事件流平台,被数千家公司用于高性能的数据管道、流式分析、数据集成和关键应用程序。
Apache Kafka是一个开源项目,可以中介服务间的通信并实现分布式存储和流式处理。虽然内部的细节非常复杂,我就不多说了,但我特别喜欢这个开源项目。
Confluent是一家美国的公司,专注于利用Kafka,并提供了以下介绍的连接器等独特的社区支持。
Kafka-DB服务之间的协作配置
使用Kafka Connectors来连接Kafka和数据库。
Connectors是具有将每个服务的API封装并整理数据格式进行交互的功能的jar文件。
针对每个服务,我们设置Sink(发送数据)/ Source(接收数据)。
弹性搜索接收器连接器
Elasticsearch连接器允许将数据从Kafka移动到Elasticsearch的2.x、5.x、6.x和7.x版本。它将Kafka主题中的数据写入到Elasticsearch索引,并且一个主题的所有数据具有相同的类型。
ElasticSearch Sink Connector是用于将Kafka消息注入Elasticsearch的连接器。它在GitHub上公开,并可在Confluent社区许可下使用。除非明确指定,否则Elasticsearch将自动从注入的数据中推断出映射,但对于小数和时间戳等类型可能无法正确推测。ElasticSearch Sink Connector还具有从Kafka消息模式中推断映射并进行注入的功能。
Elasticsearch 源链接器
这是一个用来将数据从Elasticsearch导出到Apache Kafka的连接器。
Elasticsearch源连接器是一个用于从Elasticsearch传递消息到Kafka的连接器。该连接器已在GitHub上发布,并可在Apache许可证第2.0版下使用。
Neo4j 连接器
这是一个基本的Apache Kafka Connect Neo4j连接器,可以通过Cypher模板查询将数据从Kafka主题移动到Neo4j,反之亦然。
Neo4j Connector是一种支持Sink和Source的连接器,也有Neo4j官方用户指南。这也是在GitHub上公开的,采用知识共享署名-非商业性使用-相同方式共享4.0国际许可协议进行管理。
验证操作环境
本文介绍了由于Elasticsearch的请求/响应获取变得方便,我们选择使用Kibana,但并非必需。我们在Elasticsearch和Kafka中使用Java。对于Kafka的构建,我们使用Gradle,但无需进行特殊的安装步骤。
环境准备的内容
每个服务的安装方法都按照官方网站提供的步骤进行。
-
- Java
-
- Elasticsearch
-
- Kibana
-
- Neo4j
- Kafka
# Java
## tar(gzip)を取得
curl -OL https://download.java.net/java/ga/jdk11/openjdk-11_linux-x64_bin.tar.gz
## (どこでも良いが)ユーザディレクトリ下に展開
sudo tar xfz ./openjdk-11_linux-x64_bin.tar.gz --directory $HOME
## JAVA環境変数を登録
echo '# OpenJDK-11' >> ~/.bashrc
echo 'export JAVA_HOME=$HOME/jdk-11' >> ~/.bashrc
echo 'export CLASSPATH=$JAVA_HOME/lib' >> ~/.bashrc
echo 'export PATH=$PATH:$JAVA_HOME/bin' >> ~/.bashrc
source ~/.bashrc
# Elasticsearch
## tar(gzip)を取得
curl -OL https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.8-linux-x86_64.tar.gz
## (どこでも良いが)ユーザディレクトリ下に展開
tar -xzf ./elasticsearch-7.17.8-linux-x86_64.tar.gz --directory $HOME
# Kibana
## tar(gzip)を取得
curl -OL https://artifacts.elastic.co/downloads/kibana/kibana-7.17.8-linux-x86_64.tar.gz
## (どこでも良いが)ユーザディレクトリ下に展開
tar -xzf ./kibana-7.17.8-linux-x86_64.tar.gz --directory $HOME
# Neo4j
## tar(gzip)を取得
curl -L -o neo4j-community-4.4.16-unix.tar.gz https://neo4j.com/artifact.php?name=neo4j-community-4.4.16-unix.tar.gz
## (どこでも良いが)ユーザディレクトリ下に展開
tar -xzf ./neo4j-community-4.4.16-unix.tar.gz --directory $HOME
# Kafka
## tar(gzip)を取得
curl -OL https://downloads.apache.org/kafka/3.3.1/kafka-3.3.1-src.tgz
## (どこでも良いが)ユーザディレクトリ下に展開
tar -xzf ./kafka-3.3.1-src.tgz --directory $HOME
首先,我們將進行啟動確認。
我們將先從Elasticsearch/Kibana開始。
# Elasticsearchを起動
cd $HOME/elasticsearch-7.17.8
./bin/elasticsearch
# Kibanaを起動
cd $HOME/kibana-7.17.8
./bin/kibana
我們接下來啟動Neo4j。
cd $HOME/neo4j-community-4.4.16
# Neo4jの起動設定,Javaヒープサイズとリッスンするアドレスのコメントを外しておく
vi ./conf/neo4j.conf
> dbms.memory.heap.initial_size=512m
> dbms.memory.heap.max_size=512m
> dbms.default_listen_address=0.0.0.0
# Neo4jを起動
./bin/neo4j console
> 2022-12-22 05:55:21.669+0000 INFO Started.
最后启动 Kafka。
cd $HOME/kafka-3.3.1-src
# Gradleを使ってビルド
./gradlew jar -PscalaVersion=2.13.8
> BUILD SUCCESSFUL
# Zookeeperの起動
./bin/zookeeper-server-start.sh config/zookeeper.properties
> (大きな文字でZookeeperと表示される)
# "Kafka"(Broker)の起動
./bin/kafka-server-start.sh config/server.properties
> (Startingログが沢山表示される)
# Kafka Topicを作成,Brokerはlocalhost:9092で立っている.
./bin/kafka-topics.sh --create --topic kafka-elastic-topic --bootstrap-server localhost:9092
# Kafka Topicにデータを入れてみる.Ctrl+cで終了
./bin/kafka-console-producer.sh --topic kafka-elastic-topic --bootstrap-server localhost:9092
> {"name":"ab cde", "comment": "Hello"}
> {"name":"fgh ij", "comment": "Bonjour"}
# Topicに入っていることを確かめる.Ctrl+cで終了
./bin/kafka-console-consumer.sh --topic kafka-elastic-topic --bootstrap-server localhost:9092 --from-beginning
> {"name":"ab cde", "comment": "Hello"}
> {"name":"fgh ij", "comment": "Bonjour"}
> Processed a total of 2 messages
只要确认以上内容就好。
Kafka Connect 的配置设置
弹性搜索接口连接器
从官方网页下载zip文件并解压,然后设置Kafka Connect的路径。配置文件位于Kafka目录下的./config/connect-distributed.properties。
# unzipしてコピー
mkdir -p $HOME/kafka-3.3.1-src/connectors
unzip -d $HOME/kafka-3.3.1-src/connectors confluentinc-kafka-connect-elasticsearch-14.0.3.zip
# jarのパスを通す&JsonConverterを設定する
vi $HOME/kafka-3.3.1-src/config/connect-distributed.properties
(追記)
> internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> internal.key.converter.schemas.enable=false
> internal.value.converter.schemas.enable=false
(変更)
> key.converter.schemas.enable=false
> value.converter.schemas.enable=false
> plugin.path=$HOME/kafka-3.3.1-src/connectors
# Connectを起動
cd $HOME/kafka-3.3.1-src
./bin/connect-distributed.sh config/connect-distributed.properties
# ログからLoading Pluginを探してもいいが大変なので,以下を叩いてConnectorを認識していることを確認
curl -s localhost:8083/connector-plugins | jq '.[].class'
> "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
使用curl进行Connect的配置。
虽然本文未指定,但在此处还可以进行重试次数、超时、代理等配置。
配置完成后,之前进行测试输入的消息应该会被投入Elasticsearch。
# Connector経由でElasticsearchに投入する設定をする,Connectはlocalhost:8083で立っている.
curl -i -X POST -H "Content-Type: application/json" -H "Accept: application/json" http://localhost:8083/connectors/ \
-d '{
"name": "elasticsearch-connector-test",
"config":{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "kafka-elastic-topic",
"tasks.max": "1",
"connection.url": "http://localhost:9200",
"type.name": "_doc",
"name": "elasticsearch-connector-test",
"key.ignore": "true",
"schema.ignore": "true"
}
}'
> HTTP/1.1 201 Created
> Date: Thu, 22 Dec 2022 12:37:49 GMT
> Location: http://localhost:8083/connectors/elasticsearch-connector
> Content-Type: application/json
> Content-Length: 335
> Server: Jetty(9.4.48.v20220622)
弹性搜索源连接器
请按照与 ElasticSearch Sink Connector 相类似的方式进行设置。
从官方网页下载并解压ZIP文件,添加路径后启动 Kafka Connect。
本文中与设置 ElasticSearch Sink Connector 不同的是以下内容。
-
- configファイルを自前で作成する.
- Elasticsearchに@timestampのPipelineを設定する.
# unzipしてコピー, jarのパスはElasticsearch Sink Connectorの設定で既に通っている
unzip -d $HOME/kafka-3.3.1-src/connectors ariobalinzo-kafka-connect-elasticsearch-source-1.5.2.zip
# configファイルを作成する
vi $HOME/kafka-3.3.1-src/es-source-config.json
> {
> "name":"elastic-source",
> "config":{
> "connector.class":"com.github.dariobalinzo.ElasticSourceConnector",
> "tasks.max": "1",
> "es.host" : "localhost",
> "es.port" : "9200",
> "poll.interval.ms": "5000",
> "index.prefix" : "elastic-kafka-topic",
> "topic.prefix" : "source_conf_",
> "incrementing.field.name" : "timestamp",
> "behavior.on.null.values": "ignore",
> "behavior.on.malformed.documents": "ignore",
> "drop.invalid.message": "true"
> }
> }
# Connectを起動
cd $HOME/kafka-3.3.1-src
./bin/connect-distributed.sh config/connect-distributed.properties
# ログからLoading Pluginを探してもいいが大変なので,以下を叩いてConnectorを認識していることを確認
curl -s localhost:8083/connector-plugins | jq '.[].class'
> "com.github.dariobalinzo.ElasticSourceConnector"
# Connectにconfig設定を送る
curl -X POST -H "Content-Type: application/json" --data @es-source-config.json http://localhost:8083/connectors | jq
# Elasticsearchに投入されたデータがKafkaに送られているか確認,Topic名はes-source-config.jsonの設定に基づく
./bin/kafka-console-consumer.sh --topic source_conf_elastic-kafka-topic --bootstrap-server localhost:9092 --from-beginning
> {"name":"klm nop","comment":"ciao","esindex":"elastic-kafka-topic","esid":"15","timestamp":"2022-12-23T02:14:13.454334Z"}
> Processed a total of 1 messages
这个也进得很顺利呢。
Neo4j 连接器
从官方网页上下载zip文件并解压,设置环境变量后启动Kafka Connect。
另外,需要下载guava-20.0.jar并放置好。
开始配置Sink。
# unzip してコピー, jarのパスはElasticsearch Sink Connectorの設定で既に通っている
unzip -d $HOME/kafka-3.3.1-src/connectors neo4j-kafka-connect-neo4j-5.0.0.zip
# guava.jar をコピー
cp guava-20.0.jar $HOME/kafka-3.3.1-src/connect/runtime/build/dependant-libs
# Kafka Topicを作成&データを追加
cd $HOME/kafka-3.3.1-src
./bin/kafka-topics.sh --create --topic kafka-neo4j-topic --bootstrap-server localhost:9092
./bin/kafka-console-producer.sh --topic kafka-neo4j-topic --bootstrap-server localhost:9092
> {"id": "1234", "name": "hoge"}
> {"id": "5678", "name": "fuga"}
> {"id": "9100", "name": "poyo"}
# Connectを起動
./bin/connect-distributed.sh config/connect-distributed.properties
# ログからLoading Pluginを探してもいいが大変なので,以下を叩いてConnectorを認識していることを確認
curl -s localhost:8083/connector-plugins | jq '.[].class'
> "streams.kafka.connect.sink.Neo4jSinkConnector"
# Connector経由でNeo4jに投入する設定をする
curl -i -X POST -H 'Content-Type: application/json' -H 'Accept: application/json' http://localhost:8083/connectors \
-d '{
"name": "Neo4jSinkConnector",
"config": {
"topics": "kafka-neo4j-topic",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"errors.retry.timeout": "-1",
"errors.retry.delay.max.ms": "1000",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.log.include.messages": true,
"neo4j.server.uri": "bolt://localhost:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "password",
"neo4j.encryption.enabled": false,
"neo4j.topic.cypher.kafka-neo4j-topic": "CREATE (n:Test1:Test2 {id: event.id , name: event.name })"
}
}'
> HTTP/1.1 201 Created
> Date: Thu, 22 Dec 2022 16:51:36 GMT
> Location: http://localhost:8083/connectors/Neo4jSinkConnector
> Content-Type: application/json
> Content-Length: 765
> Server: Jetty(9.4.48.v20220622)
接下来,设置源。与Elasticsearch类似,我们将创建一个配置文件来设置源。
# Connectを起動
cd $HOME/kafka-3.3.1-src
./bin/connect-distributed.sh config/connect-distributed.properties
# configファイルを作成する
vi $HOME/kafka-3.3.1-src/neo4j-source-config.json
> {
> "name": "neo4j-source",
> "config": {
> "topic": "neo4j-kafka-topic",
> "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
> "neo4j.server.uri": "bolt://localhost:7687",
> "neo4j.authentication.basic.username": "neo4j",
> "neo4j.authentication.basic.password": "password",
> "neo4j.encryption.enabled": false,
> "neo4j.streaming.poll.interval.msecs": 1000,
> "neo4j.streaming.property": "timestamp",
> "neo4j.streaming.from": "NOW",
> "neo4j.enforce.schema": true,
> "neo4j.source.query": "MATCH (n:Test3) WHERE n.timestamp > $lastCheck RETURN n.name AS name, n.timestamp AS timestamp",
> "errors.tolerance": "none",
> "errors.deadletterqueue.topic.name": "neo4j-kafka-topic",
> "errors.deadletterqueue.context.headers.enable": true
> }
> }
# Connectにconfig設定を送る
curl -X POST -H "Content-Type: application/json" --data @neo4j-source-config.json http://localhost:8083/connectors | jq
# Neo4jに投入されたデータがKafkaに送られているか確認,Topic名はneo4j-source-config.jsonの設定に基づく
./bin/kafka-console-consumer.sh --topic neo4j-kafka-topic --bootstrap-server localhost:9092 --from-beginning
> {"name":"piyo","timestamp":1671733082653}
> Processed a total of 1 messages
卡夫卡的消息已经反映出来了。
利用Elasticsearch导入的数据,在Neo4j中可视化数据的关联性。
{
"name": "elastic-source-to-neo4j-node",
"config": {
"connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
"tasks.max": "1",
"es.host": "localhost",
"es.port": "9200",
"poll.interval.ms": "5000",
"index.prefix": "elastic-kafka-node-topic",
"topic.prefix": "source_conf_",
"incrementing.field.name": "timestamp",
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "ignore",
"drop.invalid.message": "true"
}
}
{
"name": "elastic-source-to-neo4j-relation",
"config": {
"connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
"tasks.max": "1",
"es.host": "localhost",
"es.port": "9200",
"poll.interval.ms": "5000",
"index.prefix": "elastic-kafka-relation-topic",
"topic.prefix": "source_conf_",
"incrementing.field.name": "timestamp",
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "ignore",
"drop.invalid.message": "true"
}
}
上述的数据输入到 elastic-kafka-node-topic 索引中,应该会被存储在 source_conf_elastic-kafka-node-topic 主题中,而输入到 elastic-kafka-relation-topic 索引中的数据也会存储在 source_conf_elastic-kafka-node-topic 主题中。
然后,启动Kafka Connect,并发送Elastic Source Connector的配置,然后进行设置Neo4j Sink Connector的配置。
# Connectを起動
cd $HOME/kafka-3.3.1-src
./bin/connect-distributed.sh config/connect-distributed.properties
# ConnectにElasticsearch Source Connectorのconfig設定を送る
## ノード
curl -X POST -H "Content-Type: application/json" --data @es-source-config-test-node.json http://localhost:8083/connectors | jq
## リレーション
curl -X POST -H "Content-Type: application/json" --data @es-source-config-test-relation.json http://localhost:8083/connectors | jq
# ConnectにNeo4j Sink Connectorのconfig設定を送る
## ノード
curl -i -X POST -H 'Content-Type: application/json' -H 'Accept: application/json' http://localhost:8083/connectors \
-d '{
"name": "Neo4jSinkConnector-Node",
"config": {
"topics": "source_conf_elastic-kafka-node-topic",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"errors.retry.timeout": "-1",
"errors.retry.delay.max.ms": "1000",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.log.include.messages": true,
"neo4j.server.uri": "bolt://localhost:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "password",
"neo4j.encryption.enabled": false,
"neo4j.topic.cypher.source_conf_elastic-kafka-node-topic": "CREATE (n:Pokemon {id: event.id , name: event.name})"
}
}'
## リレーション
curl -i -X POST -H 'Content-Type: application/json' -H 'Accept: application/json' http://localhost:8083/connectors \
-d '{
"name": "Neo4jSinkConnector-Relation4",
"config": {
"topics": "source_conf_elastic-kafka-relation-topic",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"errors.retry.timeout": "-1",
"errors.retry.delay.max.ms": "1000",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.log.include.messages": true,
"neo4j.server.uri": "bolt://localhost:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "password",
"neo4j.encryption.enabled": false,
"neo4j.topic.cypher.source_conf_elastic-kafka-relation-topic": "MATCH (n:Pokemon {id: event.fromid}), (m:Pokemon {id: event.toid}) CREATE (n)-[:Evole]->(m)"
}
}'
我确认Neo4j已经无事到达了!
关于原本隐藏的陷阱
首先,由于现有的合作插件不支持最新版本的 Neo4j 或 GitHub 仓库未进行更新,因此在本文中我们提到了利用 Kafka 进行协同。虽然那些通常使用 Elastic 产品的人可能已经注意到了,但实际上,我们这次使用的连接器还不支持最新版本的 Elasticsearch。
在最新版本(8.x)中,可以使用在Elasticsearch启动时显示的密码和协作代码来开始连接。默认情况下,使用HTTPS协议。
# Elasticsearchを起動
cd $HOME/elasticsearch-8.5.3
./bin/elasticsearch
> ℹ️ Password for the elastic user (reset with `bin/elasticsearch-reset-password -u elastic`): hogeeee
> ...
> ℹ️ Configure Kibana to use this cluster:
> • Run Kibana and click the configuration link in the terminal when Kibana starts.
> • Copy the following enrollment token and paste it into Kibana in your browser (valid for the next 30 minutes): fugaaaa
> ...
# Kibanaを起動
cd $HOME/kibana-8.5.3
./bin/kibana
> ℹ️ Kibana has not been configured.
> Go to http://localhost:5601/?code=986486 to get started.
> ...
> (以下はElasticsearchとKibanaの連携作業を始めると現れる)
> Your verification code is: xxx xxx
在此验证中,Kibana界面显示未启用Elasticsearch内置的安全功能。没有认证,您的集群可能对任何人都可访问。或者在与Elasticsearch进行curl时使用HTTP。
事实上,在7.x和8.x中,默认的安全功能已经发生了变化,如果按照本文进行操作,将无法实现Elasticsearch-Kafka的协作。
因此,根据我的调查,似乎有一种像是可以禁用安全功能的设置。因此,我想最后要对此进行验证。
# elasticsearch.yml で xpack.security.enabled を無効化
vi $HOME/elasticsearch-8.5.3/config/elasticsearch.yml
> xpack.security.enabled: false
# Elasticsearchを起動
cd $HOME/elasticsearch-8.5.3
./bin/elasticsearch
确实,密码信息不会被显示,可以通过HTTP连接。
# Zookeeperの起動
cd $HOME/kafka-3.3.1-src
./bin/zookeeper-server-start.sh config/zookeeper.properties
# Broker/Connectを含む"Kafka"の起動
./bin/kafka-server-start.sh config/server.properties
# Kafka Topicを作成
./bin/kafka-topics.sh --create --topic new-elastic-topic --bootstrap-server localhost:9092
# Kafka Topicにデータを入れる.Ctrl+cで終了
./bin/kafka-console-producer.sh --topic new-elastic-topic --bootstrap-server localhost:9092
> {"name":"8.5.3", "comment": "New Version"}
# Topicに入っていることを確かめる.Ctrl+cで終了
./bin/kafka-console-consumer.sh --topic new-elastic-topic --bootstrap-server localhost:9092 --from-beginning
> {"name":"8.5.3", "comment": "New Version"}
> Processed a total of 1 messages
# Connectを起動
./bin/connect-distributed.sh config/connect-distributed.properties
# Connector経由でElasticsearchに投入する設定をする
curl -i -X POST -H "Content-Type: application/json" -H "Accept: application/json" http://localhost:8083/connectors/ \
-d '{
"name": "elasticsearch-connector-test-new",
"config":{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "new-elastic-topic",
"tasks.max": "1",
"connection.url": "http://localhost:9200",
"type.name": "_doc",
"name": "elasticsearch-connector-test-new",
"key.ignore": "true",
"schema.ignore": "true"
}
}'
> HTTP/1.1 201 Created
> Date: Fri, 23 Dec 2022 06:26:43 GMT
> Location: http://localhost:8083/connectors/elasticsearch-connector-test-new
> Content-Type: application/json
> Content-Length: 351
> Server: Jetty(9.4.48.v20220622)
# Elasticsearch に問い合わせてみる
curl -sS -XGET 'localhost:9200/new-elastic-topic/_search?pretty'
> {
> "took" : 13,
> "timed_out" : false,
> "_shards" : {
> "total" : 1,
> "successful" : 1,
> "skipped" : 0,
> "failed" : 0
> },
> "hits" : {
> "total" : {
> "value" : 1,
> "relation" : "eq"
> },
> "max_score" : 1.0,
> "hits" : [
> {
> "_index" : "new-elastic-topic",
> "_id" : "new-elastic-topic+0+0",
> "_score" : 1.0,
> "_source" : {
> "name" : "8.5.3",
> "comment" : "New Version"
> }
> }
> ]
> }
> }
数据已经成功地从 Kafka Topic 写入到 Elasticsearch 中!如果禁用安全功能的话,看起来可以在相同的配置下进行通信呢。
如果选择使用集装箱进行建造的话
在开始撰写本文时,最初我使用了Confluent公司的Docker镜像来构建。
我认为在这里介绍的方法中,需要移动很多文件夹,有点困难,但使用容器编排可以更加简洁。
而且,使用容器不需要构建Kafka,说实话,容器的准备工作更容易。
那为什么我没有介绍这种容器编排呢?因为网络配置很困难。
在我使用的kafka-3.3.1-src.tgz中,通过在同一目录下运行Shell命令进行多个端口之间的通讯。
这样可能不容易理解,但实际上涉及了Zookeeper、Broker和Connect等多个服务。
(对于了解Kafka的人可能会说“为什么现在才介绍…”)
另一方面,使用容器编排(类似于微服务),我们需要将它们分开并编写docker-compose.yml文件。
当然要建立docker network,同时还需适当设置LISTENERS和ADVERTISED_LISTENERS,如果开始解释这些,可能就无法讲清楚了(也有一种想要更深入理解后再说明的想法)所以就不在这里解释了。
如果有需要的话,我可能会在以后的追加内容中写明。
在本文中,我只会提供参考信息。
-
- Confluent社が出している all-in-one な docker-compose.yml の見本がある
Kafkaコンテナの立て方ガイドもある
環境変数に設定する値はよく確認する
all-in-one に含まれるサービスのライセンスはバラバラなので必ず確認する(例えば,Schema Registry は Confluent Community ライセンス)
例如,它不允许作为在线服务提供与Confluent SaaS产品竞争的Confluent ksqlDB、Confluent Schema Registry、Confluent REST Proxy或其他受Confluent社区许可证约束的软件。
-
- Confluent社のKafkaコンテナイメージ (7.3.0) のOS:Red Hat Enterprise Linux 8
Elasticsearch, Kibana, Neo4j それぞれにも docker を使った立て方ガイドがある
初めてコンテナで Elastic Stack をたてる際は bootstrap checks failed のエラーが出てしまうことがあるので,vm.max_map_count の設定を行う
2. [链接2](https://www.conduktor.io/kafka/how-to-install-apache-kafka-on-windows)
3. [链接3](https://forest.watch.impress.co.jp/docs/news/1198651.html)
4. [链接4](https://db-engines.com/en/ranking/graph+dbms)
5. [链接5](https://github.com/DarioBalinzo/kafka-connect-elasticsearch-source/issues/70)
6. [链接6](https://qiita.com/s_nakamura/items/e94f1a11f6e7398b4c8f)
7. [链接7](https://mag.osdn.jp/22/02/14/171200)
8. [链接8](https://discuss.elastic.co/t/disable-security/66380)
9. [链接9](https://www.confluent.io/ja-jp/blog/kafka-listeners-explained/)