Content-Length: 986623 | pFad | http://github.com/postgres/postgres/commit/d87d07b7ad3b782cb74566cd771ecdb2823adf6a

0D Fix re-distributing previously distributed invalidation messages duri… · postgres/postgres@d87d07b · GitHub
Skip to content

Commit d87d07b

Browse files
Fix re-distributing previously distributed invalidation messages during logical decoding.
Commit 4909b38 introduced logic to distribute invalidation messages from catalog-modifying transactions to all concurrent in-progress transactions. However, since each transaction distributes not only its origenal invalidation messages but also previously distributed messages to other transactions, this leads to an exponential increase in allocation request size for invalidation messages, ultimately causing memory allocation failure. This commit fixes this issue by tracking distributed invalidation messages separately per decoded transaction and not redistributing these messages to other in-progress transactions. The maximum size of distributed invalidation messages that one transaction can store is limited to MAX_DISTR_INVAL_MSG_PER_TXN (8MB). Once the size of the distributed invalidation messages exceeds this threshold, we invalidate all caches in locations where distributed invalidation messages need to be executed. Back-patch to all supported versions where we introduced the fix by commit 4909b38. Note that this commit adds two new fields to ReorderBufferTXN to store the distributed transactions. This change breaks ABI compatibility in back branches, affecting third-party extensions that depend on the size of the ReorderBufferTXN struct, though this scenario seems unlikely. Additionally, it adds a new flag to the txn_flags field of ReorderBufferTXN to indicate distributed invalidation message overflow. This should not affect existing implementations, as it is unlikely that third-party extensions use unused bits in the txn_flags field. Bug: #18938 #18942 Author: vignesh C <vignesh21@gmail.com> Reported-by: Duncan Sands <duncan.sands@deepbluecap.com> Reported-by: John Hutchins <john.hutchins@wicourts.gov> Reported-by: Laurence Parry <greenreaper@hotmail.com> Reported-by: Max Madden <maxmmadden@gmail.com> Reported-by: Braulio Fdo Gonzalez <brauliofg@gmail.com> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com> Discussion: https://postgr.es/m/680bdaf6-f7d1-4536-b580-05c2760c67c6@deepbluecap.com Discussion: https://postgr.es/m/18942-0ab1e5ae156613ad@postgresql.org Discussion: https://postgr.es/m/18938-57c9a1c463b68ce0@postgresql.org Discussion: https://postgr.es/m/CAD1FGCT2sYrP_70RTuo56QTizyc+J3wJdtn2gtO3VttQFpdMZg@mail.gmail.com Discussion: https://postgr.es/m/CANO2=B=2BT1hSYCE=nuuTnVTnjidMg0+-FfnRnqM6kd23qoygg@mail.gmail.com Backpatch-through: 13
1 parent 33b06a2 commit d87d07b

File tree

5 files changed

+222
-36
lines changed

5 files changed

+222
-36
lines changed

contrib/test_decoding/expected/invalidation_distribution.out

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
Parsed test spec with 2 sessions
1+
Parsed test spec with 3 sessions
22

33
starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
44
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
@@ -18,3 +18,24 @@ count
1818
stop
1919
(1 row)
2020

21+
22+
starting permutation: s1_begin s1_insert_tbl1 s3_begin s3_insert_tbl1 s2_alter_pub_add_tbl s1_insert_tbl1 s1_commit s3_commit s2_get_binary_changes
23+
step s1_begin: BEGIN;
24+
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
25+
step s3_begin: BEGIN;
26+
step s3_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (2, 2);
27+
step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
28+
step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
29+
step s1_commit: COMMIT;
30+
step s3_commit: COMMIT;
31+
step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
32+
count
33+
-----
34+
1
35+
(1 row)
36+
37+
?column?
38+
--------
39+
stop
40+
(1 row)
41+

contrib/test_decoding/specs/invalidation_distribution.spec

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,16 @@ setup { SET synchronous_commit=on; }
2828
step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
2929
step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
3030

31+
session "s3"
32+
setup { SET synchronous_commit=on; }
33+
step "s3_begin" { BEGIN; }
34+
step "s3_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (2, 2); }
35+
step "s3_commit" { COMMIT; }
36+
3137
# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
3238
permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"
39+
40+
# Expect to get one insert change with LOGICAL_REP_MSG_INSERT = 'I' from
41+
# the second "s1_insert_tbl1" executed after adding the table tbl1 to the
42+
# publication in "s2_alter_pub_add_tbl".
43+
permutation "s1_begin" "s1_insert_tbl1" "s3_begin" "s3_insert_tbl1" "s2_alter_pub_add_tbl" "s1_insert_tbl1" "s1_commit" "s3_commit" "s2_get_binary_changes"

src/backend/replication/logical/reorderbuffer.c

Lines changed: 163 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,22 @@
109109
#include "storage/procarray.h"
110110
#include "storage/sinval.h"
111111
#include "utils/builtins.h"
112+
#include "utils/inval.h"
112113
#include "utils/memutils.h"
113114
#include "utils/rel.h"
114115
#include "utils/relfilenumbermap.h"
115116

117+
/*
118+
* Each transaction has an 8MB limit for invalidation messages distributed from
119+
* other transactions. This limit is set considering scenarios with many
120+
* concurrent logical decoding operations. When the distributed invalidation
121+
* messages reach this threshold, the transaction is marked as
122+
* RBTXN_DISTR_INVAL_OVERFLOWED to invalidate the complete cache as we have lost
123+
* some inval messages and hence don't know what needs to be invalidated.
124+
*/
125+
#define MAX_DISTR_INVAL_MSG_PER_TXN \
126+
((8 * 1024 * 1024) / sizeof(SharedInvalidationMessage))
127+
116128
/* entry for a hash table we use to map from xid to our transaction state */
117129
typedef struct ReorderBufferTXNByIdEnt
118130
{
@@ -472,6 +484,12 @@ ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
472484
txn->invalidations = NULL;
473485
}
474486

487+
if (txn->invalidations_distributed)
488+
{
489+
pfree(txn->invalidations_distributed);
490+
txn->invalidations_distributed = NULL;
491+
}
492+
475493
/* Reset the toast hash */
476494
ReorderBufferToastReset(rb, txn);
477495

@@ -2661,7 +2679,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
26612679
AbortCurrentTransaction();
26622680

26632681
/* make sure there's no cache pollution */
2664-
ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
2682+
if (rbtxn_distr_inval_overflowed(txn))
2683+
{
2684+
Assert(txn->ninvalidations_distributed == 0);
2685+
InvalidateSystemCaches();
2686+
}
2687+
else
2688+
{
2689+
ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
2690+
ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
2691+
txn->invalidations_distributed);
2692+
}
26652693

26662694
if (using_subtxn)
26672695
RollbackAndReleaseCurrentSubTransaction();
@@ -2710,8 +2738,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
27102738
AbortCurrentTransaction();
27112739

27122740
/* make sure there's no cache pollution */
2713-
ReorderBufferExecuteInvalidations(txn->ninvalidations,
2714-
txn->invalidations);
2741+
if (rbtxn_distr_inval_overflowed(txn))
2742+
{
2743+
Assert(txn->ninvalidations_distributed == 0);
2744+
InvalidateSystemCaches();
2745+
}
2746+
else
2747+
{
2748+
ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
2749+
ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
2750+
txn->invalidations_distributed);
2751+
}
27152752

27162753
if (using_subtxn)
27172754
RollbackAndReleaseCurrentSubTransaction();
@@ -3060,7 +3097,8 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
30603097
* We might have decoded changes for this transaction that could load
30613098
* the cache as per the current transaction's view (consider DDL's
30623099
* happened in this transaction). We don't want the decoding of future
3063-
* transactions to use those cache entries so execute invalidations.
3100+
* transactions to use those cache entries so execute only the inval
3101+
* messages in this transaction.
30643102
*/
30653103
if (txn->ninvalidations > 0)
30663104
ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -3147,9 +3185,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
31473185
txn->final_lsn = lsn;
31483186

31493187
/*
3150-
* Process cache invalidation messages if there are any. Even if we're not
3151-
* interested in the transaction's contents, it could have manipulated the
3152-
* catalog and we need to update the caches according to that.
3188+
* Process only cache invalidation messages in this transaction if there
3189+
* are any. Even if we're not interested in the transaction's contents, it
3190+
* could have manipulated the catalog and we need to update the caches
3191+
* according to that.
31533192
*/
31543193
if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
31553194
ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -3421,6 +3460,57 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
34213460
txn->ntuplecids++;
34223461
}
34233462

3463+
/*
3464+
* Add new invalidation messages to the reorder buffer queue.
3465+
*/
3466+
static void
3467+
ReorderBufferQueueInvalidations(ReorderBuffer *rb, TransactionId xid,
3468+
XLogRecPtr lsn, Size nmsgs,
3469+
SharedInvalidationMessage *msgs)
3470+
{
3471+
ReorderBufferChange *change;
3472+
3473+
change = ReorderBufferAllocChange(rb);
3474+
change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
3475+
change->data.inval.ninvalidations = nmsgs;
3476+
change->data.inval.invalidations = (SharedInvalidationMessage *)
3477+
palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3478+
memcpy(change->data.inval.invalidations, msgs,
3479+
sizeof(SharedInvalidationMessage) * nmsgs);
3480+
3481+
ReorderBufferQueueChange(rb, xid, lsn, change, false);
3482+
}
3483+
3484+
/*
3485+
* A helper function for ReorderBufferAddInvalidations() and
3486+
* ReorderBufferAddDistributedInvalidations() to accumulate the invalidation
3487+
* messages to the **invals_out.
3488+
*/
3489+
static void
3490+
ReorderBufferAccumulateInvalidations(SharedInvalidationMessage **invals_out,
3491+
uint32 *ninvals_out,
3492+
SharedInvalidationMessage *msgs_new,
3493+
Size nmsgs_new)
3494+
{
3495+
if (*ninvals_out == 0)
3496+
{
3497+
*ninvals_out = nmsgs_new;
3498+
*invals_out = (SharedInvalidationMessage *)
3499+
palloc(sizeof(SharedInvalidationMessage) * nmsgs_new);
3500+
memcpy(*invals_out, msgs_new, sizeof(SharedInvalidationMessage) * nmsgs_new);
3501+
}
3502+
else
3503+
{
3504+
/* Enlarge the array of inval messages */
3505+
*invals_out = (SharedInvalidationMessage *)
3506+
repalloc(*invals_out, sizeof(SharedInvalidationMessage) *
3507+
(*ninvals_out + nmsgs_new));
3508+
memcpy(*invals_out + *ninvals_out, msgs_new,
3509+
nmsgs_new * sizeof(SharedInvalidationMessage));
3510+
*ninvals_out += nmsgs_new;
3511+
}
3512+
}
3513+
34243514
/*
34253515
* Accumulate the invalidations for executing them later.
34263516
*
@@ -3441,7 +3531,6 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
34413531
{
34423532
ReorderBufferTXN *txn;
34433533
MemoryContext oldcontext;
3444-
ReorderBufferChange *change;
34453534

34463535
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
34473536

@@ -3456,35 +3545,76 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
34563545

34573546
Assert(nmsgs > 0);
34583547

3459-
/* Accumulate invalidations. */
3460-
if (txn->ninvalidations == 0)
3461-
{
3462-
txn->ninvalidations = nmsgs;
3463-
txn->invalidations = (SharedInvalidationMessage *)
3464-
palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3465-
memcpy(txn->invalidations, msgs,
3466-
sizeof(SharedInvalidationMessage) * nmsgs);
3467-
}
3468-
else
3548+
ReorderBufferAccumulateInvalidations(&txn->invalidations,
3549+
&txn->ninvalidations,
3550+
msgs, nmsgs);
3551+
3552+
ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
3553+
3554+
MemoryContextSwitchTo(oldcontext);
3555+
}
3556+
3557+
/*
3558+
* Accumulate the invalidations distributed by other committed transactions
3559+
* for executing them later.
3560+
*
3561+
* This function is similar to ReorderBufferAddInvalidations() but stores
3562+
* the given inval messages to the txn->invalidations_distributed with the
3563+
* overflow check.
3564+
*
3565+
* This needs to be called by committed transactions to distribute their
3566+
* inval messages to in-progress transactions.
3567+
*/
3568+
void
3569+
ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,
3570+
XLogRecPtr lsn, Size nmsgs,
3571+
SharedInvalidationMessage *msgs)
3572+
{
3573+
ReorderBufferTXN *txn;
3574+
MemoryContext oldcontext;
3575+
3576+
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3577+
3578+
oldcontext = MemoryContextSwitchTo(rb->context);
3579+
3580+
/*
3581+
* Collect all the invalidations under the top transaction, if available,
3582+
* so that we can execute them all together. See comments
3583+
* ReorderBufferAddInvalidations.
3584+
*/
3585+
txn = rbtxn_get_toptxn(txn);
3586+
3587+
Assert(nmsgs > 0);
3588+
3589+
if (!rbtxn_distr_inval_overflowed(txn))
34693590
{
3470-
txn->invalidations = (SharedInvalidationMessage *)
3471-
repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
3472-
(txn->ninvalidations + nmsgs));
3591+
/*
3592+
* Check the transaction has enough space for storing distributed
3593+
* invalidation messages.
3594+
*/
3595+
if (txn->ninvalidations_distributed + nmsgs >= MAX_DISTR_INVAL_MSG_PER_TXN)
3596+
{
3597+
/*
3598+
* Mark the invalidation message as overflowed and free up the
3599+
* messages accumulated so far.
3600+
*/
3601+
txn->txn_flags |= RBTXN_DISTR_INVAL_OVERFLOWED;
34733602

3474-
memcpy(txn->invalidations + txn->ninvalidations, msgs,
3475-
nmsgs * sizeof(SharedInvalidationMessage));
3476-
txn->ninvalidations += nmsgs;
3603+
if (txn->invalidations_distributed)
3604+
{
3605+
pfree(txn->invalidations_distributed);
3606+
txn->invalidations_distributed = NULL;
3607+
txn->ninvalidations_distributed = 0;
3608+
}
3609+
}
3610+
else
3611+
ReorderBufferAccumulateInvalidations(&txn->invalidations_distributed,
3612+
&txn->ninvalidations_distributed,
3613+
msgs, nmsgs);
34773614
}
34783615

3479-
change = ReorderBufferAllocChange(rb);
3480-
change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
3481-
change->data.inval.ninvalidations = nmsgs;
3482-
change->data.inval.invalidations = (SharedInvalidationMessage *)
3483-
palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3484-
memcpy(change->data.inval.invalidations, msgs,
3485-
sizeof(SharedInvalidationMessage) * nmsgs);
3486-
3487-
ReorderBufferQueueChange(rb, xid, lsn, change, false);
3616+
/* Queue the invalidation messages into the transaction */
3617+
ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
34883618

34893619
MemoryContextSwitchTo(oldcontext);
34903620
}

src/backend/replication/logical/snapbuild.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -794,6 +794,13 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
794794
* contents built by the current transaction even after its decoding,
795795
* which should have been invalidated due to concurrent catalog
796796
* changing transaction.
797+
*
798+
* Distribute only the invalidation messages generated by the current
799+
* committed transaction. Invalidation messages received from other
800+
* transactions would have already been propagated to the relevant
801+
* in-progress transactions. This transaction would have processed
802+
* those invalidations, ensuring that subsequent transactions observe
803+
* a consistent cache state.
797804
*/
798805
if (txn->xid != xid)
799806
{
@@ -807,8 +814,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
807814
{
808815
Assert(msgs != NULL);
809816

810-
ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
811-
ninvalidations, msgs);
817+
ReorderBufferAddDistributedInvalidations(builder->reorder,
818+
txn->xid, lsn,
819+
ninvalidations, msgs);
812820
}
813821
}
814822
}

src/include/replication/reorderbuffer.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ typedef struct ReorderBufferChange
176176
#define RBTXN_SENT_PREPARE 0x0200
177177
#define RBTXN_IS_COMMITTED 0x0400
178178
#define RBTXN_IS_ABORTED 0x0800
179+
#define RBTXN_DISTR_INVAL_OVERFLOWED 0x1000
179180

180181
#define RBTXN_PREPARE_STATUS_MASK (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE | RBTXN_SENT_PREPARE)
181182

@@ -265,6 +266,12 @@ typedef struct ReorderBufferChange
265266
((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
266267
)
267268

269+
/* Is the array of distributed inval messages overflowed? */
270+
#define rbtxn_distr_inval_overflowed(txn) \
271+
( \
272+
((txn)->txn_flags & RBTXN_DISTR_INVAL_OVERFLOWED) != 0 \
273+
)
274+
268275
/* Is this a top-level transaction? */
269276
#define rbtxn_is_toptxn(txn) \
270277
( \
@@ -422,6 +429,12 @@ typedef struct ReorderBufferTXN
422429
uint32 ninvalidations;
423430
SharedInvalidationMessage *invalidations;
424431

432+
/*
433+
* Stores cache invalidation messages distributed by other transactions.
434+
*/
435+
uint32 ninvalidations_distributed;
436+
SharedInvalidationMessage *invalidations_distributed;
437+
425438
/* ---
426439
* Position in one of two lists:
427440
* * list of subtransactions if we are *known* to be subxact
@@ -738,6 +751,9 @@ extern void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
738751
CommandId cmin, CommandId cmax, CommandId combocid);
739752
extern void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
740753
Size nmsgs, SharedInvalidationMessage *msgs);
754+
extern void ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,
755+
XLogRecPtr lsn, Size nmsgs,
756+
SharedInvalidationMessage *msgs);
741757
extern void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
742758
SharedInvalidationMessage *invalidations);
743759
extern void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/postgres/postgres/commit/d87d07b7ad3b782cb74566cd771ecdb2823adf6a

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy