Content-Length: 387418 | pFad | http://github.com/postgrespro/postgres_cluster/commit/bc9dc445d6ffb8ee87f5e66fbc93615afaddae5a

BE Set proper restartLsn for all origens on start · postgrespro/postgres_cluster@bc9dc44 · GitHub
Skip to content

Commit bc9dc44

Browse files
committed
Set proper restartLsn for all origens on start
1 parent be41202 commit bc9dc44

File tree

1 file changed

+24
-14
lines changed

1 file changed

+24
-14
lines changed

pglogical_receiver.c

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -217,15 +217,14 @@ pglogical_receiver_main(Datum main_arg)
217217
MtmReplicationMode mode;
218218

219219
ByteBuffer buf;
220-
RepOriginId origenId;
221-
char* origenName;
222220
/* Buffer for COPY data */
223221
char *copybuf = NULL;
224222
int spill_file = -1;
225223
StringInfoData spill_info;
226224
char *slotName;
227225
char* connString = psprintf("replication=database %s", Mtm->nodes[nodeId-1].con.connStr);
228226
static PortalData fakePortal;
227+
int i;
229228

230229
MtmBackgroundWorker = true;
231230

@@ -258,16 +257,27 @@ pglogical_receiver_main(Datum main_arg)
258257
ActivePortal->status = PORTAL_ACTIVE;
259258
ActivePortal->sourceText = "";
260259

261-
/* Create origenid */
262-
StartTransactionCommand();
263-
origenName = psprintf(MULTIMASTER_SLOT_PATTERN, nodeId);
264-
origenId = replorigen_by_name(origenName, true);
265-
if (origenId == InvalidRepOriginId) {
266-
origenId = replorigen_create(origenName);
260+
/*
261+
* Set proper restartLsn for all origens
262+
*/
263+
MtmLock(LW_EXCLUSIVE);
264+
for (i = 0; i < Mtm->nAllNodes; i++)
265+
{
266+
char *origenName;
267+
RepOriginId origenId;
268+
269+
StartTransactionCommand();
270+
origenName = psprintf(MULTIMASTER_SLOT_PATTERN, i + 1);
271+
origenId = replorigen_by_name(origenName, true);
272+
if (origenId == InvalidRepOriginId) {
273+
origenId = replorigen_create(origenName);
274+
}
275+
CommitTransactionCommand();
276+
if (Mtm->nodes[i].restartLSN == INVALID_LSN)
277+
Mtm->nodes[i].restartLSN = replorigen_get_progress(origenId, true);
278+
Mtm->nodes[i].origenId = origenId;
267279
}
268-
CommitTransactionCommand();
269-
Mtm->nodes[nodeId-1].origenId = origenId;
270-
Mtm->nodes[nodeId-1].restartLSN = INVALID_LSN;
280+
MtmUnlock();
271281

272282
/* This is main loop of logical replication.
273283
* In case of errors we will try to reestablish connection.
@@ -277,7 +287,7 @@ pglogical_receiver_main(Datum main_arg)
277287
{
278288
int count;
279289
ConnStatusType status;
280-
lsn_t origenStartPos = Mtm->nodes[nodeId-1].restartLSN;
290+
lsn_t origenStartPos;
281291
int timeline;
282292

283293
/*
@@ -308,7 +318,7 @@ pglogical_receiver_main(Datum main_arg)
308318
query = createPQExpBuffer();
309319

310320
/* Start logical replication at specified position */
311-
origenStartPos = replorigen_get_progress(origenId, false);
321+
origenStartPos = replorigen_get_progress(Mtm->nodes[nodeId-1].origenId, false);
312322
if (origenStartPos == INVALID_LSN || Mtm->nodes[nodeId-1].manualRecovery) {
313323
/*
314324
* We are just creating new replication slot.
@@ -337,7 +347,7 @@ pglogical_receiver_main(Datum main_arg)
337347
MTM_LOG1("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)", nodeId, Mtm->nodes[nodeId-1].restartLSN, origenStartPos);
338348
Mtm->nodes[nodeId-1].restartLSN = origenStartPos;
339349
}
340-
MTM_LOG1("Restart logical receiver at position %llx with origen=%d from node %d", origenStartPos, origenId, nodeId);
350+
MTM_LOG1("Restart logical receiver at position %llx from node %d", origenStartPos, nodeId);
341351
}
342352

343353
MTM_LOG1("Start replication on slot %s from node %d at position %llx, mode %s, recovered lsn %llx",

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/postgrespro/postgres_cluster/commit/bc9dc445d6ffb8ee87f5e66fbc93615afaddae5a

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy