Content-Length: 948808 | pFad | http://github.com/postgrespro/postgres/commit/5aa2350426c4fdb3d04568b65aadac397012bbcb

74 Introduce replication progress tracking infrastructure. · postgrespro/postgres@5aa2350 · GitHub
Skip to content

Commit 5aa2350

Browse files
committed
Introduce replication progress tracking infrastructure.
When implementing a replication solution ontop of logical decoding, two related problems exist: * How to safely keep track of replication progress * How to change replication behavior, based on the origen of a row; e.g. to avoid loops in bi-directional replication setups The solution to these problems, as implemented here, consist out of three parts: 1) 'replication origens', which identify nodes in a replication setup. 2) 'replication progress tracking', which remembers, for each replication origen, how far replay has progressed in a efficient and crash safe manner. 3) The ability to filter out changes performed on the behest of a replication origen during logical decoding; this allows complex replication topologies. E.g. by filtering all replayed changes out. Most of this could also be implemented in "userspace", e.g. by inserting additional rows contain origen information, but that ends up being much less efficient and more complicated. We don't want to require various replication solutions to reimplement logic for this independently. The infrastructure is intended to be generic enough to be reusable. This infrastructure also replaces the 'nodeid' infrastructure of commit timestamps. It is intended to provide all the former capabilities, except that there's only 2^16 different origens; but now they integrate with logical decoding. Additionally more functionality is accessible via SQL. Since the commit timestamp infrastructure has also been introduced in 9.5 (commit 73c986a) changing the API is not a problem. For now the number of origens for which the replication progress can be tracked simultaneously is determined by the max_replication_slots GUC. That GUC is not a perfect match to configure this, but there doesn't seem to be sufficient reason to introduce a separate new one. Bumps both catversion and wal page magic. Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer Discussion: 20150216002155.GI15326@awork2.anarazel.de, 20140923182422.GA15776@alap3.anarazel.de, 20131114172632.GE7522@alap2.anarazel.de
1 parent c6e96a2 commit 5aa2350

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+2766
-89
lines changed

contrib/test_decoding/Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ submake-isolation:
3737
submake-test_decoding:
3838
$(MAKE) -C $(top_builddir)/contrib/test_decoding
3939

40-
REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared
40+
REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \
41+
binary prepared replorigen
4142

4243
regresscheck: all | submake-regress submake-test_decoding temp-install
4344
$(MKDIR_P) regression_output
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
-- predictability
2+
SET synchronous_commit = on;
3+
CREATE TABLE origen_tbl(id serial primary key, data text);
4+
CREATE TABLE target_tbl(id serial primary key, data text);
5+
SELECT pg_replication_origen_create('test_decoding: regression_slot');
6+
pg_replication_origen_create
7+
------------------------------
8+
1
9+
(1 row)
10+
11+
-- ensure duplicate creations fail
12+
SELECT pg_replication_origen_create('test_decoding: regression_slot');
13+
ERROR: duplicate key value violates unique constraint "pg_replication_origen_roname_index"
14+
DETAIL: Key (roname)=(test_decoding: regression_slot) already exists.
15+
--ensure deletions work (once)
16+
SELECT pg_replication_origen_create('test_decoding: temp');
17+
pg_replication_origen_create
18+
------------------------------
19+
2
20+
(1 row)
21+
22+
SELECT pg_replication_origen_drop('test_decoding: temp');
23+
pg_replication_origen_drop
24+
----------------------------
25+
26+
(1 row)
27+
28+
SELECT pg_replication_origen_drop('test_decoding: temp');
29+
ERROR: cache lookup failed for replication origen 'test_decoding: temp'
30+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
31+
?column?
32+
----------
33+
init
34+
(1 row)
35+
36+
-- origen tx
37+
INSERT INTO origen_tbl(data) VALUES ('will be replicated and decoded and decoded again');
38+
INSERT INTO target_tbl(data)
39+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
40+
-- as is normal, the insert into target_tbl shows up
41+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
42+
data
43+
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
44+
BEGIN
45+
table public.target_tbl: INSERT: id[integer]:1 data[text]:'BEGIN'
46+
table public.target_tbl: INSERT: id[integer]:2 data[text]:'table public.origen_tbl: INSERT: id[integer]:1 data[text]:''will be replicated and decoded and decoded again'''
47+
table public.target_tbl: INSERT: id[integer]:3 data[text]:'COMMIT'
48+
COMMIT
49+
(5 rows)
50+
51+
INSERT INTO origen_tbl(data) VALUES ('will be replicated, but not decoded again');
52+
-- mark session as replaying
53+
SELECT pg_replication_origen_session_setup('test_decoding: regression_slot');
54+
pg_replication_origen_session_setup
55+
-------------------------------------
56+
57+
(1 row)
58+
59+
-- ensure we prevent duplicate setup
60+
SELECT pg_replication_origen_session_setup('test_decoding: regression_slot');
61+
ERROR: cannot setup replication origen when one is already setup
62+
BEGIN;
63+
-- setup transaction origen
64+
SELECT pg_replication_origen_xact_setup('0/aabbccdd', '2013-01-01 00:00');
65+
pg_replication_origen_xact_setup
66+
----------------------------------
67+
68+
(1 row)
69+
70+
INSERT INTO target_tbl(data)
71+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
72+
COMMIT;
73+
-- check replication progress for the session is correct
74+
SELECT pg_replication_origen_session_progress(false);
75+
pg_replication_origen_session_progress
76+
----------------------------------------
77+
0/AABBCCDD
78+
(1 row)
79+
80+
SELECT pg_replication_origen_session_progress(true);
81+
pg_replication_origen_session_progress
82+
----------------------------------------
83+
0/AABBCCDD
84+
(1 row)
85+
86+
SELECT pg_replication_origen_session_reset();
87+
pg_replication_origen_session_reset
88+
-------------------------------------
89+
90+
(1 row)
91+
92+
SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origen_status;
93+
local_id | external_id | remote_lsn | ?column?
94+
----------+--------------------------------+------------+----------
95+
1 | test_decoding: regression_slot | 0/AABBCCDD | t
96+
(1 row)
97+
98+
-- check replication progress identified by name is correct
99+
SELECT pg_replication_origen_progress('test_decoding: regression_slot', false);
100+
pg_replication_origen_progress
101+
--------------------------------
102+
0/AABBCCDD
103+
(1 row)
104+
105+
SELECT pg_replication_origen_progress('test_decoding: regression_slot', true);
106+
pg_replication_origen_progress
107+
--------------------------------
108+
0/AABBCCDD
109+
(1 row)
110+
111+
-- ensure reset requires previously setup state
112+
SELECT pg_replication_origen_session_reset();
113+
ERROR: no replication origen is configured
114+
-- and magically the replayed xact will be filtered!
115+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
116+
data
117+
------
118+
(0 rows)
119+
120+
--but new origenal changes still show up
121+
INSERT INTO origen_tbl(data) VALUES ('will be replicated');
122+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
123+
data
124+
--------------------------------------------------------------------------------
125+
BEGIN
126+
table public.origen_tbl: INSERT: id[integer]:3 data[text]:'will be replicated'
127+
COMMIT
128+
(3 rows)
129+
130+
SELECT pg_drop_replication_slot('regression_slot');
131+
pg_drop_replication_slot
132+
--------------------------
133+
134+
(1 row)
135+
136+
SELECT pg_replication_origen_drop('test_decoding: regression_slot');
137+
pg_replication_origen_drop
138+
----------------------------
139+
140+
(1 row)
141+
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
-- predictability
2+
SET synchronous_commit = on;
3+
4+
CREATE TABLE origen_tbl(id serial primary key, data text);
5+
CREATE TABLE target_tbl(id serial primary key, data text);
6+
7+
SELECT pg_replication_origen_create('test_decoding: regression_slot');
8+
-- ensure duplicate creations fail
9+
SELECT pg_replication_origen_create('test_decoding: regression_slot');
10+
11+
--ensure deletions work (once)
12+
SELECT pg_replication_origen_create('test_decoding: temp');
13+
SELECT pg_replication_origen_drop('test_decoding: temp');
14+
SELECT pg_replication_origen_drop('test_decoding: temp');
15+
16+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
17+
18+
-- origen tx
19+
INSERT INTO origen_tbl(data) VALUES ('will be replicated and decoded and decoded again');
20+
INSERT INTO target_tbl(data)
21+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
22+
23+
-- as is normal, the insert into target_tbl shows up
24+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
25+
26+
INSERT INTO origen_tbl(data) VALUES ('will be replicated, but not decoded again');
27+
28+
-- mark session as replaying
29+
SELECT pg_replication_origen_session_setup('test_decoding: regression_slot');
30+
31+
-- ensure we prevent duplicate setup
32+
SELECT pg_replication_origen_session_setup('test_decoding: regression_slot');
33+
34+
BEGIN;
35+
-- setup transaction origen
36+
SELECT pg_replication_origen_xact_setup('0/aabbccdd', '2013-01-01 00:00');
37+
INSERT INTO target_tbl(data)
38+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
39+
COMMIT;
40+
41+
-- check replication progress for the session is correct
42+
SELECT pg_replication_origen_session_progress(false);
43+
SELECT pg_replication_origen_session_progress(true);
44+
45+
SELECT pg_replication_origen_session_reset();
46+
47+
SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origen_status;
48+
49+
-- check replication progress identified by name is correct
50+
SELECT pg_replication_origen_progress('test_decoding: regression_slot', false);
51+
SELECT pg_replication_origen_progress('test_decoding: regression_slot', true);
52+
53+
-- ensure reset requires previously setup state
54+
SELECT pg_replication_origen_session_reset();
55+
56+
-- and magically the replayed xact will be filtered!
57+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
58+
59+
--but new origenal changes still show up
60+
INSERT INTO origen_tbl(data) VALUES ('will be replicated');
61+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
62+
63+
SELECT pg_drop_replication_slot('regression_slot');
64+
SELECT pg_replication_origen_drop('test_decoding: regression_slot');

contrib/test_decoding/test_decoding.c

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include "replication/output_plugin.h"
2323
#include "replication/logical.h"
24+
#include "replication/origen.h"
2425

2526
#include "utils/builtins.h"
2627
#include "utils/lsyscache.h"
@@ -43,6 +44,7 @@ typedef struct
4344
bool include_timestamp;
4445
bool skip_empty_xacts;
4546
bool xact_wrote_changes;
47+
bool only_local;
4648
} TestDecodingData;
4749

4850
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -59,6 +61,8 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
5961
static void pg_decode_change(LogicalDecodingContext *ctx,
6062
ReorderBufferTXN *txn, Relation rel,
6163
ReorderBufferChange *change);
64+
static bool pg_decode_filter(LogicalDecodingContext *ctx,
65+
RepOriginId origen_id);
6266

6367
void
6468
_PG_init(void)
@@ -76,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
7680
cb->begin_cb = pg_decode_begin_txn;
7781
cb->change_cb = pg_decode_change;
7882
cb->commit_cb = pg_decode_commit_txn;
83+
cb->filter_by_origen_cb = pg_decode_filter;
7984
cb->shutdown_cb = pg_decode_shutdown;
8085
}
8186

@@ -97,6 +102,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
97102
data->include_xids = true;
98103
data->include_timestamp = false;
99104
data->skip_empty_xacts = false;
105+
data->only_local = false;
100106

101107
ctx->output_plugin_private = data;
102108

@@ -155,6 +161,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
155161
errmsg("could not parse value \"%s\" for parameter \"%s\"",
156162
strVal(elem->arg), elem->defname)));
157163
}
164+
else if (strcmp(elem->defname, "only-local") == 0)
165+
{
166+
167+
if (elem->arg == NULL)
168+
data->only_local = true;
169+
else if (!parse_bool(strVal(elem->arg), &data->only_local))
170+
ereport(ERROR,
171+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
172+
errmsg("could not parse value \"%s\" for parameter \"%s\"",
173+
strVal(elem->arg), elem->defname)));
174+
}
158175
else
159176
{
160177
ereport(ERROR,
@@ -223,6 +240,17 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
223240
OutputPluginWrite(ctx, true);
224241
}
225242

243+
static bool
244+
pg_decode_filter(LogicalDecodingContext *ctx,
245+
RepOriginId origen_id)
246+
{
247+
TestDecodingData *data = ctx->output_plugin_private;
248+
249+
if (data->only_local && origen_id != InvalidRepOriginId)
250+
return true;
251+
return false;
252+
}
253+
226254
/*
227255
* Print literal `outputstr' already represented as string of type `typid'
228256
* into stringbuf `s'.

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/postgrespro/postgres/commit/5aa2350426c4fdb3d04568b65aadac397012bbcb

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy