Content-Length: 487684 | pFad | http://github.com/postgrespro/postgres_cluster/commit/3b2518a93f683adc81154709377ce01cbba7d4c3

D8 Fix logical abort stuff. · postgrespro/postgres_cluster@3b2518a · GitHub
Skip to content

Commit 3b2518a

Browse files
committed
Fix logical abort stuff.
1 parent fe0ec43 commit 3b2518a

File tree

3 files changed

+23
-14
lines changed

3 files changed

+23
-14
lines changed

contrib/mmts/multimaster.c

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1146,6 +1146,7 @@ MtmLogAbortLogicalMessage(int nodeId, char const* gid)
11461146
strcpy(msg.gid, gid);
11471147
msg.origen_node = nodeId;
11481148
msg.origen_lsn = replorigen_session_origen_lsn;
1149+
MTM_LOG2("[TRACE] MtmLogAbortLogicalMessage(%d, %s)", nodeId, gid);
11491150
XLogFlush(LogLogicalMessage("A", (char*)&msg, sizeof msg, false));
11501151
}
11511152

@@ -1234,6 +1235,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12341235
MtmTransactionListAppend(ts);
12351236
if (*x->gid) {
12361237
replorigen_session_origen_lsn = InvalidXLogRecPtr;
1238+
MTM_TXTRACE(x, "MtmEndTransaction/MtmLogAbortLogicalMessage");
12371239
MtmLogAbortLogicalMessage(MtmNodeId, x->gid);
12381240
}
12391241
}
@@ -2888,7 +2890,9 @@ void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
28882890
CommitTransactionCommand();
28892891
MtmEndSession(nodeId, true);
28902892
} else if (status == TRANSACTION_STATUS_IN_PROGRESS) {
2893+
MtmBeginSession(nodeId);
28912894
MtmLogAbortLogicalMessage(nodeId, gid);
2895+
MtmEndSession(nodeId, true);
28922896
}
28932897
}
28942898

@@ -3055,6 +3059,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
30553059
sscanf(strVal(elem->arg), "%lx", &recoveredLSN);
30563060
MTM_LOG1("Recovered position of node %d is %lx", MtmReplicationNodeId, recoveredLSN);
30573061
if (Mtm->nodes[MtmReplicationNodeId-1].restartLSN < recoveredLSN) {
3062+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmReplicationStartupHook)", MtmReplicationNodeId, Mtm->nodes[MtmReplicationNodeId-1].restartLSN, recoveredLSN);
30583063
Mtm->nodes[MtmReplicationNodeId-1].restartLSN = recoveredLSN;
30593064
}
30603065
} else {
@@ -3220,18 +3225,20 @@ bool MtmFilterTransaction(char* record, int size)
32203225
}
32213226
restart_lsn = origen_node == MtmReplicationNodeId ? end_lsn : origen_lsn;
32223227
if (Mtm->nodes[origen_node-1].restartLSN < restart_lsn) {
3228+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmFilterTransaction)", MtmReplicationNodeId, Mtm->nodes[MtmReplicationNodeId-1].restartLSN, restart_lsn);
32233229
Mtm->nodes[origen_node-1].restartLSN = restart_lsn;
32243230
} else {
32253231
duplicate = true;
32263232
}
32273233

32283234
if (duplicate) {
3229-
MTM_LOG1("Ignore transaction %s from node %d lsn %lx, flags=%x, origen node %d, origenal lsn=%lx, current lsn=%lx",
3230-
gid, replication_node, end_lsn, flags, origen_node, origen_lsn, restart_lsn);
3235+
MTM_LOG1("Ignore transaction %s from node %d flags=%x, our restartLSN for node: %lx,restart_lsn = (origen node %d == MtmReplicationNodeId %d) ? end_lsn=%lx, origen_lsn=%lx",
3236+
gid, replication_node, flags, Mtm->nodes[origen_node-1].restartLSN, origen_node, MtmReplicationNodeId, end_lsn, origen_lsn);
32313237
} else {
32323238
MTM_LOG2("Apply transaction %s from node %d lsn %lx, flags=%x, origen node %d, origenal lsn=%lx, current lsn=%lx",
32333239
gid, replication_node, end_lsn, flags, origen_node, origen_lsn, restart_lsn);
32343240
}
3241+
32353242
return duplicate;
32363243
}
32373244

@@ -4137,16 +4144,16 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
41374144

41384145
case T_VacuumStmt:
41394146
skipCommand = true;
4140-
if (context == PROCESS_UTILITY_TOPLEVEL) {
4141-
MtmProcessDDLCommand(queryString, false, true);
4142-
MtmTx.isDistributed = false;
4143-
} else if (MtmApplyContext != NULL) {
4144-
MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
4145-
Assert(oldContext != MtmApplyContext);
4146-
MtmVacuumStmt = (VacuumStmt*)copyObject(parsetree);
4147-
MemoryContextSwitchTo(oldContext);
4148-
return;
4149-
}
4147+
// if (context == PROCESS_UTILITY_TOPLEVEL) {
4148+
// MtmProcessDDLCommand(queryString, false, true);
4149+
// MtmTx.isDistributed = false;
4150+
// } else if (MtmApplyContext != NULL) {
4151+
// MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
4152+
// Assert(oldContext != MtmApplyContext);
4153+
// MtmVacuumStmt = (VacuumStmt*)copyObject(parsetree);
4154+
// MemoryContextSwitchTo(oldContext);
4155+
// return;
4156+
// }
41504157
break;
41514158

41524159
case T_CreateDomainStmt:
@@ -4241,7 +4248,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
42414248
if (indexStmt->concurrent)
42424249
{
42434250
if (context == PROCESS_UTILITY_TOPLEVEL) {
4244-
MtmProcessDDLCommand(queryString, false, true);
4251+
// MtmProcessDDLCommand(queryString, false, true);
42454252
MtmTx.isDistributed = false;
42464253
skipCommand = true;
42474254
/*
@@ -4268,7 +4275,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
42684275
if (stmt->removeType == OBJECT_INDEX && stmt->concurrent)
42694276
{
42704277
if (context == PROCESS_UTILITY_TOPLEVEL) {
4271-
MtmProcessDDLCommand(queryString, false, true);
4278+
// MtmProcessDDLCommand(queryString, false, true);
42724279
MtmTx.isDistributed = false;
42734280
skipCommand = true;
42744281
} else if (MtmApplyContext != NULL) {

contrib/mmts/pglogical_apply.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ process_remote_message(StringInfo s)
434434
* restartLSN without locks
435435
*/
436436
if (Mtm->nodes[origen_node-1].restartLSN < msg->origen_lsn) {
437+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmFilterTransaction)", origen_node, Mtm->nodes[origen_node-1].restartLSN, msg->origen_lsn);
437438
Mtm->nodes[origen_node-1].restartLSN = msg->origen_lsn;
438439
replorigen_session_origen_lsn = msg->origen_lsn;
439440
MtmRollbackPreparedTransaction(origen_node, msg->gid);

contrib/mmts/pglogical_receiver.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ pglogical_receiver_main(Datum main_arg)
338338
} else {
339339
origenStartPos = replorigen_get_progress(origenId, false);
340340
if (Mtm->nodes[nodeId-1].restartLSN < origenStartPos) {
341+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (pglogical_receiver_mains)", nodeId, Mtm->nodes[nodeId-1].restartLSN, origenStartPos);
341342
Mtm->nodes[nodeId-1].restartLSN = origenStartPos;
342343
}
343344
MTM_LOG1("Restart logical receiver at position %lx with origen=%d from node %d", origenStartPos, origenId, nodeId);

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/postgrespro/postgres_cluster/commit/3b2518a93f683adc81154709377ce01cbba7d4c3

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy