PostgreSQL

本文档提供有关设置 PostgreSQL 连接配置

CDC 原理

PostgreSQL 的逻辑解码功能最早出现在9.4版本中,它是一种机制,允许提取提交到事务日志中的更改,并通过输出插件以用户友好的方式处理这些更改。

此输出插件必须在运行PostgreSQL服务器之前安装,并与一个复制槽一起启用,以便客户端能够使用更改。

提供的cdc支持

  • 逻辑解码(Logical Decoding),用于从 WAL 日志中解析逻辑变更事件

  • 复制协议(Replication Protocol):提供了消费者实时订阅(甚至同步订阅)数据库变更的机制

  • 快照导出(export snapshot):允许导出数据库的一致性快照(pg_export_snapshot)

  • 复制槽(Replication Slot),用于保存消费者偏移量,跟踪订阅者进度。

所以,根据以上,我们需要安装逻辑解码器,现有提供的解码器如下

解码器

pg版本

tapdata是否支持

输出格式

decoderbufs

9.6+

protobuf

wal2json

9.4+

✔️

json

pgoutput

10.0+

❌️

pg log

test_decoding

9.4+

text

decoder_raw

9.4+

SQL

PostgreSQL支持的最低版本:9.4+

先决条件

安装插件

安装步骤

以 wal2json 为例,安装步骤如下

确保环境变量PATH中包含"/bin"

export PATH=$PATH:<postgres安装路径>/bin

安装插件

git clone https://github.com/eulerto/wal2json -b master --single-branch \
&& cd wal2json \
&& USE_PGXS=1 make \
&& USE_PGXS=1 make install \
&& cd .. \
&& rm -rf wal2json

配置文件

如果你正在使用一个支持的逻辑解码插件(不能是 pgoutput ),并且它已经安装,配置服务器在启动时加载插件:

postgresql.conf
shared_preload_libraries = 'decoderbufs,wal2json'

配置replication

postgresql.conf
# REPLICATION
wal_level = logical
max_wal_senders = 1 # 大于0即可
max_replication_slots = 1 # 大于0即可

权限

用户需要有replication、login权限

CREATE ROLE name REPLICATION LOGIN;

配置文件 pg_hba.conf 需要添加如下内容:

pg_hba.conf
local replication <youruser> trust
host replication <youruser> 127.0.0.1/32 trust
host replication <youruser> ::1/128 trust

以上只是基本权限的设置,实际场景可能更加复杂

测试日志插件是否可用

注意:以下操作建议在POC环境进行

连接postgres数据库,切换至需要同步的数据库,创建一张测试表

-- 假设需要同步的数据库为postgres,模型为public
\c postgres
create table public.test_decode
(
uid integer not null
constraint users_pk
primary key,
name varchar(50),
age integer,
score decimal
)

可以根据自己情况创建一张测试表

  • 创建 slot 连接,以 wal2json 插件为例

select * from pg_create_logical_replication_slot('slot_test', 'wal2json')
  • 创建成功后,对测试表插入一条数据

  • 监听日志,查看返回结果,是否有刚才插入操作的信息

select * from pg_logical_slot_peek_changes('slot_test', null, null)
  • 成功后,销毁slot连接,删除测试表

select * from pg_drop_replication_slot('slot_test')
drop table public.test_decode

创建连接

  1. 数据库类型:PostgreSQL

  2. 连接类型:源头/源头和目标

  3. 填写数据库连接信息

  4. 选择日志解码器

    • json streaming: wal2json插件(默认项)

    • json streaming on rds: wal2json插件,用于云rds服务

    • pgoutput: pgoutput插件

异常处理

Slot清理

如果 tapdata 由于不可控异常(断电、进程崩溃等),导致cdc中断,会导致 slot 连接无法正确从 pg 主节点删除,将一直占用一个 slot 连接名额,需手动登录主节点,进行删除

查询slot信息

// 查看是否有slot_name=tapdata的信息
TABLE pg_replication_slots;

如图

删除slot节点

select * from pg_drop_replication_slot('tapdata');

删除操作

在使用 wal2json 插件解码时,如果源表没有主键,则无法实现增量同步的删除操作