Content-Length: 580112 | pFad | http://github.com/postgrespro/postgres/commit/88f488319bac051b874bcec87941217e25e0e126

93 Make the tablesync worker's replication origen drop logic robust. · postgrespro/postgres@88f4883 · GitHub
Skip to content

Commit 88f4883

Browse files
author
Amit Kapila
committed
Make the tablesync worker's replication origen drop logic robust.
In commit f6c5edb, we started to drop the replication origen slots before tablesync worker exits to avoid consuming more slots than required. We were dropping the replication origen in the same transaction where we were marking the tablesync state as SYNCDONE. Now, if there is any error after we have dropped the origen but before we commit the containing transaction, the in-memory state of replication progress won't be rolled back. Due to this, after the restart, tablesync worker can start streaming from the wrong location and can apply the already processed transaction. To fix this, we need to opportunistically drop the origen after marking the tablesync state as SYNCDONE. Even, if the tablesync worker fails to remove the replication origen before exit, the apply worker ensures to clean it up afterward. Reported by Tom Lane as per buildfarm. Diagnosed-by: Masahiko Sawada Author: Hou Zhijie Reviewed-By: Masahiko Sawada, Amit Kapila Discussion: https://postgr.es/m/20220714115155.GA5439@depesz.com Discussion: https://postgr.es/m/CAD21AoAw0Oofi4kiDpJBOwpYyBBBkJj=sLUOn4Gd2GjUAKG-fw@mail.gmail.com
1 parent 5015e1e commit 88f4883

File tree

2 files changed

+75
-41
lines changed

2 files changed

+75
-41
lines changed

src/backend/commands/subscriptioncmds.c

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -931,19 +931,22 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
931931
logicalrep_worker_stop(sub->oid, relid);
932932

933933
/*
934-
* For READY state and SYNCDONE state, we would have already
935-
* dropped the tablesync origen.
934+
* For READY state, we would have already dropped the
935+
* tablesync origen.
936936
*/
937-
if (state != SUBREL_STATE_READY && state != SUBREL_STATE_SYNCDONE)
937+
if (state != SUBREL_STATE_READY)
938938
{
939939
char origenname[NAMEDATALEN];
940940

941941
/*
942942
* Drop the tablesync's origen tracking if exists.
943943
*
944944
* It is possible that the origen is not yet created for
945-
* tablesync worker so passing missing_ok = true. This can
946-
* happen for the states before SUBREL_STATE_FINISHEDCOPY.
945+
* tablesync worker, this can happen for the states before
946+
* SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
947+
* apply worker can also concurrently try to drop the
948+
* origen and by this time the origen might be already
949+
* removed. For these reasons, passing missing_ok = true.
947950
*/
948951
ReplicationOriginNameForTablesync(sub->oid, relid, origenname,
949952
sizeof(origenname));
@@ -1516,19 +1519,13 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
15161519
/*
15171520
* Drop the tablesync's origen tracking if exists.
15181521
*
1519-
* For SYNCDONE/READY states, the tablesync origen tracking is known
1520-
* to have already been dropped by the tablesync worker.
1521-
*
15221522
* It is possible that the origen is not yet created for tablesync
15231523
* worker so passing missing_ok = true. This can happen for the states
15241524
* before SUBREL_STATE_FINISHEDCOPY.
15251525
*/
1526-
if (rstate->state != SUBREL_STATE_SYNCDONE)
1527-
{
1528-
ReplicationOriginNameForTablesync(subid, relid, origenname,
1529-
sizeof(origenname));
1530-
replorigen_drop_by_name(origenname, true, false);
1531-
}
1526+
ReplicationOriginNameForTablesync(subid, relid, origenname,
1527+
sizeof(origenname));
1528+
replorigen_drop_by_name(origenname, true, false);
15321529
}
15331530

15341531
/* Clean up dependencies */

src/backend/replication/logical/tablesync.c

Lines changed: 64 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
300300

301301
/*
302302
* UpdateSubscriptionRelState must be called within a transaction.
303-
* That transaction will be ended within the finish_sync_worker().
304303
*/
305304
if (!IsTransactionState())
306305
StartTransactionCommand();
@@ -310,30 +309,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
310309
MyLogicalRepWorker->relstate,
311310
MyLogicalRepWorker->relstate_lsn);
312311

313-
/*
314-
* Cleanup the tablesync origen tracking.
315-
*
316-
* Resetting the origen session removes the ownership of the slot.
317-
* This is needed to allow the origen to be dropped.
318-
*/
319-
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
320-
MyLogicalRepWorker->relid,
321-
origenname,
322-
sizeof(origenname));
323-
replorigen_session_reset();
324-
replorigen_session_origen = InvalidRepOriginId;
325-
replorigen_session_origen_lsn = InvalidXLogRecPtr;
326-
replorigen_session_origen_timestamp = 0;
327-
328-
/*
329-
* We expect that origen must be present. The concurrent operations
330-
* that remove origen like a refresh for the subscription take an
331-
* access exclusive lock on pg_subscription which prevent the previous
332-
* operation to update the rel state to SUBREL_STATE_SYNCDONE to
333-
* succeed.
334-
*/
335-
replorigen_drop_by_name(origenname, false, false);
336-
337312
/*
338313
* End streaming so that LogRepWorkerWalRcvConn can be used to drop
339314
* the slot.
@@ -343,7 +318,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
343318
/*
344319
* Cleanup the tablesync slot.
345320
*
346-
* This has to be done after the data changes because otherwise if
321+
* This has to be done after updating the state because otherwise if
347322
* there is an error while doing the database operations we won't be
348323
* able to rollback dropped slot.
349324
*/
@@ -359,6 +334,49 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
359334
*/
360335
ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
361336

337+
CommitTransactionCommand();
338+
pgstat_report_stat(false);
339+
340+
/*
341+
* Start a new transaction to clean up the tablesync origen tracking.
342+
* This transaction will be ended within the finish_sync_worker().
343+
* Now, even, if we fail to remove this here, the apply worker will
344+
* ensure to clean it up afterward.
345+
*
346+
* We need to do this after the table state is set to SYNCDONE.
347+
* Otherwise, if an error occurs while performing the database
348+
* operation, the worker will be restarted and the in-memory state of
349+
* replication progress (remote_lsn) won't be rolled-back which would
350+
* have been cleared before restart. So, the restarted worker will use
351+
* invalid replication progress state resulting in replay of
352+
* transactions that have already been applied.
353+
*/
354+
StartTransactionCommand();
355+
356+
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
357+
MyLogicalRepWorker->relid,
358+
origenname,
359+
sizeof(origenname));
360+
361+
/*
362+
* Resetting the origen session removes the ownership of the slot.
363+
* This is needed to allow the origen to be dropped.
364+
*/
365+
replorigen_session_reset();
366+
replorigen_session_origen = InvalidRepOriginId;
367+
replorigen_session_origen_lsn = InvalidXLogRecPtr;
368+
replorigen_session_origen_timestamp = 0;
369+
370+
/*
371+
* Drop the tablesync's origen tracking if exists.
372+
*
373+
* There is a chance that the user is concurrently performing refresh
374+
* for the subscription where we remove the table state and its origen
375+
* or the apply worker would have removed this origen. So passing
376+
* missing_ok = true.
377+
*/
378+
replorigen_drop_by_name(origenname, true, false);
379+
362380
finish_sync_worker();
363381
}
364382
else
@@ -466,6 +484,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
466484
*/
467485
if (current_lsn >= rstate->lsn)
468486
{
487+
char origenname[NAMEDATALEN];
488+
469489
rstate->state = SUBREL_STATE_READY;
470490
rstate->lsn = current_lsn;
471491
if (!started_tx)
@@ -475,7 +495,24 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
475495
}
476496

477497
/*
478-
* Update the state to READY.
498+
* Remove the tablesync origen tracking if exists.
499+
*
500+
* There is a chance that the user is concurrently performing
501+
* refresh for the subscription where we remove the table
502+
* state and its origen or the tablesync worker would have
503+
* already removed this origen. We can't rely on tablesync
504+
* worker to remove the origen tracking as if there is any
505+
* error while dropping we won't restart it to drop the
506+
* origen. So passing missing_ok = true.
507+
*/
508+
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
509+
rstate->relid,
510+
origenname,
511+
sizeof(origenname));
512+
replorigen_drop_by_name(origenname, true, false);
513+
514+
/*
515+
* Update the state to READY only after the origen cleanup.
479516
*/
480517
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
481518
rstate->relid, rstate->state,

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/postgrespro/postgres/commit/88f488319bac051b874bcec87941217e25e0e126

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy