如何将PostgreSQL WAL记录注入Kafka
■ 作者简介
钟硕
现供职于迪卡侬,PostgreSQL & Oracle DBA
背景
容器化部署Kafka Connector
部署Kafka Connector前的准备工作
shared_preload_libraries = 'decoderbufs,wal2json'
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4
部署Debezium Connector插件
检查Kafka Container中插件的情况
docker ps -f name='debezium.*connect' -l
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
73dd73bd6872 debezium/connect "/.r/r /docker-ent..." 42 hours ago Up 42 hours r-debezium-connector-kafka-connector-1-a3726ec4
docker exec -it 73dd73bd6872 /bin/bash
[kafka@73dd73bd6872 connect]$ ls
debezium-connector-mysql debezium-connector-postgres
[kafka@73dd73bd6872 connect]$ pwd
/kafka/connect
验证Debezium PostgreSQL connector加载的信息
docker logs -t -f 73dd73bd6872 | more
2020-10-20T07:56:08.452286434Z --- Setting property from CONNECT_PLUGIN_PATH: plugin.path=/kafka/connect
2020-10-20T07:56:09.896637652Z 2020-10-20 07:56:09,889 - INFO [main:DelegatingClassLoader@246] - Loading plugin from: /kafka/connect/debezium-connector-postgres
2020-10-20T07:56:10.574470660Z 2020-10-20 07:56:10,569 - INFO [main:DelegatingClassLoader@269] - Registered loader: PluginClassLoader{pl
uginLocation=file:/kafka/connect/debezium-connector-postgres/}
2020-10-20T07:56:10.574502062Z 2020-10-20 07:56:10,569 - INFO [main:DelegatingClassLoader@198] - Added plugin 'io.debezium.connector.pos
tgresql.PostgresConnector注册PG连接信息到connector
vim register-postgresql.json
{
"name": "fulfillment-connector", ①
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector", ②
"tasks.max": "1", ③
"plugin.name": "wal2json_streaming", ④
"database.hostname": "postgresql", ⑤
"database.port": "5432", ⑥
"database.user": "postgres", ⑦
"database.password": "debezium", ⑧
"database.dbname" : "postgres", ⑨
"database.server.name": "fulfillment", ⑩
"schema.include.list": "inventory" ⑪
}
}
启动注册到Debezium PostgreSQL Connector中的连接服务
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @register-postgresql.json验证注册的信息
curl -H "Accept:application/json" localhost:8083/connectors/
["fulfillment-connector"]
2020-10-21T03:49:22.632581000Z 2020-10-21 03:49:22,632 - INFO [StartAndStopExecutor-connect-1-4:AbstractConfig@354] - TaskConfig values:2020-10-21T03:49:22.632592801Z task.class = class io.debezium.connector.postgresql.PostgresConnectorTask2020-10-21T03:49:22.632596701Z 2020-10-21T03:49:22.632948722Z 2020-10-21 03:49:22,632 - INFO [StartAndStopExecutor-connect-1-4:Worker@524] - Instantiated task fulfillment-connector-0 with version 1.3.0.Final of type io.debezium.connector.postgresql.PostgresConnectorTask2020-10-21T03:49:22.633286743Z 2020-10-21 03:49:22,633 - INFO [StartAndStopExecutor-connect-1-4:AbstractConfig@354] - JsonConverterConfig values:2020-10-21T03:49:22.634988447Z 2020-10-21 03:49:22,633 - INFO [StartAndStopExecutor-connect-1-4:Worker@543] - Set up the value converter class org.apache.kafka.connect.json.JsonConverter for task fulfillment-connector-0 using the worker config2020-10-21T03:49:22.634994347Z 2020-10-21 03:49:22,633 - INFO [StartAndStopExecutor-connect-1-4:Worker@550] - Set up the header converter class org.apache.kafka.connect.storage.SimpleHeaderConverter for task fulfillment-connector-0 using the worker config
停止并删除注册的连接
curl -X DELETE localhost:8083/connectors/fulfillment-connector
# select pg_drop_replication_slot('debezium')
docker exec -it a5d73008228a /bin/bashroot@a5d73008228a:/# psql -d postgres -U postgrespostgres=# select * from pg_replication_slots;-[ RECORD 1 ]-------+----------slot_name | debeziumplugin | wal2jsonslot_type | logicaldatoid | 13067database | postgrestemporary | factive | tactive_pid | 1681xmin | catalog_xmin | 605restart_lsn | /209B200confirmed_flush_lsn | /209B238
postgres=# set search_path to inventory ;SET
postgres=# \dt List of relations Schema | Name | Type | Owner -----------+------------------+-------+---------- inventory | customers | table | postgres inventory | geom | table | postgres inventory | orders | table | postgres inventory | products | table | postgres inventory | products_on_hand | table | postgres inventory | spatial_ref_sys | table | postgres(6 rows)
postgres=# \d customers Table "inventory.customers" Column | Type | Collation | Nullable | Default ------------+------------------------+-----------+----------+--------------------------------------- id | integer | | not null | nextval('customers_id_seq'::regclass) first_name | character varying(255) | | not null | last_name | character varying(255) | | not null | email | character varying(255) | | not null |
Topic: fulfillment.inventory.customers{ "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, … "payload": { "before": null, "after": { "id": 1001, "first_name": "Sally", "last_name": "Thomas", "email": "sally.thomas@acme.com" }, "source": { "version": "1.3.0.Final", "connector": "postgresql", "name": "fulfillment", "ts_ms": 1603252163493, "snapshot": "true", "db": "postgres", "schema": "inventory", "table": "customers", "txId": 602, "lsn": 34078720, "xmin": null }, "op": "r", "ts_ms": 1603252163495, "transaction": null }}
相关文章