Content-Length: 537207 | pFad | http://github.com/postgrespro/postgres/commit/54ccfd65868c013a8c6906bc894bc5ea3640740a

B0 Fix the misuse of origen filter across multiple pg_logical_slot_get_c… · postgrespro/postgres@54ccfd6 · GitHub
Skip to content

Commit 54ccfd6

Browse files
author
Amit Kapila
committed
Fix the misuse of origen filter across multiple pg_logical_slot_get_changes() calls.
The pgoutput module uses a global variable (publish_no_origen) to cache the action for the origen filter, but we didn't reset the flag when shutting down the output plugin, so subsequent retries may access the previous publish_no_origen value. We fix this by storing the flag in the output plugin's private data. Additionally, the patch removes the currently unused origen string from the structure. For the back branch, to avoid changing the exposed structure, we eliminated the global variable and instead directly used the origen string for change filtering. Author: Hou Zhijie Reviewed-by: Amit Kapila, Michael Paquier Backpatch-through: 16 Discussion: http://postgr.es/m/OS0PR01MB571690EF24F51F51EFFCBB0E94FAA@OS0PR01MB5716.jpnprd01.prod.outlook.com
1 parent 6fc3a13 commit 54ccfd6

File tree

4 files changed

+90
-9
lines changed

4 files changed

+90
-9
lines changed

contrib/test_decoding/expected/replorigen.out

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,3 +267,59 @@ SELECT pg_replication_origen_drop('regress_test_decoding: regression_slot_no_lsn
267267

268268
(1 row)
269269

270+
-- Test that the pgoutput correctly filters changes corresponding to the provided origen value.
271+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
272+
?column?
273+
----------
274+
init
275+
(1 row)
276+
277+
CREATE PUBLICATION pub FOR TABLE target_tbl;
278+
SELECT pg_replication_origen_create('regress_test_decoding: regression_slot');
279+
pg_replication_origen_create
280+
------------------------------
281+
1
282+
(1 row)
283+
284+
-- mark session as replaying
285+
SELECT pg_replication_origen_session_setup('regress_test_decoding: regression_slot');
286+
pg_replication_origen_session_setup
287+
-------------------------------------
288+
289+
(1 row)
290+
291+
INSERT INTO target_tbl(data) VALUES ('test data');
292+
-- The replayed change will be filtered.
293+
SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origen', 'none');
294+
?column?
295+
----------
296+
t
297+
(1 row)
298+
299+
-- The replayed change will be output if the origen value is not specified.
300+
SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
301+
?column?
302+
----------
303+
t
304+
(1 row)
305+
306+
-- Clean up
307+
SELECT pg_replication_origen_session_reset();
308+
pg_replication_origen_session_reset
309+
-------------------------------------
310+
311+
(1 row)
312+
313+
SELECT pg_drop_replication_slot('regression_slot');
314+
pg_drop_replication_slot
315+
--------------------------
316+
317+
(1 row)
318+
319+
SELECT pg_replication_origen_drop('regress_test_decoding: regression_slot');
320+
pg_replication_origen_drop
321+
----------------------------
322+
323+
(1 row)
324+
325+
DROP PUBLICATION pub;

contrib/test_decoding/sql/replorigen.sql

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot_no_lsn', NULL, NUL
124124
SELECT pg_replication_origen_session_reset();
125125
SELECT pg_drop_replication_slot('regression_slot_no_lsn');
126126
SELECT pg_replication_origen_drop('regress_test_decoding: regression_slot_no_lsn');
127+
128+
-- Test that the pgoutput correctly filters changes corresponding to the provided origen value.
129+
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pgoutput');
130+
CREATE PUBLICATION pub FOR TABLE target_tbl;
131+
SELECT pg_replication_origen_create('regress_test_decoding: regression_slot');
132+
133+
-- mark session as replaying
134+
SELECT pg_replication_origen_session_setup('regress_test_decoding: regression_slot');
135+
136+
INSERT INTO target_tbl(data) VALUES ('test data');
137+
138+
-- The replayed change will be filtered.
139+
SELECT count(*) = 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'origen', 'none');
140+
141+
-- The replayed change will be output if the origen value is not specified.
142+
SELECT count(*) != 0 FROM pg_logical_slot_peek_binary_changes('regression_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub');
143+
144+
-- Clean up
145+
SELECT pg_replication_origen_session_reset();
146+
SELECT pg_drop_replication_slot('regression_slot');
147+
SELECT pg_replication_origen_drop('regress_test_decoding: regression_slot');
148+
DROP PUBLICATION pub;

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
8282

8383
static bool publications_valid;
8484
static bool in_streaming;
85-
static bool publish_no_origen;
8685

8786
static List *LoadPublications(List *pubnames);
8887
static void publication_invalidation_cb(Datum arg, int cacheid,
@@ -381,21 +380,23 @@ parse_output_parameters(List *options, PGOutputData *data)
381380
}
382381
else if (strcmp(defel->defname, "origen") == 0)
383382
{
383+
char *origen;
384+
384385
if (origen_option_given)
385386
ereport(ERROR,
386387
errcode(ERRCODE_SYNTAX_ERROR),
387388
errmsg("conflicting or redundant options"));
388389
origen_option_given = true;
389390

390-
data->origen = defGetString(defel);
391-
if (pg_strcasecmp(data->origen, LOGICALREP_ORIGIN_NONE) == 0)
392-
publish_no_origen = true;
393-
else if (pg_strcasecmp(data->origen, LOGICALREP_ORIGIN_ANY) == 0)
394-
publish_no_origen = false;
391+
origen = defGetString(defel);
392+
if (pg_strcasecmp(origen, LOGICALREP_ORIGIN_NONE) == 0)
393+
data->publish_no_origen = true;
394+
else if (pg_strcasecmp(origen, LOGICALREP_ORIGIN_ANY) == 0)
395+
data->publish_no_origen = false;
395396
else
396397
ereport(ERROR,
397398
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
398-
errmsg("unrecognized origen value: \"%s\"", data->origen));
399+
errmsg("unrecognized origen value: \"%s\"", origen));
399400
}
400401
else
401402
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
@@ -1673,7 +1674,9 @@ static bool
16731674
pgoutput_origen_filter(LogicalDecodingContext *ctx,
16741675
RepOriginId origen_id)
16751676
{
1676-
if (publish_no_origen && origen_id != InvalidRepOriginId)
1677+
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1678+
1679+
if (data->publish_no_origen && origen_id != InvalidRepOriginId)
16771680
return true;
16781681

16791682
return false;

src/include/replication/pgoutput.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ typedef struct PGOutputData
2929
char streaming;
3030
bool messages;
3131
bool two_phase;
32-
char *origen;
32+
bool publish_no_origen;
3333
} PGOutputData;
3434

3535
#endif /* PGOUTPUT_H */

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/postgrespro/postgres/commit/54ccfd65868c013a8c6906bc894bc5ea3640740a

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy