给您介绍一下PostgreSQL的逻辑解码功能

我将总结一下我调查了关于从PostgreSQL 9.4版本开始实施的逻辑解码的结果。个人认为这是一个非常热门的功能,但在网络上几乎没有关于它的日文信息,所以我也希望通过这次介绍来宣传一下。

逻辑解码是什么?

这是一项用于从横向提取对PostgreSQL进行的更改的功能。

未来的。

    • 論理レプリケーション:特定データベース、特定テーブルのみレプリケーションetc.

 

    • PostgreSQLサーバ間で更新を伝播しあうことで、マルチマスタなDBサーバクラスタを実現

 

    PostgreSQLから更新差分を引き出して、他業務のDBやHadoopなどに効率的にデータ連携

预计会出现如此的进化。

提取更改信息是通过PostgreSQL的事务日志(WAL)进行的。事务日志并不以可读的形式保存,但它具有将其逻辑转换的功能,因此被称为这样的名称。

具体来说,

BEGIN;
INSERT INTO foo VALUES (103, 'inserted');
DELETE FROM foo WHERE a = 82;
UPDATE foo SET b = 'updated' WHERE a = 21;
COMMIT;

如果进行了像这样的更新,可以使用Logical Decoding来提取更改信息,将得到以下信息。

postgres=# SELECT * FROM pg_logical_slot_get_changes('rep_slot', NULL, NULL);
 location  | xid  |                            data
-----------+------+-------------------------------------------------------------
 0/1737AE8 | 1896 | BEGIN 1896
 0/1737AE8 | 1896 | table public.foo: INSERT: a[integer]:103 b[text]:'inserted'
 0/1737BB8 | 1896 | table public.foo: DELETE: a[integer]:82
 0/1737C00 | 1896 | table public.foo: UPDATE: a[integer]:21 b[text]:'updated'
 0/1737C98 | 1896 | COMMIT 1896
(5 rows)

怎么样?虽然输出表达方式有点怪,但改动还是能清楚地提取出来的。

出力的功能被插件化了,上面的例子是使用附带在PostgreSQL核心中的示例插件test_decoding。

输出插件

经过在网上搜索后,目前已经发布了以下的输出插件。

decorder_raw,作者是michaelpq。

它会以相当SQL风格进行输出。使用与上述相同的例子,

# SELECT * FROM pg_logical_slot_get_changes('rep_slot_2', NULL, NULL);
 location  | xid  |                           data
-----------+------+-----------------------------------------------------------
 0/1737F60 | 1897 | INSERT INTO public.foo (a, b) VALUES (104, 'inserted');
 0/1739A80 | 1897 | DELETE FROM public.foo WHERE a = 81;
 0/1739B08 | 1897 | UPDATE public.foo SET a = 23, b = 'updated' WHERE a = 23;
(3 rows)

这个样子。几乎完美地再现了。可以直接将其作为SQL导入到另一个数据库中。

eulerto 制作的 wal2json

会以JSON格式进行输出,就像这样。

[local] 18452 postgres=# SELECT * FROM pg_logical_slot_get_changes('rep_slot_3',
 NULL, NULL);
 location  | xid  |                           data
-----------+------+-----------------------------------------------------------
 0/1739D20 | 1898 | {                                                        +
           |      |         "xid": 1898,                                     +
           |      |         "change": [
 0/1739D20 | 1898 |                 {                                        +
           |      |                         "kind": "insert",                +
           |      |                         "schema": "public",              +
           |      |                         "table": "foo",                  +
           |      |                         "columnnames": ["a", "b"],       +
           |      |                         "columntypes": ["int4", "text"], +
           |      |                         "columnvalues": [105, "inserted"]+
           |      |                 }
 0/173B8B0 | 1898 |                 ,{                                       +
           |      |                         "kind": "delete",                +
           |      |                         "schema": "public",              +
           |      |                         "table": "foo",                  +
           |      |                         "oldkeys": {                     +
           |      |                                 "keynames": ["a"],       +
           |      |                                 "keytypes": ["int4"],    +
           |      |                                 "keyvalues": [80]        +
           |      |                         }                                +
           |      |                 }
 0/173B8F8 | 1898 |                 ,{                                       +
           |      |                         "kind": "update",                +
           |      |                         "schema": "public",              +
           |      |                         "table": "foo",                  +
           |      |                         "columnnames": ["a", "b"],       +
           |      |                         "columntypes": ["int4", "text"], +
           |      |                         "columnvalues": [24, "updated"], +
           |      |                         "oldkeys": {                     +
           |      |                                 "keynames": ["a"],       +
           |      |                                 "keytypes": ["int4"],    +
           |      |                                 "keyvalues": [24]        +
           |      |                         }                                +
           |      |                 }
 0/173B9D0 | 1898 |         ]                                                +
           |      | }
(5 rows)

decoderbufs 由 xstevens 提供

会以 Protocol Buffers 格式输出。尚未经过验证。

康方的瓶装水产品 – 由Confluent Inc提供

这是一个能够提取变更信息并存储到Apache Kafka的工具,非常棒!

阅读介绍博客后,我发现有一个想法,通过Kafka来传播数据到缓存、用于分析的Hadoop、以及监控等,这个架构非常有趣。但目前这个功能尚未经过验证。

如何使用Logical Decoding

不难的。

PostgreSQL的配置

在postgresql.conf中,

    • wal_level = logical

 

    max_replication_slot に1以上の値をセット

需要保留为修改生效。要使更改生效,需要重新启动PostgreSQL。
如后文所述,如果在读取更改信息时使用复制连接,则还需以上述之外。

    • postgresql.confにてmax_wal_senders に1以上の値をセット

 

    pg_hba.confにレプリケーション接続で接続するユーザの情報を追加

需要。

安装出力插件

安装方法因插件而异。对于附带的test_decoding插件,需要从PostgreSQL源代码的根目录开始操作。

$ cd contrib/test_decoding
$ make
# make insatll

可以通过安装来完成。

创建逻辑复制槽

我们将通过逻辑解码创建一个接口来读取变更信息。读取的方法是,

    • SQL

 

    レプリケーション接続

有两种方法可以实现:这里讲解使用SQL的情况。我们可以使用psql连接到PostgreSQL数据库的超级用户,并执行pg_create_logical_replication_slot函数。

postgres=# SELECT * FROM pg_create_logical_replication_slot('rep_slot', 'test_decoding');

以下のように中国語で言い換えます(一つのオプション):
该函数的参数是:

    • 作成する論理レプリケーション・スロットの名前

 

    利用する出力プラグインの名前

這是。 (Zhè shì.)

读取变更信息

如果使用SQL进行读取,需要获取更改信息。

    • pg_logical_slot_get_changes

 

    pg_logical_slot_peek_changes

可以通过两种类型的函数来提取。这两者的区别在于get和peek,将变更信息视为队列,并从队列中提取信息(get),或者在不提取的情况下进行检查(peek)。

两个参数相同

    • 読み出し先の論理レプリケーション・スロットの名前

 

    • 読み出しを終えるポイント(トランザクションログのIDであるLSN、もしくは行数で指定)

 

    出力プラグインのオプションをkeyとvalueで指定

是的。

假设

postgres=# SELECT * FROM pg_logical_slot_get_changes('rep_slot', NULL, 10, 'include-timestamp', 'on');
 location  | xid  |                            data
-----------+------+-------------------------------------------------------------
 0/1737F60 | 1897 | BEGIN 1897
 0/1737F60 | 1897 | table public.foo: INSERT: a[integer]:104 b[text]:'inserted'
 0/1739A80 | 1897 | table public.foo: DELETE: a[integer]:81
 0/1739B08 | 1897 | table public.foo: UPDATE: a[integer]:23 b[text]:'updated'
 0/1739BA0 | 1897 | COMMIT 1897 (at 2015-08-20 22:50:23.85002+09)
 0/1739D20 | 1898 | BEGIN 1898
 0/1739D20 | 1898 | table public.foo: INSERT: a[integer]:105 b[text]:'inserted'
 0/173B8B0 | 1898 | table public.foo: DELETE: a[integer]:80
 0/173B8F8 | 1898 | table public.foo: UPDATE: a[integer]:24 b[text]:'updated'
 0/173B9D0 | 1898 | COMMIT 1898 (at 2015-08-20 22:55:17.529238+09)
(10 rows)

在这种情况下,我们从名为rep_slot的槽中读取10行修改行。同时,通过给输出插件test_decoding提供include-timestamp = on选项,可以输出提交修改时的时间戳。

请给予更详细的使用说明

请参考以下文档:
http://www.postgresql.jp/document/9.4/html/logicaldecoding.html

最后

你觉得怎么样呢?

如果在9.4之前的PostgreSQL中尝试做类似的事情,

    • Slony-Iのようなツールを使う

 

    • APもしくはトリガを使って、更新差分をテーブルとして保管しておく

 

    バッチ処理でDBからデータをダンプして、更新差分を作成して連携先に渡す

进行这样的操作需要花费了很多功夫。

我认为,逻辑解码是一种值得期待的功能,可以轻松且高效地将数据从PostgreSQL连接到外部系统。

广告
将在 10 秒后关闭
bannerAds