使用Confluent平台来尝试将PostgreSQL的数据流传输到Elasticsearch

这篇文章的目的是发布关于Confluent Platform(Apache Kafka)的技术博客,整理自己的理解。同时,希望能帮助那些计划尝试使用这个平台的工程师们。
这篇文章计划将内容分成几个部分。
由于没有完全审查,可能包含不准确的内容。如果有的话,请留下评论,我会非常高兴。

开场白

在这篇文章中,我们将建立PostgreSQL、Confluent Platform和Elasticsearch的系统。
有关系统架构的概述图,请参考上一篇文章。

构建环境

在环境构建中,我们使用了可以轻松使用和丢弃的Docker。

Docker编排

我已经将本次使用的docker-compose发布到GitHub上。如果想要查看整个内容,请参考链接中的gist。

你可以参考以下的网站。

在创建Docker Compose时,我们参考了下面的网站。

    PostgreSQL

 

    Elasticsearch

 

    Confluent Platform

 

解释

我将解释与本次验证相关的docker-compose文件的部分。

PostgreSQL 的中文释义是:
– 后置资格系统格
– 后道资料统
– 后述资料文件

我们计划在另一篇文章中进行说明,但连接到PostgreSQL的Connector插件将使用Debezium connector for PostgreSQL。此外,我们将使用默认提供的pgoutput插件来使用PostgreSQL+10提供的逻辑解码器插件。
如果您想了解更多关于逻辑解码器的详细内容,请参考PostgreSQL的官方文档和Debezium的官方文档。

为了使用pgoutput的逻辑解码器,正在将PostgreSQL的复制设置更改为逻辑复制。

  postgres:
...
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
      - "-c"
      - "max_replication_slots=5"
      - "-c"
      - "max_wal_senders=5"

可以通过执行以下SQL查询来简单确认复制设置是否已更改。
以下是查询的执行结果。

postgres=# SELECT name, setting, unit, short_desc FROM pg_settings WHERE name = 'wal_level';
   name    | setting | unit |                    short_desc                    
-----------+---------+------+--------------------------------------------------
 wal_level | logical |      | Set the level of information written to the WAL.
(1 row)

Elasticsearch 弹性搜索

Elasticsearch可以在多个节点上构建集群,但由于本次不需要集群配置,因此使用discovery.type参数指定为single-node(单节点),以便在单一节点上进行配置。

  elasticsearch:
...
    environment:
      - xpack.security.enabled=false
      - discovery.type=single-node

Kibana可视化工具

我們引入了Kibana,即使它不是必需的,但它可以方便地查看在Elasticsearch中注册的索引设置、映射定义和文档。

饲养员

在Zookeeper中,我们定义了healthcheck和depends_on来控制容器的健康检查和启动顺序。这不仅适用于Zookeeper,也适用于Confluent Platform的其他容器。

  zookeeper:
...
    healthcheck:
      test: "nc -z localhost 2181 || exit -1"
      interval: 10s
      timeout: 5s
      retries: 5
      start_period: 10s

经纪人(Kafka)

经纪人是Apache Kafka的Kafka节点的别称。虽然可以通过多个节点组成集群配置,但这次是为了验证目的,不需要集群配置,所以采用单个节点配置。
因此,将主题的复制因子数量(KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR、KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR)设为1。

KAFKA_LOG_RETENTION_MINUTES、KAFKA_LOG_CLEANUP_POLICY、KAFKA_COMPRESSION_TYPE这些参数定义了注册到Topic的日志段的生存期限和清理策略。这些主要与优化性能相关。

KAFKA_LOG_RETENTION_MINUTES
Topicに登録されたログセグメントは1分経過後に破棄されます。通常であれば7日間(デフォルト値)など、もっと長い期間を指定しますが、今回は検証目的であるためこの値で充分だと判断しました。

KAFKA_LOG_CLEANUP_POLICY
deleteは、古いログをクリーンアップする際に完全に削除します。別の設定値としてcompactを指定できます。compactの場合は、古いログを削除せず圧縮して保持します。

KAFKA_COMPRESSION_TYPE
ログセグメントの圧縮ポリシーです。uncompressedは圧縮しないで保持します。他にgzipやsnappy、lz4、zstdなど圧縮方式を選択することもできます。(参考サイト)

  broker:
...
    environment:
...
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
...
      KAFKA_LOG_RETENTION_MINUTES: 1
      KAFKA_LOG_CLEANUP_POLICY: delete
      KAFKA_COMPRESSION_TYPE: uncompressed

小贴士

根据Confluent Platform的Kafka容器镜像规范,配置值需要以KAFKA_作为前缀,并使用下划线作为分隔符来定义环境变量。

Schema Registry 模式注册表

对于Schema Registry的docker-compose定义,没有任何提及。

代理服务器

对于Rest Proxy的docker-compose定义,没有提及的要点。

链接器

复制因素

Connector可以在多个节点上形成一个组,但是为了验证目的,本次定义为单一节点。
因此,与Broker(Kafka)部分提到的一样,将Topic的复制因子设置为1。此外,Connector还将其配置设置的信息、状态和历史保存在Topic中。对应的Topic为CONNECT_CONFIG_STORAGE_TOPIC、CONNECT_OFFSET_STORAGE_TOPIC和CONNECT_STATUS_STORAGE_TOPIC。这些是为了增加容错性而设定的配置值。
正如前面提到的,由于本次只是为了验证目的,因此所有设置值都设为1。

  connect:
...
    environment:
...
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1

转换器

在这次验证中,我们使用了org.apache.kafka.connect.json.JsonConverter作为键/值转换器。还有其他多个转换器可供使用。请详细参考官方文档。

根据实际使用体验来说,如果使用Single Message Transform,相比于org.apache.kafka.connect.json.JsonConverter,我觉得org.apache.kafka.connect.storage.StringConverter更容易控制。不过,由于没有进行全面验证,所以可能不准确。

建议

如果在选择转换器方面感到困惑,可以参考官方技术博客。这也可能会对您有所帮助。

插件路径

在CONNECT_PLUGIN_PATH中,指定了连接器插件的存储目录路径。
在另一篇文章中会介绍,但在本次验证中,由于想要验证使用Debezium提供的SMT以及自己制作的SMT进行事件转换,因此需要额外的指定。

/usr/share/java
デフォルトで定義されている値。変更しなくてよいもの。

/usr/share/confluent-hub-components
confluet-hubクライアントでConnectorプラグインをインストールする際の、デフォルトのインストール先。特に理由がなければ変更しなくてよいもの。

/usr/share/plugins/debezium
Debeziumから提供されている、Debezium Scriptingと依存関係にあるGroovyのライブラリを格納するディレクトリパスです。Debezium Scriptingは、Content-base RoutingのSMTを利用するためのものです。

/usr/share/plugins/custom
自作のSMTを格納するディレクトリパスです。

      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/share/plugins/debezium,/usr/share/plugins/custom"

用于远程调试的端口开放

为了远程调试Connector,将端口设置为5005进行公开。如果要等待客户端连接,请将suspend=y更改并重新启动Connector。

KAFKA_OPTS: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005

提示

基于Confluent Platform的Connector图像规范,配置值被指定为以CONNECT_为前缀,并且通过使用下划线作为分隔符来定义环境变量。
SMT是Single Message Transformation的缩写。(参考网站)

总结

我使用Docker容器建立了环境。
我解释了docker-compose的定义。
在下一篇文章中,我将使用Connector插件将数据流从PostgreSQL传输到Elasticsearch。

投稿済みの記事と投稿予定の記事

検証用のシステム構成を紹介
環境構築 ★イマココ
Connectorプラグインの登録、データストリーム
DebeziumのSMTを利用したメッセージ変換
自作のSMTを利用したメッセージ変換
ElasticsearchのIngest Pipelineを利用した変換

投稿するかもしれない記事

Confluent Platformの解説
KafkaとConnectorのパフォーマンスチューニング
Elasticsearchの解説
Elasticsearchのインデックス設定、マッピング定義の解説
Elasticsearchの search APIの解説
Elasticsearchをクラスタ構成に変更

广告
将在 10 秒后关闭
bannerAds