@@ -646,7 +646,8 @@ static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn,
646
646
XLogRecPtr missingContrecPtr ,
647
647
TimeLineID newTLI );
648
648
static void CheckPointGuts (XLogRecPtr checkPointRedo , int flags );
649
- static void KeepLogSeg (XLogRecPtr recptr , XLogSegNo * logSegNo );
649
+ static void KeepLogSeg (XLogRecPtr recptr , XLogRecPtr slotsMinLSN ,
650
+ XLogSegNo * logSegNo );
650
651
static XLogRecPtr XLogGetReplicationSlotMinimumLSN (void );
651
652
652
653
static void AdvanceXLInsertBuffer (XLogRecPtr upto , TimeLineID tli ,
@@ -6333,6 +6334,7 @@ CreateCheckPoint(int flags)
6333
6334
VirtualTransactionId * vxids ;
6334
6335
int nvxids ;
6335
6336
int oldXLogAllowed = 0 ;
6337
+ XLogRecPtr slotsMinReqLSN ;
6336
6338
6337
6339
/*
6338
6340
* An end-of-recovery checkpoint is really a shutdown checkpoint, just
@@ -6534,6 +6536,15 @@ CreateCheckPoint(int flags)
6534
6536
*/
6535
6537
END_CRIT_SECTION ();
6536
6538
6539
+ /*
6540
+ * Get the current minimum LSN to be used later in the WAL segment
6541
+ * cleanup. We may clean up only WAL segments, which are not needed
6542
+ * according to synchronized LSNs of replication slots. The slot's LSN
6543
+ * might be advanced concurrently, so we call this before
6544
+ * CheckPointReplicationSlots() synchronizes replication slots.
6545
+ */
6546
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
6547
+
6537
6548
/*
6538
6549
* In some cases there are groups of actions that must all occur on one
6539
6550
* side or the other of a checkpoint record. Before flushing the
@@ -6699,15 +6710,23 @@ CreateCheckPoint(int flags)
6699
6710
* prevent the disk holding the xlog from growing full.
6700
6711
*/
6701
6712
XLByteToSeg (RedoRecPtr , _logSegNo , wal_segment_size );
6702
- KeepLogSeg (recptr , & _logSegNo );
6713
+ KeepLogSeg (recptr , slotsMinReqLSN , & _logSegNo );
6703
6714
if (InvalidateObsoleteReplicationSlots (_logSegNo ))
6704
6715
{
6716
+ /*
6717
+ * Recalculate the current minimum LSN to be used in the WAL segment
6718
+ * cleanup. Then, we must synchronize the replication slots again in
6719
+ * order to make this LSN safe to use.
6720
+ */
6721
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
6722
+ CheckPointReplicationSlots ();
6723
+
6705
6724
/*
6706
6725
* Some slots have been invalidated; recalculate the old-segment
6707
6726
* horizon, starting again from RedoRecPtr.
6708
6727
*/
6709
6728
XLByteToSeg (RedoRecPtr , _logSegNo , wal_segment_size );
6710
- KeepLogSeg (recptr , & _logSegNo );
6729
+ KeepLogSeg (recptr , slotsMinReqLSN , & _logSegNo );
6711
6730
}
6712
6731
_logSegNo -- ;
6713
6732
RemoveOldXlogFiles (_logSegNo , RedoRecPtr , recptr ,
@@ -6979,6 +6998,7 @@ CreateRestartPoint(int flags)
6979
6998
XLogRecPtr endptr ;
6980
6999
XLogSegNo _logSegNo ;
6981
7000
TimestampTz xtime ;
7001
+ XLogRecPtr slotsMinReqLSN ;
6982
7002
6983
7003
/* Concurrent checkpoint/restartpoint cannot happen */
6984
7004
Assert (!IsUnderPostmaster || MyBackendType == B_CHECKPOINTER );
@@ -7061,6 +7081,15 @@ CreateRestartPoint(int flags)
7061
7081
MemSet (& CheckpointStats , 0 , sizeof (CheckpointStats ));
7062
7082
CheckpointStats .ckpt_start_t = GetCurrentTimestamp ();
7063
7083
7084
+ /*
7085
+ * Get the current minimum LSN to be used later in the WAL segment
7086
+ * cleanup. We may clean up only WAL segments, which are not needed
7087
+ * according to synchronized LSNs of replication slots. The slot's LSN
7088
+ * might be advanced concurrently, so we call this before
7089
+ * CheckPointReplicationSlots() synchronizes replication slots.
7090
+ */
7091
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
7092
+
7064
7093
if (log_checkpoints )
7065
7094
LogCheckpointStart (flags , true);
7066
7095
@@ -7143,15 +7172,23 @@ CreateRestartPoint(int flags)
7143
7172
receivePtr = GetWalRcvFlushRecPtr (NULL , NULL );
7144
7173
replayPtr = GetXLogReplayRecPtr (& replayTLI );
7145
7174
endptr = (receivePtr < replayPtr ) ? replayPtr : receivePtr ;
7146
- KeepLogSeg (endptr , & _logSegNo );
7175
+ KeepLogSeg (endptr , slotsMinReqLSN , & _logSegNo );
7147
7176
if (InvalidateObsoleteReplicationSlots (_logSegNo ))
7148
7177
{
7178
+ /*
7179
+ * Recalculate the current minimum LSN to be used in the WAL segment
7180
+ * cleanup. Then, we must synchronize the replication slots again in
7181
+ * order to make this LSN safe to use.
7182
+ */
7183
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
7184
+ CheckPointReplicationSlots ();
7185
+
7149
7186
/*
7150
7187
* Some slots have been invalidated; recalculate the old-segment
7151
7188
* horizon, starting again from RedoRecPtr.
7152
7189
*/
7153
7190
XLByteToSeg (RedoRecPtr , _logSegNo , wal_segment_size );
7154
- KeepLogSeg (endptr , & _logSegNo );
7191
+ KeepLogSeg (endptr , slotsMinReqLSN , & _logSegNo );
7155
7192
}
7156
7193
_logSegNo -- ;
7157
7194
@@ -7245,6 +7282,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
7245
7282
XLogSegNo oldestSegMaxWalSize ; /* oldest segid kept by max_wal_size */
7246
7283
XLogSegNo oldestSlotSeg ; /* oldest segid kept by slot */
7247
7284
uint64 keepSegs ;
7285
+ XLogRecPtr slotsMinReqLSN ;
7248
7286
7249
7287
/*
7250
7288
* slot does not reserve WAL. Either deactivated, or has never been active
@@ -7258,8 +7296,9 @@ GetWALAvailability(XLogRecPtr targetLSN)
7258
7296
* oldestSlotSeg to the current segment.
7259
7297
*/
7260
7298
currpos = GetXLogWriteRecPtr ();
7299
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
7261
7300
XLByteToSeg (currpos , oldestSlotSeg , wal_segment_size );
7262
- KeepLogSeg (currpos , & oldestSlotSeg );
7301
+ KeepLogSeg (currpos , slotsMinReqLSN , & oldestSlotSeg );
7263
7302
7264
7303
/*
7265
7304
* Find the oldest extant segment file. We get 1 until checkpoint removes
@@ -7320,7 +7359,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
7320
7359
* invalidation is optionally done here, instead.
7321
7360
*/
7322
7361
static void
7323
- KeepLogSeg (XLogRecPtr recptr , XLogSegNo * logSegNo )
7362
+ KeepLogSeg (XLogRecPtr recptr , XLogRecPtr slotsMinReqLSN , XLogSegNo * logSegNo )
7324
7363
{
7325
7364
XLogSegNo currSegNo ;
7326
7365
XLogSegNo segno ;
@@ -7333,7 +7372,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
7333
7372
* Calculate how many segments are kept by slots first, adjusting for
7334
7373
* max_slot_wal_keep_size.
7335
7374
*/
7336
- keep = XLogGetReplicationSlotMinimumLSN () ;
7375
+ keep = slotsMinReqLSN ;
7337
7376
if (keep != InvalidXLogRecPtr && keep < recptr )
7338
7377
{
7339
7378
XLByteToSeg (keep , segno , wal_segment_size );
0 commit comments