|
51 | 51 | #include "access/xlog.h"
|
52 | 52 | #include "access/xloginsert.h"
|
53 | 53 | #include "access/xlogutils.h"
|
| 54 | +#include "access/xlogreader.h" |
| 55 | +#include "access/xlog_internal.h" |
54 | 56 | #include "catalog/pg_type.h"
|
55 | 57 | #include "catalog/storage.h"
|
56 | 58 | #include "funcapi.h"
|
@@ -117,7 +119,8 @@ typedef struct GlobalTransactionData
|
117 | 119 | int pgprocno; /* ID of associated dummy PGPROC */
|
118 | 120 | BackendId dummyBackendId; /* similar to backend id for backends */
|
119 | 121 | TimestampTz prepared_at; /* time of preparation */
|
120 |
| - XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */ |
| 122 | + XLogRecPtr prepare_lsn; /* XLOG offset of prepare record end */ |
| 123 | + XLogRecPtr prepare_xlogptr; /* XLOG offset of prepare record start */ |
121 | 124 | Oid owner; /* ID of user that executed the xact */
|
122 | 125 | BackendId locking_backend; /* backend currently working on the xact */
|
123 | 126 | bool valid; /* TRUE if PGPROC entry is in proc array */
|
@@ -170,15 +173,8 @@ static void RemoveGXact(GlobalTransaction gxact);
|
170 | 173 |
|
171 | 174 | static char twophase_buf[10*1024];
|
172 | 175 | static int twophase_pos = 0;
|
173 |
| -size_t |
174 |
| -bogus_write(int fd, char *buf, size_t nbytes) |
175 |
| -{ |
176 |
| - memcpy(twophase_buf + twophase_pos, buf, nbytes); |
177 |
| - twophase_pos += nbytes; |
178 |
| - return nbytes; |
179 |
| -} |
180 |
| - |
181 |
| - |
| 176 | +size_t bogus_write(int fd, char *buf, size_t nbytes); |
| 177 | +// LWLock *xlogreclock; |
182 | 178 |
|
183 | 179 | /*
|
184 | 180 | * Initialization of shared memory
|
@@ -413,6 +409,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
|
413 | 409 | gxact->prepared_at = prepared_at;
|
414 | 410 | /* initialize LSN to 0 (start of WAL) */
|
415 | 411 | gxact->prepare_lsn = 0;
|
| 412 | + gxact->prepare_xlogptr = 0; |
416 | 413 | gxact->owner = owner;
|
417 | 414 | gxact->locking_backend = MyBackendId;
|
418 | 415 | gxact->valid = false;
|
@@ -1144,11 +1141,25 @@ EndPrepare(GlobalTransaction gxact)
|
1144 | 1141 | MyPgXact->delayChkpt = true;
|
1145 | 1142 |
|
1146 | 1143 | XLogBeginInsert();
|
| 1144 | + |
1147 | 1145 | for (record = records.head; record != NULL; record = record->next)
|
1148 | 1146 | XLogRegisterData(record->data, record->len);
|
| 1147 | + |
| 1148 | + // LWLockAcquire(xlogreclock, LW_EXCLUSIVE); |
| 1149 | + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); |
| 1150 | + gxact->prepare_xlogptr = GetXLogInsertRecPtr(); |
1149 | 1151 | gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
|
| 1152 | + LWLockRelease(TwoPhaseStateLock); |
| 1153 | + // LWLockRelease(xlogreclock); |
| 1154 | + |
| 1155 | + |
1150 | 1156 | XLogFlush(gxact->prepare_lsn);
|
1151 | 1157 |
|
| 1158 | + |
| 1159 | + fprintf(stderr, "WAL %s->prepare_xlogptr = %X/%X \n", gxact->gid, (uint32) (gxact->prepare_xlogptr >> 32), (uint32) (gxact->prepare_xlogptr)); |
| 1160 | + fprintf(stderr, "WAL %s->prepare_lsn = %X/%X \n", gxact->gid, (uint32) (gxact->prepare_lsn >> 32), (uint32) (gxact->prepare_lsn)); |
| 1161 | + |
| 1162 | + |
1152 | 1163 | /* If we crash now, we have prepared: WAL replay will fix things */
|
1153 | 1164 |
|
1154 | 1165 | /* write correct CRC and close file */
|
@@ -2228,3 +2239,196 @@ RecordTransactionAbortPrepared(TransactionId xid,
|
2228 | 2239 | */
|
2229 | 2240 | SyncRepWaitForLSN(recptr);
|
2230 | 2241 | }
|
| 2242 | + |
| 2243 | + |
| 2244 | + |
| 2245 | + |
| 2246 | + |
| 2247 | + |
| 2248 | + |
| 2249 | + |
| 2250 | + |
| 2251 | +/**********************************************************************************/ |
| 2252 | + |
| 2253 | + |
| 2254 | +static int xlogreadfd = -1; |
| 2255 | +static XLogSegNo xlogreadsegno = -1; |
| 2256 | +static char xlogfpath[MAXPGPATH]; |
| 2257 | + |
| 2258 | +typedef struct XLogPageReadPrivate |
| 2259 | +{ |
| 2260 | + const char *datadir; |
| 2261 | + TimeLineID tli; |
| 2262 | +} XLogPageReadPrivate; |
| 2263 | + |
| 2264 | +size_t |
| 2265 | +bogus_write(int fd, char *buf, size_t nbytes) |
| 2266 | +{ |
| 2267 | + memcpy(twophase_buf + twophase_pos, buf, nbytes); |
| 2268 | + twophase_pos += nbytes; |
| 2269 | + return nbytes; |
| 2270 | +} |
| 2271 | + |
| 2272 | + |
| 2273 | +static int SimpleXLogPageRead(XLogReaderState *xlogreader, |
| 2274 | + XLogRecPtr targetPagePtr, |
| 2275 | + int reqLen, XLogRecPtr targetRecPtr, char *readBuf, |
| 2276 | + TimeLineID *pageTLI); |
| 2277 | + |
| 2278 | + |
| 2279 | +/* XLogreader callback function, to read a WAL page */ |
| 2280 | +static int |
| 2281 | +SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, |
| 2282 | + int reqLen, XLogRecPtr targetRecPtr, char *readBuf, |
| 2283 | + TimeLineID *pageTLI) |
| 2284 | +{ |
| 2285 | + XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; |
| 2286 | + uint32 targetPageOff; |
| 2287 | + XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY; |
| 2288 | + |
| 2289 | + XLByteToSeg(targetPagePtr, targetSegNo); |
| 2290 | + targetPageOff = targetPagePtr % XLogSegSize; |
| 2291 | + |
| 2292 | + /* |
| 2293 | + * See if we need to switch to a new segment because the requested record |
| 2294 | + * is not in the currently open one. |
| 2295 | + */ |
| 2296 | + if (xlogreadfd >= 0 && !XLByteInSeg(targetPagePtr, xlogreadsegno)) |
| 2297 | + { |
| 2298 | + close(xlogreadfd); |
| 2299 | + xlogreadfd = -1; |
| 2300 | + } |
| 2301 | + |
| 2302 | + XLByteToSeg(targetPagePtr, xlogreadsegno); |
| 2303 | + |
| 2304 | + if (xlogreadfd < 0) |
| 2305 | + { |
| 2306 | + char xlogfname[MAXFNAMELEN]; |
| 2307 | + |
| 2308 | + XLogFileName(xlogfname, private->tli, xlogreadsegno); |
| 2309 | + |
| 2310 | + snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", private->datadir, xlogfname); |
| 2311 | + |
| 2312 | + xlogreadfd = open(xlogfpath, O_RDONLY | PG_BINARY, 0); |
| 2313 | + |
| 2314 | + if (xlogreadfd < 0) |
| 2315 | + { |
| 2316 | + printf(_("could not open file \"%s\": %s\n"), xlogfpath, |
| 2317 | + strerror(errno)); |
| 2318 | + return -1; |
| 2319 | + } |
| 2320 | + } |
| 2321 | + |
| 2322 | + /* |
| 2323 | + * At this point, we have the right segment open. |
| 2324 | + */ |
| 2325 | + Assert(xlogreadfd != -1); |
| 2326 | + |
| 2327 | + /* Read the requested page */ |
| 2328 | + if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0) |
| 2329 | + { |
| 2330 | + printf(_("could not seek in file \"%s\": %s\n"), xlogfpath, |
| 2331 | + strerror(errno)); |
| 2332 | + return -1; |
| 2333 | + } |
| 2334 | + |
| 2335 | + if (read(xlogreadfd, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) |
| 2336 | + { |
| 2337 | + printf(_("could not read from file \"%s\": %s\n"), xlogfpath, |
| 2338 | + strerror(errno)); |
| 2339 | + return -1; |
| 2340 | + } |
| 2341 | + |
| 2342 | + Assert(targetSegNo == xlogreadsegno); |
| 2343 | + |
| 2344 | + *pageTLI = private->tli; |
| 2345 | + return XLOG_BLCKSZ; |
| 2346 | +} |
| 2347 | + |
| 2348 | +// XLogRecPtr |
| 2349 | +// readOneRecord(const char *datadir, XLogRecPtr ptr, TimeLineID tli); |
| 2350 | + |
| 2351 | +// XLogRecPtr |
| 2352 | +// readOneRecord(const char *datadir, XLogRecPtr ptr, TimeLineID tli) |
| 2353 | +// { |
| 2354 | +// XLogRecord *record; |
| 2355 | +// XLogReaderState *xlogreader; |
| 2356 | +// char *errormsg; |
| 2357 | +// XLogPageReadPrivate private; |
| 2358 | +// XLogRecPtr endptr; |
| 2359 | + |
| 2360 | +// private.datadir = datadir; |
| 2361 | +// private.tli = tli; |
| 2362 | +// xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &private); |
| 2363 | +// if (xlogreader == NULL) |
| 2364 | +// pg_fatal("out of memory\n"); |
| 2365 | + |
| 2366 | +// record = XLogReadRecord(xlogreader, ptr, &errormsg); |
| 2367 | +// if (record == NULL) |
| 2368 | +// { |
| 2369 | +// if (errormsg) |
| 2370 | +// pg_fatal("could not read WAL record at %X/%X: %s\n", |
| 2371 | +// (uint32) (ptr >> 32), (uint32) (ptr), errormsg); |
| 2372 | +// else |
| 2373 | +// pg_fatal("could not read WAL record at %X/%X\n", |
| 2374 | +// (uint32) (ptr >> 32), (uint32) (ptr)); |
| 2375 | +// } |
| 2376 | +// endptr = xlogreader->EndRecPtr; |
| 2377 | + |
| 2378 | +// XLogReaderFree(xlogreader); |
| 2379 | +// if (xlogreadfd != -1) |
| 2380 | +// { |
| 2381 | +// close(xlogreadfd); |
| 2382 | +// xlogreadfd = -1; |
| 2383 | +// } |
| 2384 | + |
| 2385 | +// return endptr; |
| 2386 | +// } |
| 2387 | + |
| 2388 | + |
| 2389 | +// static char * |
| 2390 | +// XlogReadTwoPhaseData(XLogRecPtr lsn, bool give_warnings, TimeLineID tli) |
| 2391 | +// { |
| 2392 | +// XLogRecord *record; |
| 2393 | +// XLogReaderState *xlogreader; |
| 2394 | +// XLogPageReadPrivate private; |
| 2395 | + |
| 2396 | +// private.datadir = datadir; |
| 2397 | +// private.tli = tli; |
| 2398 | +// xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &private); |
| 2399 | +// if (xlogreader == NULL) |
| 2400 | +// pg_fatal("out of memory\n"); |
| 2401 | + |
| 2402 | +// record = XLogReadRecord(xlogreader, ptr, &errormsg); |
| 2403 | +// if (record == NULL) |
| 2404 | +// { |
| 2405 | +// if (errormsg) |
| 2406 | +// pg_fatal("could not read WAL record at %X/%X: %s\n", |
| 2407 | +// (uint32) (ptr >> 32), (uint32) (ptr), errormsg); |
| 2408 | +// else |
| 2409 | +// pg_fatal("could not read WAL record at %X/%X\n", |
| 2410 | +// (uint32) (ptr >> 32), (uint32) (ptr)); |
| 2411 | +// } |
| 2412 | +// endptr = xlogreader->EndRecPtr; |
| 2413 | + |
| 2414 | +// XLogReaderFree(xlogreader); |
| 2415 | +// if (xlogreadfd != -1) |
| 2416 | +// { |
| 2417 | +// close(xlogreadfd); |
| 2418 | +// xlogreadfd = -1; |
| 2419 | +// } |
| 2420 | + |
| 2421 | +// return XLogRecGetData(record) |
| 2422 | +// } |
| 2423 | + |
| 2424 | + |
| 2425 | + |
| 2426 | + |
| 2427 | + |
| 2428 | + |
| 2429 | + |
| 2430 | + |
| 2431 | + |
| 2432 | + |
| 2433 | + |
| 2434 | + |
0 commit comments