@@ -930,7 +930,8 @@ static void LocalSetXLogInsertAllowed(void);
930
930
static void CreateEndOfRecoveryRecord (void );
931
931
static XLogRecPtr CreateOverwriteContrecordRecord (XLogRecPtr aborted_lsn );
932
932
static void CheckPointGuts (XLogRecPtr checkPointRedo , int flags );
933
- static void KeepLogSeg (XLogRecPtr recptr , XLogSegNo * logSegNo );
933
+ static void KeepLogSeg (XLogRecPtr recptr , XLogRecPtr slotsMinLSN ,
934
+ XLogSegNo * logSegNo );
934
935
static XLogRecPtr XLogGetReplicationSlotMinimumLSN (void );
935
936
936
937
static void AdvanceXLInsertBuffer (XLogRecPtr upto , bool opportunistic );
@@ -9122,6 +9123,7 @@ CreateCheckPoint(int flags)
9122
9123
XLogRecPtr last_important_lsn ;
9123
9124
VirtualTransactionId * vxids ;
9124
9125
int nvxids ;
9126
+ XLogRecPtr slotsMinReqLSN ;
9125
9127
9126
9128
/*
9127
9129
* An end-of-recovery checkpoint is really a shutdown checkpoint, just
@@ -9335,6 +9337,15 @@ CreateCheckPoint(int flags)
9335
9337
*/
9336
9338
END_CRIT_SECTION ();
9337
9339
9340
+ /*
9341
+ * Get the current minimum LSN to be used later in the WAL segment
9342
+ * cleanup. We may clean up only WAL segments, which are not needed
9343
+ * according to synchronized LSNs of replication slots. The slot's LSN
9344
+ * might be advanced concurrently, so we call this before
9345
+ * CheckPointReplicationSlots() synchronizes replication slots.
9346
+ */
9347
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
9348
+
9338
9349
/*
9339
9350
* In some cases there are groups of actions that must all occur on one
9340
9351
* side or the other of a checkpoint record. Before flushing the
@@ -9499,15 +9510,23 @@ CreateCheckPoint(int flags)
9499
9510
* prevent the disk holding the xlog from growing full.
9500
9511
*/
9501
9512
XLByteToSeg (RedoRecPtr , _logSegNo , wal_segment_size );
9502
- KeepLogSeg (recptr , & _logSegNo );
9513
+ KeepLogSeg (recptr , slotsMinReqLSN , & _logSegNo );
9503
9514
if (InvalidateObsoleteReplicationSlots (_logSegNo ))
9504
9515
{
9516
+ /*
9517
+ * Recalculate the current minimum LSN to be used in the WAL segment
9518
+ * cleanup. Then, we must synchronize the replication slots again in
9519
+ * order to make this LSN safe to use.
9520
+ */
9521
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
9522
+ CheckPointReplicationSlots ();
9523
+
9505
9524
/*
9506
9525
* Some slots have been invalidated; recalculate the old-segment
9507
9526
* horizon, starting again from RedoRecPtr.
9508
9527
*/
9509
9528
XLByteToSeg (RedoRecPtr , _logSegNo , wal_segment_size );
9510
- KeepLogSeg (recptr , & _logSegNo );
9529
+ KeepLogSeg (recptr , slotsMinReqLSN , & _logSegNo );
9511
9530
}
9512
9531
_logSegNo -- ;
9513
9532
RemoveOldXlogFiles (_logSegNo , RedoRecPtr , recptr );
@@ -9740,6 +9759,7 @@ CreateRestartPoint(int flags)
9740
9759
XLogRecPtr endptr ;
9741
9760
XLogSegNo _logSegNo ;
9742
9761
TimestampTz xtime ;
9762
+ XLogRecPtr slotsMinReqLSN ;
9743
9763
9744
9764
/* Get a local copy of the last safe checkpoint record. */
9745
9765
SpinLockAcquire (& XLogCtl -> info_lck );
@@ -9820,6 +9840,15 @@ CreateRestartPoint(int flags)
9820
9840
MemSet (& CheckpointStats , 0 , sizeof (CheckpointStats ));
9821
9841
CheckpointStats .ckpt_start_t = GetCurrentTimestamp ();
9822
9842
9843
+ /*
9844
+ * Get the current minimum LSN to be used later in the WAL segment
9845
+ * cleanup. We may clean up only WAL segments, which are not needed
9846
+ * according to synchronized LSNs of replication slots. The slot's LSN
9847
+ * might be advanced concurrently, so we call this before
9848
+ * CheckPointReplicationSlots() synchronizes replication slots.
9849
+ */
9850
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
9851
+
9823
9852
if (log_checkpoints )
9824
9853
LogCheckpointStart (flags , true);
9825
9854
@@ -9908,15 +9937,23 @@ CreateRestartPoint(int flags)
9908
9937
receivePtr = GetWalRcvFlushRecPtr (NULL , NULL );
9909
9938
replayPtr = GetXLogReplayRecPtr (& replayTLI );
9910
9939
endptr = (receivePtr < replayPtr ) ? replayPtr : receivePtr ;
9911
- KeepLogSeg (endptr , & _logSegNo );
9940
+ KeepLogSeg (endptr , slotsMinReqLSN , & _logSegNo );
9912
9941
if (InvalidateObsoleteReplicationSlots (_logSegNo ))
9913
9942
{
9943
+ /*
9944
+ * Recalculate the current minimum LSN to be used in the WAL segment
9945
+ * cleanup. Then, we must synchronize the replication slots again in
9946
+ * order to make this LSN safe to use.
9947
+ */
9948
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
9949
+ CheckPointReplicationSlots ();
9950
+
9914
9951
/*
9915
9952
* Some slots have been invalidated; recalculate the old-segment
9916
9953
* horizon, starting again from RedoRecPtr.
9917
9954
*/
9918
9955
XLByteToSeg (RedoRecPtr , _logSegNo , wal_segment_size );
9919
- KeepLogSeg (endptr , & _logSegNo );
9956
+ KeepLogSeg (endptr , slotsMinReqLSN , & _logSegNo );
9920
9957
}
9921
9958
_logSegNo -- ;
9922
9959
@@ -10019,6 +10056,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
10019
10056
XLogSegNo oldestSegMaxWalSize ; /* oldest segid kept by max_wal_size */
10020
10057
XLogSegNo oldestSlotSeg ; /* oldest segid kept by slot */
10021
10058
uint64 keepSegs ;
10059
+ XLogRecPtr slotsMinReqLSN ;
10022
10060
10023
10061
/*
10024
10062
* slot does not reserve WAL. Either deactivated, or has never been active
@@ -10032,8 +10070,9 @@ GetWALAvailability(XLogRecPtr targetLSN)
10032
10070
* oldestSlotSeg to the current segment.
10033
10071
*/
10034
10072
currpos = GetXLogWriteRecPtr ();
10073
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN ();
10035
10074
XLByteToSeg (currpos , oldestSlotSeg , wal_segment_size );
10036
- KeepLogSeg (currpos , & oldestSlotSeg );
10075
+ KeepLogSeg (currpos , slotsMinReqLSN , & oldestSlotSeg );
10037
10076
10038
10077
/*
10039
10078
* Find the oldest extant segment file. We get 1 until checkpoint removes
@@ -10094,7 +10133,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
10094
10133
* invalidation is optionally done here, instead.
10095
10134
*/
10096
10135
static void
10097
- KeepLogSeg (XLogRecPtr recptr , XLogSegNo * logSegNo )
10136
+ KeepLogSeg (XLogRecPtr recptr , XLogRecPtr slotsMinReqLSN , XLogSegNo * logSegNo )
10098
10137
{
10099
10138
XLogSegNo currSegNo ;
10100
10139
XLogSegNo segno ;
@@ -10107,7 +10146,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
10107
10146
* Calculate how many segments are kept by slots first, adjusting for
10108
10147
* max_slot_wal_keep_size.
10109
10148
*/
10110
- keep = XLogGetReplicationSlotMinimumLSN () ;
10149
+ keep = slotsMinReqLSN ;
10111
10150
if (keep != InvalidXLogRecPtr && keep < recptr )
10112
10151
{
10113
10152
XLByteToSeg (keep , segno , wal_segment_size );
0 commit comments