Content-Length: 712001 | pFad | http://github.com/postgrespro/postgres_cluster/commit/2413fd4fdce59edaa9464ec60c86ea44d4252cdb

81 Read 2PC state file from XLOG, WIP · postgrespro/postgres_cluster@2413fd4 · GitHub
Skip to content

Commit 2413fd4

Browse files
committed
Read 2PC state file from XLOG, WIP
1 parent 4a12787 commit 2413fd4

File tree

1 file changed

+214
-10
lines changed

1 file changed

+214
-10
lines changed

src/backend/access/transam/twophase.c

Lines changed: 214 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
#include "access/xlog.h"
5252
#include "access/xloginsert.h"
5353
#include "access/xlogutils.h"
54+
#include "access/xlogreader.h"
55+
#include "access/xlog_internal.h"
5456
#include "catalog/pg_type.h"
5557
#include "catalog/storage.h"
5658
#include "funcapi.h"
@@ -117,7 +119,8 @@ typedef struct GlobalTransactionData
117119
int pgprocno; /* ID of associated dummy PGPROC */
118120
BackendId dummyBackendId; /* similar to backend id for backends */
119121
TimestampTz prepared_at; /* time of preparation */
120-
XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */
122+
XLogRecPtr prepare_lsn; /* XLOG offset of prepare record end */
123+
XLogRecPtr prepare_xlogptr; /* XLOG offset of prepare record start */
121124
Oid owner; /* ID of user that executed the xact */
122125
BackendId locking_backend; /* backend currently working on the xact */
123126
bool valid; /* TRUE if PGPROC entry is in proc array */
@@ -170,15 +173,8 @@ static void RemoveGXact(GlobalTransaction gxact);
170173

171174
static char twophase_buf[10*1024];
172175
static int twophase_pos = 0;
173-
size_t
174-
bogus_write(int fd, char *buf, size_t nbytes)
175-
{
176-
memcpy(twophase_buf + twophase_pos, buf, nbytes);
177-
twophase_pos += nbytes;
178-
return nbytes;
179-
}
180-
181-
176+
size_t bogus_write(int fd, char *buf, size_t nbytes);
177+
// LWLock *xlogreclock;
182178

183179
/*
184180
* Initialization of shared memory
@@ -413,6 +409,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
413409
gxact->prepared_at = prepared_at;
414410
/* initialize LSN to 0 (start of WAL) */
415411
gxact->prepare_lsn = 0;
412+
gxact->prepare_xlogptr = 0;
416413
gxact->owner = owner;
417414
gxact->locking_backend = MyBackendId;
418415
gxact->valid = false;
@@ -1144,11 +1141,25 @@ EndPrepare(GlobalTransaction gxact)
11441141
MyPgXact->delayChkpt = true;
11451142

11461143
XLogBeginInsert();
1144+
11471145
for (record = records.head; record != NULL; record = record->next)
11481146
XLogRegisterData(record->data, record->len);
1147+
1148+
// LWLockAcquire(xlogreclock, LW_EXCLUSIVE);
1149+
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1150+
gxact->prepare_xlogptr = GetXLogInsertRecPtr();
11491151
gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
1152+
LWLockRelease(TwoPhaseStateLock);
1153+
// LWLockRelease(xlogreclock);
1154+
1155+
11501156
XLogFlush(gxact->prepare_lsn);
11511157

1158+
1159+
fprintf(stderr, "WAL %s->prepare_xlogptr = %X/%X \n", gxact->gid, (uint32) (gxact->prepare_xlogptr >> 32), (uint32) (gxact->prepare_xlogptr));
1160+
fprintf(stderr, "WAL %s->prepare_lsn = %X/%X \n", gxact->gid, (uint32) (gxact->prepare_lsn >> 32), (uint32) (gxact->prepare_lsn));
1161+
1162+
11521163
/* If we crash now, we have prepared: WAL replay will fix things */
11531164

11541165
/* write correct CRC and close file */
@@ -2228,3 +2239,196 @@ RecordTransactionAbortPrepared(TransactionId xid,
22282239
*/
22292240
SyncRepWaitForLSN(recptr);
22302241
}
2242+
2243+
2244+
2245+
2246+
2247+
2248+
2249+
2250+
2251+
/**********************************************************************************/
2252+
2253+
2254+
static int xlogreadfd = -1;
2255+
static XLogSegNo xlogreadsegno = -1;
2256+
static char xlogfpath[MAXPGPATH];
2257+
2258+
typedef struct XLogPageReadPrivate
2259+
{
2260+
const char *datadir;
2261+
TimeLineID tli;
2262+
} XLogPageReadPrivate;
2263+
2264+
size_t
2265+
bogus_write(int fd, char *buf, size_t nbytes)
2266+
{
2267+
memcpy(twophase_buf + twophase_pos, buf, nbytes);
2268+
twophase_pos += nbytes;
2269+
return nbytes;
2270+
}
2271+
2272+
2273+
static int SimpleXLogPageRead(XLogReaderState *xlogreader,
2274+
XLogRecPtr targetPagePtr,
2275+
int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
2276+
TimeLineID *pageTLI);
2277+
2278+
2279+
/* XLogreader callback function, to read a WAL page */
2280+
static int
2281+
SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
2282+
int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
2283+
TimeLineID *pageTLI)
2284+
{
2285+
XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
2286+
uint32 targetPageOff;
2287+
XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY;
2288+
2289+
XLByteToSeg(targetPagePtr, targetSegNo);
2290+
targetPageOff = targetPagePtr % XLogSegSize;
2291+
2292+
/*
2293+
* See if we need to switch to a new segment because the requested record
2294+
* is not in the currently open one.
2295+
*/
2296+
if (xlogreadfd >= 0 && !XLByteInSeg(targetPagePtr, xlogreadsegno))
2297+
{
2298+
close(xlogreadfd);
2299+
xlogreadfd = -1;
2300+
}
2301+
2302+
XLByteToSeg(targetPagePtr, xlogreadsegno);
2303+
2304+
if (xlogreadfd < 0)
2305+
{
2306+
char xlogfname[MAXFNAMELEN];
2307+
2308+
XLogFileName(xlogfname, private->tli, xlogreadsegno);
2309+
2310+
snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", private->datadir, xlogfname);
2311+
2312+
xlogreadfd = open(xlogfpath, O_RDONLY | PG_BINARY, 0);
2313+
2314+
if (xlogreadfd < 0)
2315+
{
2316+
printf(_("could not open file \"%s\": %s\n"), xlogfpath,
2317+
strerror(errno));
2318+
return -1;
2319+
}
2320+
}
2321+
2322+
/*
2323+
* At this point, we have the right segment open.
2324+
*/
2325+
Assert(xlogreadfd != -1);
2326+
2327+
/* Read the requested page */
2328+
if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0)
2329+
{
2330+
printf(_("could not seek in file \"%s\": %s\n"), xlogfpath,
2331+
strerror(errno));
2332+
return -1;
2333+
}
2334+
2335+
if (read(xlogreadfd, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
2336+
{
2337+
printf(_("could not read from file \"%s\": %s\n"), xlogfpath,
2338+
strerror(errno));
2339+
return -1;
2340+
}
2341+
2342+
Assert(targetSegNo == xlogreadsegno);
2343+
2344+
*pageTLI = private->tli;
2345+
return XLOG_BLCKSZ;
2346+
}
2347+
2348+
// XLogRecPtr
2349+
// readOneRecord(const char *datadir, XLogRecPtr ptr, TimeLineID tli);
2350+
2351+
// XLogRecPtr
2352+
// readOneRecord(const char *datadir, XLogRecPtr ptr, TimeLineID tli)
2353+
// {
2354+
// XLogRecord *record;
2355+
// XLogReaderState *xlogreader;
2356+
// char *errormsg;
2357+
// XLogPageReadPrivate private;
2358+
// XLogRecPtr endptr;
2359+
2360+
// private.datadir = datadir;
2361+
// private.tli = tli;
2362+
// xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &private);
2363+
// if (xlogreader == NULL)
2364+
// pg_fatal("out of memory\n");
2365+
2366+
// record = XLogReadRecord(xlogreader, ptr, &errormsg);
2367+
// if (record == NULL)
2368+
// {
2369+
// if (errormsg)
2370+
// pg_fatal("could not read WAL record at %X/%X: %s\n",
2371+
// (uint32) (ptr >> 32), (uint32) (ptr), errormsg);
2372+
// else
2373+
// pg_fatal("could not read WAL record at %X/%X\n",
2374+
// (uint32) (ptr >> 32), (uint32) (ptr));
2375+
// }
2376+
// endptr = xlogreader->EndRecPtr;
2377+
2378+
// XLogReaderFree(xlogreader);
2379+
// if (xlogreadfd != -1)
2380+
// {
2381+
// close(xlogreadfd);
2382+
// xlogreadfd = -1;
2383+
// }
2384+
2385+
// return endptr;
2386+
// }
2387+
2388+
2389+
// static char *
2390+
// XlogReadTwoPhaseData(XLogRecPtr lsn, bool give_warnings, TimeLineID tli)
2391+
// {
2392+
// XLogRecord *record;
2393+
// XLogReaderState *xlogreader;
2394+
// XLogPageReadPrivate private;
2395+
2396+
// private.datadir = datadir;
2397+
// private.tli = tli;
2398+
// xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &private);
2399+
// if (xlogreader == NULL)
2400+
// pg_fatal("out of memory\n");
2401+
2402+
// record = XLogReadRecord(xlogreader, ptr, &errormsg);
2403+
// if (record == NULL)
2404+
// {
2405+
// if (errormsg)
2406+
// pg_fatal("could not read WAL record at %X/%X: %s\n",
2407+
// (uint32) (ptr >> 32), (uint32) (ptr), errormsg);
2408+
// else
2409+
// pg_fatal("could not read WAL record at %X/%X\n",
2410+
// (uint32) (ptr >> 32), (uint32) (ptr));
2411+
// }
2412+
// endptr = xlogreader->EndRecPtr;
2413+
2414+
// XLogReaderFree(xlogreader);
2415+
// if (xlogreadfd != -1)
2416+
// {
2417+
// close(xlogreadfd);
2418+
// xlogreadfd = -1;
2419+
// }
2420+
2421+
// return XLogRecGetData(record)
2422+
// }
2423+
2424+
2425+
2426+
2427+
2428+
2429+
2430+
2431+
2432+
2433+
2434+

0 commit comments

Comments
 (0)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/postgrespro/postgres_cluster/commit/2413fd4fdce59edaa9464ec60c86ea44d4252cdb

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy