给您介绍一下PostgreSQL的逻辑解码功能
我将总结一下我调查了关于从PostgreSQL 9.4版本开始实施的逻辑解码的结果。个人认为这是一个非常热门的功能,但在网络上几乎没有关于它的日文信息,所以我也希望通过这次介绍来宣传一下。
逻辑解码是什么?
这是一项用于从横向提取对PostgreSQL进行的更改的功能。
未来的。
-
- 論理レプリケーション:特定データベース、特定テーブルのみレプリケーションetc.
-
- PostgreSQLサーバ間で更新を伝播しあうことで、マルチマスタなDBサーバクラスタを実現
- PostgreSQLから更新差分を引き出して、他業務のDBやHadoopなどに効率的にデータ連携
预计会出现如此的进化。
提取更改信息是通过PostgreSQL的事务日志(WAL)进行的。事务日志并不以可读的形式保存,但它具有将其逻辑转换的功能,因此被称为这样的名称。
具体来说,
BEGIN;
INSERT INTO foo VALUES (103, 'inserted');
DELETE FROM foo WHERE a = 82;
UPDATE foo SET b = 'updated' WHERE a = 21;
COMMIT;
如果进行了像这样的更新,可以使用Logical Decoding来提取更改信息,将得到以下信息。
postgres=# SELECT * FROM pg_logical_slot_get_changes('rep_slot', NULL, NULL);
location | xid | data
-----------+------+-------------------------------------------------------------
0/1737AE8 | 1896 | BEGIN 1896
0/1737AE8 | 1896 | table public.foo: INSERT: a[integer]:103 b[text]:'inserted'
0/1737BB8 | 1896 | table public.foo: DELETE: a[integer]:82
0/1737C00 | 1896 | table public.foo: UPDATE: a[integer]:21 b[text]:'updated'
0/1737C98 | 1896 | COMMIT 1896
(5 rows)
怎么样?虽然输出表达方式有点怪,但改动还是能清楚地提取出来的。
出力的功能被插件化了,上面的例子是使用附带在PostgreSQL核心中的示例插件test_decoding。
输出插件
经过在网上搜索后,目前已经发布了以下的输出插件。
decorder_raw,作者是michaelpq。
它会以相当SQL风格进行输出。使用与上述相同的例子,
# SELECT * FROM pg_logical_slot_get_changes('rep_slot_2', NULL, NULL);
location | xid | data
-----------+------+-----------------------------------------------------------
0/1737F60 | 1897 | INSERT INTO public.foo (a, b) VALUES (104, 'inserted');
0/1739A80 | 1897 | DELETE FROM public.foo WHERE a = 81;
0/1739B08 | 1897 | UPDATE public.foo SET a = 23, b = 'updated' WHERE a = 23;
(3 rows)
这个样子。几乎完美地再现了。可以直接将其作为SQL导入到另一个数据库中。
eulerto 制作的 wal2json
会以JSON格式进行输出,就像这样。
[local] 18452 postgres=# SELECT * FROM pg_logical_slot_get_changes('rep_slot_3',
NULL, NULL);
location | xid | data
-----------+------+-----------------------------------------------------------
0/1739D20 | 1898 | { +
| | "xid": 1898, +
| | "change": [
0/1739D20 | 1898 | { +
| | "kind": "insert", +
| | "schema": "public", +
| | "table": "foo", +
| | "columnnames": ["a", "b"], +
| | "columntypes": ["int4", "text"], +
| | "columnvalues": [105, "inserted"]+
| | }
0/173B8B0 | 1898 | ,{ +
| | "kind": "delete", +
| | "schema": "public", +
| | "table": "foo", +
| | "oldkeys": { +
| | "keynames": ["a"], +
| | "keytypes": ["int4"], +
| | "keyvalues": [80] +
| | } +
| | }
0/173B8F8 | 1898 | ,{ +
| | "kind": "update", +
| | "schema": "public", +
| | "table": "foo", +
| | "columnnames": ["a", "b"], +
| | "columntypes": ["int4", "text"], +
| | "columnvalues": [24, "updated"], +
| | "oldkeys": { +
| | "keynames": ["a"], +
| | "keytypes": ["int4"], +
| | "keyvalues": [24] +
| | } +
| | }
0/173B9D0 | 1898 | ] +
| | }
(5 rows)
decoderbufs 由 xstevens 提供
会以 Protocol Buffers 格式输出。尚未经过验证。
康方的瓶装水产品 – 由Confluent Inc提供
这是一个能够提取变更信息并存储到Apache Kafka的工具,非常棒!
阅读介绍博客后,我发现有一个想法,通过Kafka来传播数据到缓存、用于分析的Hadoop、以及监控等,这个架构非常有趣。但目前这个功能尚未经过验证。
如何使用Logical Decoding
不难的。
PostgreSQL的配置
在postgresql.conf中,
-
- wal_level = logical
- max_replication_slot に1以上の値をセット
需要保留为修改生效。要使更改生效,需要重新启动PostgreSQL。
如后文所述,如果在读取更改信息时使用复制连接,则还需以上述之外。
-
- postgresql.confにてmax_wal_senders に1以上の値をセット
- pg_hba.confにレプリケーション接続で接続するユーザの情報を追加
需要。
安装出力插件
安装方法因插件而异。对于附带的test_decoding插件,需要从PostgreSQL源代码的根目录开始操作。
$ cd contrib/test_decoding
$ make
# make insatll
可以通过安装来完成。
创建逻辑复制槽
我们将通过逻辑解码创建一个接口来读取变更信息。读取的方法是,
-
- SQL
- レプリケーション接続
有两种方法可以实现:这里讲解使用SQL的情况。我们可以使用psql连接到PostgreSQL数据库的超级用户,并执行pg_create_logical_replication_slot函数。
postgres=# SELECT * FROM pg_create_logical_replication_slot('rep_slot', 'test_decoding');
以下のように中国語で言い換えます(一つのオプション):
该函数的参数是:
-
- 作成する論理レプリケーション・スロットの名前
- 利用する出力プラグインの名前
這是。 (Zhè shì.)
读取变更信息
如果使用SQL进行读取,需要获取更改信息。
-
- pg_logical_slot_get_changes
- pg_logical_slot_peek_changes
可以通过两种类型的函数来提取。这两者的区别在于get和peek,将变更信息视为队列,并从队列中提取信息(get),或者在不提取的情况下进行检查(peek)。
两个参数相同
-
- 読み出し先の論理レプリケーション・スロットの名前
-
- 読み出しを終えるポイント(トランザクションログのIDであるLSN、もしくは行数で指定)
- 出力プラグインのオプションをkeyとvalueで指定
是的。
假设
postgres=# SELECT * FROM pg_logical_slot_get_changes('rep_slot', NULL, 10, 'include-timestamp', 'on');
location | xid | data
-----------+------+-------------------------------------------------------------
0/1737F60 | 1897 | BEGIN 1897
0/1737F60 | 1897 | table public.foo: INSERT: a[integer]:104 b[text]:'inserted'
0/1739A80 | 1897 | table public.foo: DELETE: a[integer]:81
0/1739B08 | 1897 | table public.foo: UPDATE: a[integer]:23 b[text]:'updated'
0/1739BA0 | 1897 | COMMIT 1897 (at 2015-08-20 22:50:23.85002+09)
0/1739D20 | 1898 | BEGIN 1898
0/1739D20 | 1898 | table public.foo: INSERT: a[integer]:105 b[text]:'inserted'
0/173B8B0 | 1898 | table public.foo: DELETE: a[integer]:80
0/173B8F8 | 1898 | table public.foo: UPDATE: a[integer]:24 b[text]:'updated'
0/173B9D0 | 1898 | COMMIT 1898 (at 2015-08-20 22:55:17.529238+09)
(10 rows)
在这种情况下,我们从名为rep_slot的槽中读取10行修改行。同时,通过给输出插件test_decoding提供include-timestamp = on选项,可以输出提交修改时的时间戳。
请给予更详细的使用说明
请参考以下文档:
http://www.postgresql.jp/document/9.4/html/logicaldecoding.html
最后
你觉得怎么样呢?
如果在9.4之前的PostgreSQL中尝试做类似的事情,
-
- Slony-Iのようなツールを使う
-
- APもしくはトリガを使って、更新差分をテーブルとして保管しておく
- バッチ処理でDBからデータをダンプして、更新差分を作成して連携先に渡す
进行这样的操作需要花费了很多功夫。
我认为,逻辑解码是一种值得期待的功能,可以轻松且高效地将数据从PostgreSQL连接到外部系统。