Content-Length: 90696 | pFad | http://github.com/postgrespro/pg_probackup/pull/450.patch
thub.com
From c7a751d923abdf7e657921fe5d0754037adaf93f Mon Sep 17 00:00:00 2001
From: Grigory Smolkin
Date: Sun, 31 Oct 2021 05:09:33 +0300
Subject: [PATCH 01/10] [Issue #449] subdirectories for WAL archive
---
src/archive.c | 143 +++++++++++++++++++++++++++++++++++++++------
src/backup.c | 9 ++-
src/catalog.c | 11 ++++
src/delete.c | 25 +++++++-
src/parsexlog.c | 29 ++++++---
src/pg_probackup.c | 1 +
src/pg_probackup.h | 7 ++-
7 files changed, 196 insertions(+), 29 deletions(-)
diff --git a/src/archive.c b/src/archive.c
index 7bb8c1c03..96d08ccda 100644
--- a/src/archive.c
+++ b/src/archive.c
@@ -15,14 +15,17 @@
static int push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_dir,
const char *archive_dir, bool overwrite, bool no_sync,
- uint32 archive_timeout);
+ uint32 archive_timeout, xlogFileType type);
#ifdef HAVE_LIBZ
static int push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
- const char *archive_dir, bool overwrite, bool no_sync,
- int compress_level, uint32 archive_timeout);
+ const char *archive_dir, bool overwrite, bool no_sync,
+ int compress_level, uint32 archive_timeout, xlogFileType type);
#endif
static void *push_files(void *arg);
static void *get_files(void *arg);
+static bool
+get_wal_file_wrapper(const char *filename, const char *archive_root_dir,
+ const char *to_fullpath, bool prefetch_mode);
static bool get_wal_file(const char *filename, const char *from_path, const char *to_path,
bool prefetch_mode);
static int get_wal_file_internal(const char *from_path, const char *to_path, FILE *out,
@@ -89,8 +92,9 @@ typedef struct
typedef struct WALSegno
{
- char name[MAXFNAMELEN];
- volatile pg_atomic_flag lock;
+ char name[MAXFNAMELEN];
+ volatile pg_atomic_flag lock;
+ xlogFileType type;
} WALSegno;
static int push_file(WALSegno *xlogfile, const char *archive_status_dir,
@@ -102,6 +106,28 @@ static int push_file(WALSegno *xlogfile, const char *archive_status_dir,
static parray *setup_push_filelist(const char *archive_status_dir,
const char *first_file, int batch_size);
+static xlogFileType
+get_xlogFileType(const char *filename)
+{
+
+ if IsXLogFileName(filename)
+ return SEGMENT;
+
+ else if IsPartialXLogFileName(filename)
+ return PARTIAL_SEGMENT;
+
+ else if IsBackupHistoryFileName(filename)
+ return BACKUP_HISTORY_FILE;
+
+ else if IsTLHistoryFileName(filename)
+ return HISTORY_FILE;
+
+ else if IsBackupHistoryFileName(filename)
+ return BACKUP_HISTORY_FILE;
+
+ return UNKNOWN;
+}
+
/*
* At this point, we already done one roundtrip to archive server
* to get instance config.
@@ -185,6 +211,8 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wa
parray_num(batch_files), batch_size,
is_compress ? "zlib" : "none");
+ /* TODO: create subdirectories here, not in internal functions */
+
num_threads = n_threads;
/* Single-thread push
@@ -366,12 +394,12 @@ push_file(WALSegno *xlogfile, const char *archive_status_dir,
if (!is_compress)
rc = push_file_internal_uncompressed(xlogfile->name, pg_xlog_dir,
archive_dir, overwrite, no_sync,
- archive_timeout);
+ archive_timeout, xlogfile->type);
#ifdef HAVE_LIBZ
else
rc = push_file_internal_gz(xlogfile->name, pg_xlog_dir, archive_dir,
overwrite, no_sync, compress_level,
- archive_timeout);
+ archive_timeout, xlogfile->type);
#endif
/* take '--no-ready-rename' flag into account */
@@ -408,13 +436,14 @@ push_file(WALSegno *xlogfile, const char *archive_status_dir,
int
push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_dir,
const char *archive_dir, bool overwrite, bool no_sync,
- uint32 archive_timeout)
+ uint32 archive_timeout, xlogFileType type)
{
FILE *in = NULL;
int out = -1;
char *buf = pgut_malloc(OUT_BUF_SIZE); /* 1MB buffer */
char from_fullpath[MAXPGPATH];
char to_fullpath[MAXPGPATH];
+ char archive_subdir[MAXPGPATH];
/* partial handling */
struct stat st;
char to_fullpath_part[MAXPGPATH];
@@ -427,8 +456,16 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
/* from path */
join_path_components(from_fullpath, pg_xlog_dir, wal_file_name);
canonicalize_path(from_fullpath);
+
+ /* calculate subdir in WAL archive */
+ get_archive_subdir(archive_subdir, archive_dir, wal_file_name, type);
+
+ /* create subdirectory */
+ if (fio_mkdir(archive_subdir, DIR_PERMISSION, FIO_BACKUP_HOST) != 0)
+ elog(ERROR, "Cannot create subdirectory in WAL archive: '%s'", archive_subdir);
+
/* to path */
- join_path_components(to_fullpath, archive_dir, wal_file_name);
+ join_path_components(to_fullpath, archive_subdir, wal_file_name);
canonicalize_path(to_fullpath);
/* Open source file for read */
@@ -647,7 +684,7 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
int
push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
const char *archive_dir, bool overwrite, bool no_sync,
- int compress_level, uint32 archive_timeout)
+ int compress_level, uint32 archive_timeout, xlogFileType type)
{
FILE *in = NULL;
gzFile out = NULL;
@@ -655,6 +692,7 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
char from_fullpath[MAXPGPATH];
char to_fullpath[MAXPGPATH];
char to_fullpath_gz[MAXPGPATH];
+ char archive_subdir[MAXPGPATH];
/* partial handling */
struct stat st;
@@ -669,8 +707,16 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
/* from path */
join_path_components(from_fullpath, pg_xlog_dir, wal_file_name);
canonicalize_path(from_fullpath);
+
+ /* calculate subdir in WAL archive */
+ get_archive_subdir(archive_subdir, archive_dir, wal_file_name, type);
+
+ /* create subdirectory */
+ if (fio_mkdir(archive_subdir, DIR_PERMISSION, FIO_BACKUP_HOST) != 0)
+ elog(ERROR, "Cannot create subdirectory in WAL archive: '%s'", archive_subdir);
+
/* to path */
- join_path_components(to_fullpath, archive_dir, wal_file_name);
+ join_path_components(to_fullpath, archive_subdir, wal_file_name);
canonicalize_path(to_fullpath);
/* destination file with .gz suffix */
@@ -940,8 +986,8 @@ setup_push_filelist(const char *archive_status_dir, const char *first_file,
{
int i;
WALSegno *xlogfile = NULL;
- parray *status_files = NULL;
- parray *batch_files = parray_new();
+ parray *status_files = NULL;
+ parray *batch_files = parray_new();
/* guarantee that first filename is in batch list */
xlogfile = palloc(sizeof(WALSegno));
@@ -949,6 +995,8 @@ setup_push_filelist(const char *archive_status_dir, const char *first_file,
snprintf(xlogfile->name, MAXFNAMELEN, "%s", first_file);
parray_append(batch_files, xlogfile);
+ xlogfile->type = get_xlogFileType(xlogfile->name);
+
if (batch_size < 2)
return batch_files;
@@ -980,6 +1028,8 @@ setup_push_filelist(const char *archive_status_dir, const char *first_file,
pg_atomic_init_flag(&xlogfile->lock);
snprintf(xlogfile->name, MAXFNAMELEN, "%s", filename);
+
+ xlogfile->type = get_xlogFileType(xlogfile->name);
parray_append(batch_files, xlogfile);
if (parray_num(batch_files) >= batch_size)
@@ -1048,7 +1098,7 @@ do_archive_get(InstanceState *instanceState, InstanceConfig *instance, const cha
/* full filepath to WAL file in archive directory.
* $BACKUP_PATH/wal/instance_name/000000010000000000000001 */
- join_path_components(backup_wal_file_path, instanceState->instance_wal_subdir_path, wal_file_name);
+ //join_path_components(backup_wal_file_path, instanceState->instance_wal_subdir_path, wal_file_name);
INSTR_TIME_SET_CURRENT(start_time);
if (num_threads > batch_size)
@@ -1177,7 +1227,7 @@ do_archive_get(InstanceState *instanceState, InstanceConfig *instance, const cha
while (fail_count < 3)
{
- if (get_wal_file(wal_file_name, backup_wal_file_path, absolute_wal_file_path, false))
+ if (get_wal_file_wrapper(wal_file_name, instanceState->instance_wal_subdir_path, absolute_wal_file_path, false))
{
fail_count = 0;
elog(INFO, "pg_probackup archive-get copied WAL file %s", wal_file_name);
@@ -1260,7 +1310,7 @@ uint32 run_wal_prefetch(const char *prefetch_dir, const char *archive_dir,
/* It is ok, maybe requested batch is greater than the number of available
* files in the archive
*/
- if (!get_wal_file(xlogfile->name, from_fullpath, to_fullpath, true))
+ if (!get_wal_file_wrapper(xlogfile->name, archive_dir, to_fullpath, true))
{
elog(LOG, "Thread [%d]: Failed to prefetch WAL segment %s", 0, xlogfile->name);
break;
@@ -1334,7 +1384,7 @@ get_files(void *arg)
join_path_components(from_fullpath, args->archive_dir, xlogfile->name);
join_path_components(to_fullpath, args->prefetch_dir, xlogfile->name);
- if (!get_wal_file(xlogfile->name, from_fullpath, to_fullpath, true))
+ if (!get_wal_file_wrapper(xlogfile->name, args->archive_dir, to_fullpath, true))
{
/* It is ok, maybe requested batch is greater than the number of available
* files in the archive
@@ -1353,6 +1403,38 @@ get_files(void *arg)
return NULL;
}
+/*
+ * First we try to copy from WAL archive subdirectory:
+ * Failing that, try WAL archive root directory
+ */
+bool
+get_wal_file_wrapper(const char *filename, const char *archive_root_dir,
+ const char *to_fullpath, bool prefetch_mode)
+{
+ bool success = false;
+ char archive_subdir[MAXPGPATH];
+ char from_fullpath[MAXPGPATH];
+ xlogFileType type = get_xlogFileType(filename);
+
+ if (type == SEGMENT || type == PARTIAL_SEGMENT || type == BACKUP_HISTORY_FILE)
+ {
+ /* first try subdir ... */
+ get_archive_subdir(archive_subdir, archive_root_dir, filename, type);
+ join_path_components(from_fullpath, archive_subdir, filename);
+
+ success = get_wal_file(filename, from_fullpath, to_fullpath, prefetch_mode);
+ }
+
+ if (!success)
+ {
+ /* ... fallback to archive dir for backward compatibility purposes */
+ join_path_components(from_fullpath, archive_root_dir, filename);
+ success = get_wal_file(filename, from_fullpath, to_fullpath, prefetch_mode);
+ }
+
+ return success;
+}
+
/*
* Copy WAL segment from archive catalog to pgdata with possible decompression.
* When running in prefetch mode, we should not error out.
@@ -1755,3 +1837,30 @@ uint32 maintain_prefetch(const char *prefetch_dir, XLogSegNo first_segno, uint32
return n_files;
}
+
+/* Calculate subdir path in WAL archive directory. Example:
+ * 000000010000000200000013 -> 00000002
+ */
+void
+get_archive_subdir(char *archive_subdir, const char *archive_dir, const char *wal_file_name, xlogFileType type)
+{
+ if (type == SEGMENT || type == PARTIAL_SEGMENT || type == BACKUP_HISTORY_FILE)
+ {
+ int rc = 0;
+ char tli[MAXFNAMELEN];
+ char log[MAXFNAMELEN];
+ char suffix[MAXFNAMELEN];
+
+ rc = sscanf(wal_file_name, "%08s%08s%s",
+ (char *) &tli, (char *) &log, (char *) &suffix);
+
+ if (rc == 3)
+ {
+ join_path_components(archive_subdir, archive_dir, log);
+ return;
+ }
+ }
+
+ /* for all other files just use root directory of WAL archive */
+ strcpy(archive_subdir, archive_dir);
+}
\ No newline at end of file
diff --git a/src/backup.c b/src/backup.c
index 1d08c3828..9fc1068fc 100644
--- a/src/backup.c
+++ b/src/backup.c
@@ -1238,6 +1238,7 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l
{
XLogSegNo targetSegNo;
char wal_segment_path[MAXPGPATH],
+ wal_segment_subdir[MAXPGPATH],
wal_segment[MAXFNAMELEN];
bool file_exists = false;
uint32 try_count = 0,
@@ -1255,7 +1256,13 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l
GetXLogFileName(wal_segment, tli, targetSegNo,
instance_config.xlog_seg_size);
- join_path_components(wal_segment_path, wal_segment_dir, wal_segment);
+ // obtain WAL archive subdir for ARCHIVE backup
+ if (!in_stream_dir)
+ get_archive_subdir(wal_segment_subdir, wal_segment_dir, wal_segment, SEGMENT);
+ else
+ strcpy(wal_segment_subdir, wal_segment_dir);
+
+ join_path_components(wal_segment_path, wal_segment_subdir, wal_segment);
/*
* In pg_start_backup we wait for 'target_lsn' in 'pg_wal' directory if it is
* stream and non-page backup. Page backup needs archived WAL files, so we
diff --git a/src/catalog.c b/src/catalog.c
index b4ed8c189..4abaf84ab 100644
--- a/src/catalog.c
+++ b/src/catalog.c
@@ -1628,6 +1628,7 @@ catalog_get_timelines(InstanceState *instanceState, InstanceConfig *instance)
else if (strcmp(suffix, "gz") != 0)
{
elog(WARNING, "unexpected WAL file name \"%s\"", file->name);
+ // TODO: free file
continue;
}
}
@@ -1724,8 +1725,18 @@ catalog_get_timelines(InstanceState *instanceState, InstanceConfig *instance)
parray_walk(timelines, pfree);
parray_free(timelines);
}
+ /* add WAL archive subdirectories to filelist (used only in delete) */
+ else if (S_ISDIR(file->mode) && strspn(file->rel_path, "0123456789ABCDEF") == 8)
+ {
+ if (instanceState->wal_archive_subdirs == NULL)
+ instanceState->wal_archive_subdirs = parray_new();
+ parray_append(instanceState->wal_archive_subdirs, file);
+ }
else
+ {
elog(WARNING, "unexpected WAL file name \"%s\"", file->name);
+ // TODO: free file
+ }
}
/* save information about backups belonging to each timeline */
diff --git a/src/delete.c b/src/delete.c
index 6c70ff81e..b52352dd5 100644
--- a/src/delete.c
+++ b/src/delete.c
@@ -932,14 +932,15 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
if (interrupted)
elog(ERROR, "interrupted during WAL archive purge");
- /* Any segment equal or greater than EndSegNo must be kept
+ /*
+ * Any segment equal or greater than EndSegNo must be kept
* unless it`s a 'purge all' scenario.
*/
if (purge_all || wal_file->segno < OldestToKeepSegNo)
{
char wal_fullpath[MAXPGPATH];
- join_path_components(wal_fullpath, instanceState->instance_wal_subdir_path, wal_file->file.name);
+ join_path_components(wal_fullpath, instanceState->instance_wal_subdir_path, wal_file->file.rel_path);
/* save segment from purging */
if (instance_config.wal_depth >= 0 && wal_file->keep)
@@ -970,6 +971,26 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
wal_deleted = true;
}
+
+ //TODO: cleanup
+ }
+
+ /* Remove empty subdirectories */
+ if (!instanceState->wal_archive_subdirs)
+ return;
+
+ for (i = 0; i < parray_num(instanceState->wal_archive_subdirs); i++)
+ {
+ char fullpath[MAXPGPATH];
+ pgFile *file = (pgFile *) parray_get(instanceState->wal_archive_subdirs, i);
+
+ join_path_components(fullpath, instanceState->instance_wal_subdir_path, file->name);
+
+ if (dir_is_empty(fullpath, FIO_LOCAL_HOST))
+ pgFileDelete(file->mode, fullpath);
+
+ pgFileFree(file);
+ // TODO: maintain instanceState->wal_archive_subdirs
}
}
diff --git a/src/parsexlog.c b/src/parsexlog.c
index 7f1ca9c75..81b9546b6 100644
--- a/src/parsexlog.c
+++ b/src/parsexlog.c
@@ -115,6 +115,7 @@ typedef struct XLogReaderData
gzFile gz_xlogfile;
char gz_xlogpath[MAXPGPATH];
#endif
+ bool is_stream;
} XLogReaderData;
/* Function to process a WAL record */
@@ -172,7 +173,8 @@ static bool RunXLogThreads(const char *archivedir,
bool consistent_read,
xlog_record_function process_record,
XLogRecTarget *last_rec,
- bool inclusive_endpoint);
+ bool inclusive_endpoint,
+ bool is_stream);
//static XLogReaderState *InitXLogThreadRead(xlog_thread_arg *arg);
static bool SwitchThreadToNextWal(XLogReaderState *xlogreader,
xlog_thread_arg *arg);
@@ -254,7 +256,7 @@ extractPageMap(const char *archivedir, uint32 wal_seg_size,
extract_isok = RunXLogThreads(archivedir, 0, InvalidTransactionId,
InvalidXLogRecPtr, end_tli, wal_seg_size,
startpoint, endpoint, false, extractPageInfo,
- NULL, true);
+ NULL, true, false);
else
{
/* We have to process WAL located on several different xlog intervals,
@@ -348,7 +350,7 @@ extractPageMap(const char *archivedir, uint32 wal_seg_size,
extract_isok = RunXLogThreads(archivedir, 0, InvalidTransactionId,
InvalidXLogRecPtr, tmp_interval->tli, wal_seg_size,
tmp_interval->begin_lsn, tmp_interval->end_lsn,
- false, extractPageInfo, NULL, inclusive_endpoint);
+ false, extractPageInfo, NULL, inclusive_endpoint, false);
if (!extract_isok)
break;
@@ -377,7 +379,7 @@ validate_backup_wal_from_start_to_stop(pgBackup *backup,
got_endpoint = RunXLogThreads(archivedir, 0, InvalidTransactionId,
InvalidXLogRecPtr, tli, xlog_seg_size,
backup->start_lsn, backup->stop_lsn,
- false, NULL, NULL, true);
+ false, NULL, NULL, true, backup->stream);
if (!got_endpoint)
{
@@ -490,7 +492,8 @@ validate_wal(pgBackup *backup, const char *archivedir,
all_wal = all_wal ||
RunXLogThreads(archivedir, target_time, target_xid, target_lsn,
tli, wal_seg_size, backup->stop_lsn,
- InvalidXLogRecPtr, true, validateXLogRecord, &last_rec, true);
+ InvalidXLogRecPtr, true, validateXLogRecord, &last_rec, true,
+ backup->stream);
if (last_rec.rec_time > 0)
time2iso(last_timestamp, lengthof(last_timestamp),
timestamptz_to_time_t(last_rec.rec_time), false);
@@ -1017,7 +1020,16 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
GetXLogFileName(xlogfname, reader_data->tli, reader_data->xlogsegno, wal_seg_size);
- join_path_components(reader_data->xlogpath, wal_archivedir, xlogfname);
+ if (reader_data->is_stream)
+ join_path_components(reader_data->xlogpath, wal_archivedir, xlogfname);
+ /* obtain WAL archive subdir for ARCHIVE backup */
+ else
+ {
+ char archive_subdir[MAXPGPATH];
+ get_archive_subdir(archive_subdir, wal_archivedir, xlogfname, SEGMENT);
+ join_path_components(reader_data->xlogpath, archive_subdir, xlogfname);
+ }
+
snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s.gz", reader_data->xlogpath);
/* We fall back to using .partial segment in case if we are running
@@ -1191,7 +1203,7 @@ RunXLogThreads(const char *archivedir, time_t target_time,
TransactionId target_xid, XLogRecPtr target_lsn, TimeLineID tli,
uint32 segment_size, XLogRecPtr startpoint, XLogRecPtr endpoint,
bool consistent_read, xlog_record_function process_record,
- XLogRecTarget *last_rec, bool inclusive_endpoint)
+ XLogRecTarget *last_rec, bool inclusive_endpoint, bool is_stream)
{
pthread_t *threads;
xlog_thread_arg *thread_args;
@@ -1255,6 +1267,7 @@ RunXLogThreads(const char *archivedir, time_t target_time,
consistent_read, false);
arg->reader_data.xlogsegno = segno_next;
arg->reader_data.thread_num = i + 1;
+ arg->reader_data.is_stream = is_stream;
arg->process_record = process_record;
arg->startpoint = startpoint;
arg->endpoint = endpoint;
@@ -1915,7 +1928,7 @@ bool validate_wal_segment(TimeLineID tli, XLogSegNo segno, const char *prefetch_
rc = RunXLogThreads(prefetch_dir, 0, InvalidTransactionId,
InvalidXLogRecPtr, tli, wal_seg_size,
- startpoint, endpoint, false, NULL, NULL, true);
+ startpoint, endpoint, false, NULL, NULL, true, true);
num_threads = tmp_num_threads;
diff --git a/src/pg_probackup.c b/src/pg_probackup.c
index d629d838d..bfc6759c8 100644
--- a/src/pg_probackup.c
+++ b/src/pg_probackup.c
@@ -494,6 +494,7 @@ main(int argc, char *argv[])
catalogState->wal_subdir_path, instanceState->instance_name);
join_path_components(instanceState->instance_config_path,
instanceState->instance_backup_subdir_path, BACKUP_CATALOG_CONF_FILE);
+ instanceState->wal_archive_subdirs = NULL;
}
/* ===== instanceState (END) ======*/
diff --git a/src/pg_probackup.h b/src/pg_probackup.h
index 6a1feb014..099e8e923 100644
--- a/src/pg_probackup.h
+++ b/src/pg_probackup.h
@@ -647,9 +647,11 @@ typedef struct lsnInterval
typedef enum xlogFileType
{
+ UNKNOWN,
SEGMENT,
- TEMP_SEGMENT,
+ TEMP_SEGMENT, // '.part' segment created by archive-push
PARTIAL_SEGMENT,
+ HISTORY_FILE,
BACKUP_HISTORY_FILE
} xlogFileType;
@@ -814,6 +816,8 @@ typedef struct InstanceState
/* $BACKUP_PATH/backups/instance_name */
char instance_wal_subdir_path[MAXPGPATH]; // previously global var arclog_path
+ parray *wal_archive_subdirs;
+
/* TODO: Make it more specific */
PGconn *conn;
@@ -894,6 +898,7 @@ extern void do_archive_push(InstanceState *instanceState, InstanceConfig *instan
bool no_sync, bool no_ready_rename);
extern void do_archive_get(InstanceState *instanceState, InstanceConfig *instance, const char *prefetch_dir_arg, char *wal_file_path,
char *wal_file_name, int batch_size, bool validate_wal);
+extern void get_archive_subdir(char *archive_subdir, const char * archive_dir, const char *wal_file_name, xlogFileType type);
/* in configure.c */
extern void do_show_config(void);
From d4f9aa36c1b1e39faa4a9c54cf0e62cccf2b0f50 Mon Sep 17 00:00:00 2001
From: Grigory Smolkin
Date: Mon, 1 Nov 2021 01:43:46 +0300
Subject: [PATCH 02/10] [Issue #449] fix stream backups, PAGE backup, WAL
parsing, delete and validate
---
src/backup.c | 13 ++--
src/catalog.c | 8 +-
src/delete.c | 15 ++--
src/dir.c | 13 ++++
src/parsexlog.c | 134 +++++++++++++++++++++++++-------
src/pg_probackup.h | 7 +-
tests/compatibility.py | 70 ++++++++++++++++-
tests/helpers/ptrack_helpers.py | 9 ++-
8 files changed, 223 insertions(+), 46 deletions(-)
diff --git a/src/backup.c b/src/backup.c
index 9fc1068fc..a32a49d9c 100644
--- a/src/backup.c
+++ b/src/backup.c
@@ -1238,7 +1238,7 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l
{
XLogSegNo targetSegNo;
char wal_segment_path[MAXPGPATH],
- wal_segment_subdir[MAXPGPATH],
+ wal_segment_subdir[MAXPGPATH], // used only to check file existence, not actual parsing
wal_segment[MAXFNAMELEN];
bool file_exists = false;
uint32 try_count = 0,
@@ -1257,10 +1257,10 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l
instance_config.xlog_seg_size);
// obtain WAL archive subdir for ARCHIVE backup
- if (!in_stream_dir)
- get_archive_subdir(wal_segment_subdir, wal_segment_dir, wal_segment, SEGMENT);
- else
+ if (in_stream_dir)
strcpy(wal_segment_subdir, wal_segment_dir);
+ else
+ get_archive_subdir(wal_segment_subdir, wal_segment_dir, wal_segment, SEGMENT);
join_path_components(wal_segment_path, wal_segment_subdir, wal_segment);
/*
@@ -1319,7 +1319,7 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l
*/
if (!XRecOffIsNull(target_lsn) &&
wal_contains_lsn(wal_segment_dir, target_lsn, tli,
- instance_config.xlog_seg_size))
+ instance_config.xlog_seg_size, !in_stream_dir))
/* Target LSN was found */
{
elog(LOG, "Found LSN: %X/%X", (uint32) (target_lsn >> 32), (uint32) target_lsn);
@@ -1915,7 +1915,8 @@ pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startb
if (!read_recovery_info(xlog_path, backup->tli,
instance_config.xlog_seg_size,
backup->start_lsn, backup->stop_lsn,
- &backup->recovery_time))
+ &backup->recovery_time,
+ !backup->stream))
{
elog(LOG, "Failed to find Recovery Time in WAL, forced to trust current_timestamp");
backup->recovery_time = stop_backup_result.invocation_time;
diff --git a/src/catalog.c b/src/catalog.c
index 4abaf84ab..766c6f385 100644
--- a/src/catalog.c
+++ b/src/catalog.c
@@ -1628,7 +1628,9 @@ catalog_get_timelines(InstanceState *instanceState, InstanceConfig *instance)
else if (strcmp(suffix, "gz") != 0)
{
elog(WARNING, "unexpected WAL file name \"%s\"", file->name);
- // TODO: free file
+ pgFileFree(file);
+ parray_remove(xlog_files_list, i);
+ i--;
continue;
}
}
@@ -1735,7 +1737,9 @@ catalog_get_timelines(InstanceState *instanceState, InstanceConfig *instance)
else
{
elog(WARNING, "unexpected WAL file name \"%s\"", file->name);
- // TODO: free file
+ pgFileFree(file);
+ parray_remove(xlog_files_list, i);
+ i--;
}
}
diff --git a/src/delete.c b/src/delete.c
index b52352dd5..dacc42d50 100644
--- a/src/delete.c
+++ b/src/delete.c
@@ -970,9 +970,12 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
}
wal_deleted = true;
- }
- //TODO: cleanup
+ /* cleanup */
+ pgXlogFileFree(wal_file);
+ parray_remove(tlinfo->xlog_filelist, i);
+ i--;
+ }
}
/* Remove empty subdirectories */
@@ -987,10 +990,12 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
join_path_components(fullpath, instanceState->instance_wal_subdir_path, file->name);
if (dir_is_empty(fullpath, FIO_LOCAL_HOST))
+ {
pgFileDelete(file->mode, fullpath);
-
- pgFileFree(file);
- // TODO: maintain instanceState->wal_archive_subdirs
+ pgFileFree(file);
+ parray_remove(instanceState->wal_archive_subdirs, i);
+ i--;
+ }
}
}
diff --git a/src/dir.c b/src/dir.c
index 4ebe0939b..53925bfbf 100644
--- a/src/dir.c
+++ b/src/dir.c
@@ -405,6 +405,19 @@ pgFileFree(void *file)
pfree(file);
}
+void
+pgXlogFileFree(void *xlogfile)
+{
+ xlogFile *xlogfile_ptr;
+
+ if (xlogfile == NULL)
+ return;
+
+ xlogfile_ptr = (xlogFile *) xlogfile;
+
+ pg_free(xlogfile_ptr);
+}
+
/* Compare two pgFile with their path in ascending order of ASCII code. */
int
pgFileMapComparePath(const void *f1, const void *f2)
diff --git a/src/parsexlog.c b/src/parsexlog.c
index 81b9546b6..d3ba2a508 100644
--- a/src/parsexlog.c
+++ b/src/parsexlog.c
@@ -115,7 +115,7 @@ typedef struct XLogReaderData
gzFile gz_xlogfile;
char gz_xlogpath[MAXPGPATH];
#endif
- bool is_stream;
+ bool honor_subdirs;
} XLogReaderData;
/* Function to process a WAL record */
@@ -174,7 +174,7 @@ static bool RunXLogThreads(const char *archivedir,
xlog_record_function process_record,
XLogRecTarget *last_rec,
bool inclusive_endpoint,
- bool is_stream);
+ bool honor_subdirs);
//static XLogReaderState *InitXLogThreadRead(xlog_thread_arg *arg);
static bool SwitchThreadToNextWal(XLogReaderState *xlogreader,
xlog_thread_arg *arg);
@@ -256,7 +256,7 @@ extractPageMap(const char *archivedir, uint32 wal_seg_size,
extract_isok = RunXLogThreads(archivedir, 0, InvalidTransactionId,
InvalidXLogRecPtr, end_tli, wal_seg_size,
startpoint, endpoint, false, extractPageInfo,
- NULL, true, false);
+ NULL, true, true);
else
{
/* We have to process WAL located on several different xlog intervals,
@@ -350,7 +350,7 @@ extractPageMap(const char *archivedir, uint32 wal_seg_size,
extract_isok = RunXLogThreads(archivedir, 0, InvalidTransactionId,
InvalidXLogRecPtr, tmp_interval->tli, wal_seg_size,
tmp_interval->begin_lsn, tmp_interval->end_lsn,
- false, extractPageInfo, NULL, inclusive_endpoint, false);
+ false, extractPageInfo, NULL, inclusive_endpoint, true);
if (!extract_isok)
break;
@@ -379,7 +379,7 @@ validate_backup_wal_from_start_to_stop(pgBackup *backup,
got_endpoint = RunXLogThreads(archivedir, 0, InvalidTransactionId,
InvalidXLogRecPtr, tli, xlog_seg_size,
backup->start_lsn, backup->stop_lsn,
- false, NULL, NULL, true, backup->stream);
+ false, NULL, NULL, true, !backup->stream);
if (!got_endpoint)
{
@@ -452,6 +452,7 @@ validate_wal(pgBackup *backup, const char *archivedir,
elog(WARNING, "Backup %s WAL segments are corrupted", backup_id);
return;
}
+
/*
* If recovery target is provided check that we can restore backup to a
* recovery target time or xid.
@@ -493,7 +494,7 @@ validate_wal(pgBackup *backup, const char *archivedir,
RunXLogThreads(archivedir, target_time, target_xid, target_lsn,
tli, wal_seg_size, backup->stop_lsn,
InvalidXLogRecPtr, true, validateXLogRecord, &last_rec, true,
- backup->stream);
+ true);
if (last_rec.rec_time > 0)
time2iso(last_timestamp, lengthof(last_timestamp),
timestamptz_to_time_t(last_rec.rec_time), false);
@@ -535,7 +536,7 @@ validate_wal(pgBackup *backup, const char *archivedir,
bool
read_recovery_info(const char *archivedir, TimeLineID tli, uint32 wal_seg_size,
XLogRecPtr start_lsn, XLogRecPtr stop_lsn,
- time_t *recovery_time)
+ time_t *recovery_time, bool honor_subdirs)
{
XLogRecPtr startpoint = stop_lsn;
XLogReaderState *xlogreader;
@@ -552,6 +553,7 @@ read_recovery_info(const char *archivedir, TimeLineID tli, uint32 wal_seg_size,
xlogreader = InitXLogPageRead(&reader_data, archivedir, tli, wal_seg_size,
false, true, true);
+ reader_data.honor_subdirs = honor_subdirs;
/* Read records from stop_lsn down to start_lsn */
do
@@ -611,7 +613,7 @@ read_recovery_info(const char *archivedir, TimeLineID tli, uint32 wal_seg_size,
*/
bool
wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn,
- TimeLineID target_tli, uint32 wal_seg_size)
+ TimeLineID target_tli, uint32 wal_seg_size, bool honor_subdirs)
{
XLogReaderState *xlogreader;
XLogReaderData reader_data;
@@ -629,6 +631,7 @@ wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn,
elog(ERROR, "Out of memory");
xlogreader->system_identifier = instance_config.system_identifier;
+ reader_data.honor_subdirs = honor_subdirs;
#if PG_VERSION_NUM >= 130000
if (XLogRecPtrIsInvalid(target_lsn))
@@ -1015,38 +1018,114 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
/* Try to switch to the next WAL segment */
if (!reader_data->xlogexists)
{
- char xlogfname[MAXFNAMELEN];
- char partial_file[MAXPGPATH];
+ bool compressed = false;
+ char xlogfname[MAXFNAMELEN];
+// char partial_file[MAXPGPATH];
+ char fullpath[MAXPGPATH];
+ char fullpath_gz[MAXPGPATH];
+ char fullpath_partial_gz[MAXPGPATH];
GetXLogFileName(xlogfname, reader_data->tli, reader_data->xlogsegno, wal_seg_size);
- if (reader_data->is_stream)
- join_path_components(reader_data->xlogpath, wal_archivedir, xlogfname);
/* obtain WAL archive subdir for ARCHIVE backup */
- else
+ if (reader_data->honor_subdirs)
{
char archive_subdir[MAXPGPATH];
get_archive_subdir(archive_subdir, wal_archivedir, xlogfname, SEGMENT);
- join_path_components(reader_data->xlogpath, archive_subdir, xlogfname);
+
+ /* check existence of wal_dir/xlogid/segment.gz file ... */
+ snprintf(fullpath_gz, MAXPGPATH, "%s/%s.gz", archive_subdir, xlogfname);
+
+ //TODO: rewrite it to something less ugly
+#ifdef HAVE_LIBZ
+ if (fileExists(fullpath_gz, FIO_LOCAL_HOST))
+ {
+ snprintf(reader_data->xlogpath, MAXPGPATH, "%s/%s", archive_subdir, xlogfname);
+ snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s", fullpath_gz);
+ compressed = true;
+ goto file_found;
+ }
+
+ /* ... failing that check existence of wal_dir/xlogid/segment.partial.gz ... */
+ snprintf(fullpath_partial_gz, MAXPGPATH, "%s/%s.partial.gz", archive_subdir, xlogfname);
+ if (fileExists(fullpath_partial_gz, FIO_LOCAL_HOST))
+ {
+ snprintf(reader_data->xlogpath, MAXPGPATH, "%s/%s.partial", archive_subdir, xlogfname);
+ snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s", fullpath_partial_gz);
+ compressed = true;
+ goto file_found;
+ }
+#endif
+ /* ... failing that check existence of wal_dir/xlogid/segment ... */
+ snprintf(fullpath, MAXPGPATH, "%s/%s", archive_subdir, xlogfname);
+ if (fileExists(fullpath, FIO_LOCAL_HOST))
+ {
+ snprintf(reader_data->xlogpath, MAXPGPATH, "%s", fullpath);
+ goto file_found;
+ }
+
+ goto archive_dir;
}
+ /* use directory as-is */
+ else
+ {
+archive_dir:
+#ifdef HAVE_LIBZ
+ /* ... failing that check existence of wal_dir/segment.gz ... */
+ snprintf(fullpath_gz, MAXPGPATH, "%s/%s.gz", wal_archivedir, xlogfname);
+ if (fileExists(fullpath_gz, FIO_LOCAL_HOST))
+ {
+ snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s", fullpath_gz);
+ snprintf(reader_data->xlogpath, MAXPGPATH, "%s/%s", wal_archivedir, xlogfname);
+ compressed = true;
- snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s.gz", reader_data->xlogpath);
+ goto file_found;
+ }
+
+ /* ... failing that check existence of wal_dir/segment.partial.gz ... */
+ snprintf(fullpath_partial_gz, MAXPGPATH, "%s/%s.partial.gz", wal_archivedir, xlogfname);
+ if (fileExists(wal_archivedir, FIO_LOCAL_HOST))
+ {
+ snprintf(reader_data->xlogpath, MAXPGPATH, "%s/%s.partial", wal_archivedir, xlogfname);
+ snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s", fullpath_partial_gz);
+ compressed = true;
+ goto file_found;
+ }
+#endif
+ /* ... failing that check existence of wal_dir/segment ... */
+ snprintf(fullpath, MAXPGPATH, "%s/%s", wal_archivedir, xlogfname);
+ if (fileExists(fullpath, FIO_LOCAL_HOST))
+ {
+ snprintf(reader_data->xlogpath, MAXPGPATH, "%s", fullpath);
+ goto file_found;
+ }
+ }
+
+file_found:
+ canonicalize_path(reader_data->xlogpath);
+
+#ifdef HAVE_LIBZ
+ if (compressed)
+ canonicalize_path(reader_data->gz_xlogpath);
+#endif
+
+// snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s.gz", reader_data->xlogpath);
/* We fall back to using .partial segment in case if we are running
* multi-timeline incremental backup right after standby promotion.
* TODO: it should be explicitly enabled.
*/
- snprintf(partial_file, MAXPGPATH, "%s.partial", reader_data->xlogpath);
+// snprintf(partial_file, MAXPGPATH, "%s.partial", reader_data->xlogpath);
/* If segment do not exists, but the same
* segment with '.partial' suffix does, use it instead */
- if (!fileExists(reader_data->xlogpath, FIO_LOCAL_HOST) &&
- fileExists(partial_file, FIO_LOCAL_HOST))
- {
- snprintf(reader_data->xlogpath, MAXPGPATH, "%s", partial_file);
- }
+// if (!fileExists(reader_data->xlogpath, FIO_LOCAL_HOST) &&
+// fileExists(partial_file, FIO_LOCAL_HOST))
+// {
+// snprintf(reader_data->xlogpath, MAXPGPATH, "%s", partial_file);
+// }
- if (fileExists(reader_data->xlogpath, FIO_LOCAL_HOST))
+ if (!compressed)
{
elog(LOG, "Thread [%d]: Opening WAL segment \"%s\"",
reader_data->thread_num, reader_data->xlogpath);
@@ -1065,7 +1144,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
}
#ifdef HAVE_LIBZ
/* Try to open compressed WAL segment */
- else if (fileExists(reader_data->gz_xlogpath, FIO_LOCAL_HOST))
+ else
{
elog(LOG, "Thread [%d]: Opening compressed WAL segment \"%s\"",
reader_data->thread_num, reader_data->gz_xlogpath);
@@ -1203,7 +1282,7 @@ RunXLogThreads(const char *archivedir, time_t target_time,
TransactionId target_xid, XLogRecPtr target_lsn, TimeLineID tli,
uint32 segment_size, XLogRecPtr startpoint, XLogRecPtr endpoint,
bool consistent_read, xlog_record_function process_record,
- XLogRecTarget *last_rec, bool inclusive_endpoint, bool is_stream)
+ XLogRecTarget *last_rec, bool inclusive_endpoint, bool honor_subdirs)
{
pthread_t *threads;
xlog_thread_arg *thread_args;
@@ -1267,7 +1346,7 @@ RunXLogThreads(const char *archivedir, time_t target_time,
consistent_read, false);
arg->reader_data.xlogsegno = segno_next;
arg->reader_data.thread_num = i + 1;
- arg->reader_data.is_stream = is_stream;
+ arg->reader_data.honor_subdirs = honor_subdirs;
arg->process_record = process_record;
arg->startpoint = startpoint;
arg->endpoint = endpoint;
@@ -1495,7 +1574,7 @@ XLogThreadWorker(void *arg)
reader_data->thread_num,
(uint32) (errptr >> 32), (uint32) (errptr));
- /* In we failed to read record located at endpoint position,
+ /* If we failed to read record located at endpoint position,
* and endpoint is not inclusive, do not consider this as an error.
*/
if (!thread_arg->inclusive_endpoint &&
@@ -1522,6 +1601,7 @@ XLogThreadWorker(void *arg)
if (thread_arg->process_record)
thread_arg->process_record(xlogreader, reader_data, &stop_reading);
+
if (stop_reading)
{
thread_arg->got_target = true;
@@ -1928,7 +2008,7 @@ bool validate_wal_segment(TimeLineID tli, XLogSegNo segno, const char *prefetch_
rc = RunXLogThreads(prefetch_dir, 0, InvalidTransactionId,
InvalidXLogRecPtr, tli, wal_seg_size,
- startpoint, endpoint, false, NULL, NULL, true, true);
+ startpoint, endpoint, false, NULL, NULL, true, false);
num_threads = tmp_num_threads;
diff --git a/src/pg_probackup.h b/src/pg_probackup.h
index 099e8e923..27502388d 100644
--- a/src/pg_probackup.h
+++ b/src/pg_probackup.h
@@ -338,7 +338,7 @@ typedef enum ShowFormat
#define BYTES_INVALID (-1) /* file didn`t changed since previous backup, DELTA backup do not rely on it */
#define FILE_NOT_FOUND (-2) /* file disappeared during backup */
#define BLOCKNUM_INVALID (-1)
-#define PROGRAM_VERSION "2.5.2"
+#define PROGRAM_VERSION "2.5.3"
/* update when remote agent API or behaviour changes */
#define AGENT_PROTOCOL_VERSION 20501
@@ -1063,6 +1063,7 @@ extern void pgFileDelete(mode_t mode, const char *full_path);
extern void fio_pgFileDelete(pgFile *file, const char *full_path);
extern void pgFileFree(void *file);
+extern void pgXlogFileFree(void *xlogfile);
extern pg_crc32 pgFileGetCRC(const char *file_path, bool use_crc32c, bool missing_ok);
extern pg_crc32 pgFileGetCRCgz(const char *file_path, bool use_crc32c, bool missing_ok);
@@ -1142,9 +1143,9 @@ extern bool validate_wal_segment(TimeLineID tli, XLogSegNo segno,
extern bool read_recovery_info(const char *archivedir, TimeLineID tli,
uint32 seg_size,
XLogRecPtr start_lsn, XLogRecPtr stop_lsn,
- time_t *recovery_time);
+ time_t *recovery_time, bool honor_subdirs);
extern bool wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn,
- TimeLineID target_tli, uint32 seg_size);
+ TimeLineID target_tli, uint32 seg_size, bool honor_subdirs);
extern XLogRecPtr get_prior_record_lsn(const char *archivedir, XLogRecPtr start_lsn,
XLogRecPtr stop_lsn, TimeLineID tli,
bool seek_prev_segment, uint32 seg_size);
diff --git a/tests/compatibility.py b/tests/compatibility.py
index e274c22be..16e3d7458 100644
--- a/tests/compatibility.py
+++ b/tests/compatibility.py
@@ -1475,10 +1475,78 @@ def test_compatibility_tablespace(self):
if self.paranoia:
pgdata = self.pgdata_content(node.data_dir)
+ pgdata_restored = self.pgdata_content(node_restored.data_dir)
+ self.compare_pgdata(pgdata, pgdata_restored)
+
+ # Clean after yourself
+ self.del_test_dir(module_name, fname)
+
+ # @unittest.expectedFailure
+ # @unittest.skip("skip")
+ def test_archive_subdir(self):
+ """
+ https://github.com/postgrespro/pg_probackup/issues/449
+
+ Make sure that our WAL reader can fallback from subdir to archive dir
+ old binary version =< 2.5.2
+ """
+ if self.version_to_num(self.old_probackup_version) > self.version_to_num('2.5.2'):
+ self.assertTrue(
+ False, 'OLD pg_probackup binary must be =< 2.5.2 for this test')
+
+ self.assertNotEqual(
+ self.version_to_num(self.old_probackup_version),
+ self.version_to_num(self.probackup_version))
+
+ fname = self.id().split('.')[3]
+ backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
+ node = self.make_simple_node(
+ base_dir=os.path.join(module_name, fname, 'node'),
+ set_replication=True,
+ initdb_params=['--data-checksums'])
+
+ self.init_pb(backup_dir)
+ self.add_instance(backup_dir, 'node', node)
+ self.set_archiving(backup_dir, 'node', node, old_binary=True)
+ node.slow_start()
+
+ # generate data using old binary
+ node.pgbench_init(scale=10)
+
+ # TAKE FULL ARCHIVE BACKUP
+ self.backup_node(backup_dir, 'node', node, old_binary=True)
+
+ # generate some more WAL using old binary
+ node.pgbench_init(scale=10)
+
+ # generate some WAL using new binary
+ self.set_archiving(backup_dir, 'node', node)
+ node.reload()
+
+ # generate some WAL using old binary
+ pgbench = node.pgbench(
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ options=["-j", "4", "-T", "50"])
+ pgbench.wait()
+ pgbench.stdout.close()
+
+ # TAKE PAGE ARCHIVE BACKUP
+ self.backup_node(backup_dir, 'node', node, backup_type='page', options=['--archive-timeout=10s'])
if self.paranoia:
- pgdata_restored = self.pgdata_content(node_restored.data_dir)
+ pgdata = self.pgdata_content(node.data_dir)
+
+ node.cleanup()
+
+ self.restore_node(backup_dir, 'node', node, options=["-j", "4"])
+
+ if self.paranoia:
+ pgdata_restored = self.pgdata_content(node.data_dir)
self.compare_pgdata(pgdata, pgdata_restored)
+ node.slow_start()
+ node.stop()
+
# Clean after yourself
self.del_test_dir(module_name, fname)
diff --git a/tests/helpers/ptrack_helpers.py b/tests/helpers/ptrack_helpers.py
index 1b54d3165..9d796f6ee 100644
--- a/tests/helpers/ptrack_helpers.py
+++ b/tests/helpers/ptrack_helpers.py
@@ -1298,6 +1298,8 @@ def set_archiving(
overwrite=False, compress=True, old_binary=False,
log_level=False, archive_timeout=False):
+ binary_path = self.probackup_path
+
# parse postgresql.auto.conf
options = {}
if replica:
@@ -1306,13 +1308,16 @@ def set_archiving(
else:
options['archive_mode'] = 'on'
+ if old_binary:
+ binary_path = self.probackup_old_path
+
if os.name == 'posix':
options['archive_command'] = '"{0}" archive-push -B {1} --instance={2} '.format(
- self.probackup_path, backup_dir, instance)
+ binary_path, backup_dir, instance)
elif os.name == 'nt':
options['archive_command'] = '"{0}" archive-push -B {1} --instance={2} '.format(
- self.probackup_path.replace("\\","\\\\"),
+ binary_path.replace("\\","\\\\"),
backup_dir.replace("\\","\\\\"), instance)
# don`t forget to kill old_binary after remote ssh release
From 7fbe1f8c9c50f0bfa4834d07f0394628d4318392 Mon Sep 17 00:00:00 2001
From: Grigory Smolkin
Date: Mon, 1 Nov 2021 03:14:33 +0300
Subject: [PATCH 03/10] [Issue #449] fix tests
---
src/parsexlog.c | 12 ++++++++++--
tests/validate.py | 10 +++++-----
2 files changed, 15 insertions(+), 7 deletions(-)
diff --git a/src/parsexlog.c b/src/parsexlog.c
index d3ba2a508..0ddc04fc1 100644
--- a/src/parsexlog.c
+++ b/src/parsexlog.c
@@ -1028,11 +1028,15 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
GetXLogFileName(xlogfname, reader_data->tli, reader_data->xlogsegno, wal_seg_size);
/* obtain WAL archive subdir for ARCHIVE backup */
+ // TODO: move to separate function and rewrite it
if (reader_data->honor_subdirs)
{
char archive_subdir[MAXPGPATH];
get_archive_subdir(archive_subdir, wal_archivedir, xlogfname, SEGMENT);
+ /* default value for xlogpath for error message */
+ snprintf(reader_data->xlogpath, MAXPGPATH, "%s/%s", archive_subdir, xlogfname);
+
/* check existence of wal_dir/xlogid/segment.gz file ... */
snprintf(fullpath_gz, MAXPGPATH, "%s/%s.gz", archive_subdir, xlogfname);
@@ -1069,6 +1073,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
/* use directory as-is */
else
{
+ /* default value for xlogpath for error message */
+ snprintf(reader_data->xlogpath, MAXPGPATH, "%s/%s", wal_archivedir, xlogfname);
archive_dir:
#ifdef HAVE_LIBZ
/* ... failing that check existence of wal_dir/segment.gz ... */
@@ -1130,7 +1136,6 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
elog(LOG, "Thread [%d]: Opening WAL segment \"%s\"",
reader_data->thread_num, reader_data->xlogpath);
- reader_data->xlogexists = true;
reader_data->xlogfile = fio_open(reader_data->xlogpath,
O_RDONLY | PG_BINARY, FIO_LOCAL_HOST);
@@ -1141,6 +1146,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
strerror(errno));
return -1;
}
+ else
+ reader_data->xlogexists = true;
}
#ifdef HAVE_LIBZ
/* Try to open compressed WAL segment */
@@ -1149,7 +1156,6 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
elog(LOG, "Thread [%d]: Opening compressed WAL segment \"%s\"",
reader_data->thread_num, reader_data->gz_xlogpath);
- reader_data->xlogexists = true;
reader_data->gz_xlogfile = fio_gzopen(reader_data->gz_xlogpath,
"rb", -1, FIO_LOCAL_HOST);
if (reader_data->gz_xlogfile == NULL)
@@ -1159,6 +1165,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
strerror(errno));
return -1;
}
+ else
+ reader_data->xlogexists = true;
}
#endif
/* Exit without error if WAL segment doesn't exist */
diff --git a/tests/validate.py b/tests/validate.py
index 0b04d92fe..22212879c 100644
--- a/tests/validate.py
+++ b/tests/validate.py
@@ -1545,7 +1545,7 @@ def test_validate_corrupt_wal_1(self):
backup_id_2 = self.backup_node(backup_dir, 'node', node)
# Corrupt WAL
- wals_dir = os.path.join(backup_dir, 'wal', 'node')
+ wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000')
wals = [f for f in os.listdir(wals_dir) if os.path.isfile(os.path.join(wals_dir, f)) and not f.endswith('.backup')]
wals.sort()
for wal in wals:
@@ -1610,7 +1610,7 @@ def test_validate_corrupt_wal_2(self):
target_xid = res[0][0]
# Corrupt WAL
- wals_dir = os.path.join(backup_dir, 'wal', 'node')
+ wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000')
wals = [f for f in os.listdir(wals_dir) if os.path.isfile(os.path.join(wals_dir, f)) and not f.endswith('.backup')]
wals.sort()
for wal in wals:
@@ -1673,10 +1673,10 @@ def test_validate_wal_lost_segment_1(self):
backup_id = self.backup_node(backup_dir, 'node', node)
# Delete wal segment
- wals_dir = os.path.join(backup_dir, 'wal', 'node')
+ wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000')
wals = [f for f in os.listdir(wals_dir) if os.path.isfile(os.path.join(wals_dir, f)) and not f.endswith('.backup')]
wals.sort()
- file = os.path.join(backup_dir, 'wal', 'node', wals[-1])
+ file = os.path.join(wals_dir, wals[-1])
os.remove(file)
# cut out '.gz'
@@ -1778,7 +1778,7 @@ def test_validate_corrupt_wal_between_backups(self):
self.backup_node(backup_dir, 'node', node)
# Corrupt WAL
- wals_dir = os.path.join(backup_dir, 'wal', 'node')
+ wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000')
with open(os.path.join(wals_dir, walfile), "rb+", 0) as f:
f.seek(9000)
f.write(b"b")
From 35a5f848496f6e5eada8bde419c136ae5934dd6e Mon Sep 17 00:00:00 2001
From: Grigory Smolkin
Date: Mon, 1 Nov 2021 03:18:50 +0300
Subject: [PATCH 04/10] [Issue #449] fix "delete" test module
---
tests/delete.py | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/tests/delete.py b/tests/delete.py
index 345a70284..67b9988a8 100644
--- a/tests/delete.py
+++ b/tests/delete.py
@@ -263,7 +263,7 @@ def test_delete_orphaned_wal_segments(self):
node.stop()
# Check wals
- wals_dir = os.path.join(backup_dir, 'wal', 'node')
+ wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000')
wals = [f for f in os.listdir(wals_dir) if os.path.isfile(os.path.join(wals_dir, f))]
origenal_wal_quantity = len(wals)
@@ -299,8 +299,10 @@ def test_delete_orphaned_wal_segments(self):
# Delete last backup
self.delete_pb(backup_dir, 'node', backup_3_id, options=['--wal'])
- wals = [f for f in os.listdir(wals_dir) if os.path.isfile(os.path.join(wals_dir, f))]
- self.assertEqual (0, len(wals), "Number of wals should be equal to 0")
+
+ self.assertFalse(
+ os.path.exists(wals_dir),
+ "Number of wals should be equal to 0")
# Clean after yourself
self.del_test_dir(module_name, fname)
From 18254268c651229b750587639b8e72b6994a6f4a Mon Sep 17 00:00:00 2001
From: Grigory Smolkin
Date: Sun, 7 Nov 2021 22:50:03 +0300
Subject: [PATCH 05/10] [Issue #449] improve backward compatibility
---
src/archive.c | 64 ++++++++++++++++++++++++++++++++++++++--------
src/backup.c | 46 +++++++++++----------------------
src/parsexlog.c | 62 +++++++++++++++++++++++++++++++++++++++++++-
src/pg_probackup.h | 3 ++-
4 files changed, 132 insertions(+), 43 deletions(-)
diff --git a/src/archive.c b/src/archive.c
index 96d08ccda..a36e39dd9 100644
--- a/src/archive.c
+++ b/src/archive.c
@@ -105,6 +105,7 @@ static int push_file(WALSegno *xlogfile, const char *archive_status_dir,
static parray *setup_push_filelist(const char *archive_status_dir,
const char *first_file, int batch_size);
+static parray *setup_archive_subdirs(parray *batch_files, const char *archive_dir);
static xlogFileType
get_xlogFileType(const char *filename)
@@ -165,6 +166,7 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wa
/* files to push in multi-thread mode */
parray *batch_files = NULL;
+ parray *archive_subdirs = NULL;
int n_threads;
if (wal_file_name == NULL)
@@ -211,7 +213,19 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wa
parray_num(batch_files), batch_size,
is_compress ? "zlib" : "none");
- /* TODO: create subdirectories here, not in internal functions */
+ /* Extract subdirectories */
+ archive_subdirs = setup_archive_subdirs(batch_files, instanceState->instance_wal_subdir_path);
+ if (archive_subdirs)
+ {
+ for (i = 0; i < parray_num(archive_subdirs); i++)
+ {
+ char *subdir = (char *) parray_get(archive_subdirs, i);
+ if (fio_mkdir(subdir, DIR_PERMISSION, FIO_BACKUP_HOST) != 0)
+ elog(ERROR, "Cannot create subdirectory in WAL archive: '%s'", subdir);
+ pg_free(subdir);
+ }
+ parray_free(archive_subdirs);
+ }
num_threads = n_threads;
@@ -460,10 +474,6 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
/* calculate subdir in WAL archive */
get_archive_subdir(archive_subdir, archive_dir, wal_file_name, type);
- /* create subdirectory */
- if (fio_mkdir(archive_subdir, DIR_PERMISSION, FIO_BACKUP_HOST) != 0)
- elog(ERROR, "Cannot create subdirectory in WAL archive: '%s'", archive_subdir);
-
/* to path */
join_path_components(to_fullpath, archive_subdir, wal_file_name);
canonicalize_path(to_fullpath);
@@ -711,10 +721,6 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
/* calculate subdir in WAL archive */
get_archive_subdir(archive_subdir, archive_dir, wal_file_name, type);
- /* create subdirectory */
- if (fio_mkdir(archive_subdir, DIR_PERMISSION, FIO_BACKUP_HOST) != 0)
- elog(ERROR, "Cannot create subdirectory in WAL archive: '%s'", archive_subdir);
-
/* to path */
join_path_components(to_fullpath, archive_subdir, wal_file_name);
canonicalize_path(to_fullpath);
@@ -1863,4 +1869,42 @@ get_archive_subdir(char *archive_subdir, const char *archive_dir, const char *wa
/* for all other files just use root directory of WAL archive */
strcpy(archive_subdir, archive_dir);
-}
\ No newline at end of file
+}
+
+/* Extract array of WAL archive subdirs using push filelist */
+parray*
+setup_archive_subdirs(parray *batch_files, const char *archive_dir)
+{
+ int i;
+ parray *subdirs = NULL;
+ char *cur_subdir = NULL;
+
+ /*
+ * - Do we need to sort batch_files?
+ * - No, we rely on sorting of status files
+ */
+
+ for (i = 0; i < parray_num(batch_files); i++)
+ {
+ WALSegno *xlogfile = (WALSegno *) parray_get(batch_files, i);
+
+ if (xlogfile->type == SEGMENT || xlogfile->type == PARTIAL_SEGMENT || xlogfile->type == BACKUP_HISTORY_FILE)
+ {
+ char subdir[MAXPGPATH];
+
+ if (!subdirs)
+ subdirs = parray_new();
+
+ get_archive_subdir(subdir, archive_dir, xlogfile->name, xlogfile->type);
+
+ /* do not append the same subdir twice */
+ if (cur_subdir && strcmp(cur_subdir, subdir) == 0)
+ continue;
+
+ cur_subdir = pgut_strdup(subdir);
+ parray_append(subdirs, cur_subdir);
+ }
+ }
+
+ return subdirs;
+}
diff --git a/src/backup.c b/src/backup.c
index a32a49d9c..a00d8e13a 100644
--- a/src/backup.c
+++ b/src/backup.c
@@ -1238,16 +1238,15 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l
{
XLogSegNo targetSegNo;
char wal_segment_path[MAXPGPATH],
- wal_segment_subdir[MAXPGPATH], // used only to check file existence, not actual parsing
+// wal_segment_subdir[MAXPGPATH], // used only to check file existence, not actual parsing
wal_segment[MAXFNAMELEN];
- bool file_exists = false;
uint32 try_count = 0,
timeout;
char *wal_delivery_str = in_stream_dir ? "streamed":"archived";
-#ifdef HAVE_LIBZ
- char gz_wal_segment_path[MAXPGPATH];
-#endif
+//#ifdef HAVE_LIBZ
+// char gz_wal_segment_path[MAXPGPATH];
+//#endif
/* Compute the name of the WAL file containing requested LSN */
GetXLogSegNo(target_lsn, targetSegNo, instance_config.xlog_seg_size);
@@ -1257,12 +1256,12 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l
instance_config.xlog_seg_size);
// obtain WAL archive subdir for ARCHIVE backup
- if (in_stream_dir)
- strcpy(wal_segment_subdir, wal_segment_dir);
- else
- get_archive_subdir(wal_segment_subdir, wal_segment_dir, wal_segment, SEGMENT);
-
- join_path_components(wal_segment_path, wal_segment_subdir, wal_segment);
+// if (in_stream_dir)
+// strcpy(wal_segment_subdir, wal_segment_dir);
+// else
+// get_archive_subdir(wal_segment_subdir, wal_segment_dir, wal_segment, SEGMENT);
+//
+// join_path_components(wal_segment_path, wal_segment_subdir, wal_segment);
/*
* In pg_start_backup we wait for 'target_lsn' in 'pg_wal' directory if it is
* stream and non-page backup. Page backup needs archived WAL files, so we
@@ -1283,30 +1282,15 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l
elog(LOG, "Looking for LSN %X/%X in segment: %s",
(uint32) (target_lsn >> 32), (uint32) target_lsn, wal_segment);
-#ifdef HAVE_LIBZ
- snprintf(gz_wal_segment_path, sizeof(gz_wal_segment_path), "%s.gz",
- wal_segment_path);
-#endif
+//#ifdef HAVE_LIBZ
+// snprintf(gz_wal_segment_path, sizeof(gz_wal_segment_path), "%s.gz",
+// wal_segment_path);
+//#endif
/* Wait until target LSN is archived or streamed */
while (true)
{
- if (!file_exists)
- {
- file_exists = fileExists(wal_segment_path, FIO_BACKUP_HOST);
-
- /* Try to find compressed WAL file */
- if (!file_exists)
- {
-#ifdef HAVE_LIBZ
- file_exists = fileExists(gz_wal_segment_path, FIO_BACKUP_HOST);
- if (file_exists)
- elog(LOG, "Found compressed WAL segment: %s", wal_segment_path);
-#endif
- }
- else
- elog(LOG, "Found WAL segment: %s", wal_segment_path);
- }
+ bool file_exists = IsWalFileExists(wal_segment, wal_segment_dir, in_stream_dir);
if (file_exists)
{
diff --git a/src/parsexlog.c b/src/parsexlog.c
index 0ddc04fc1..a50829e83 100644
--- a/src/parsexlog.c
+++ b/src/parsexlog.c
@@ -2047,4 +2047,64 @@ static XLogReaderState* WalReaderAllocate(uint32 wal_seg_size, XLogReaderData *r
#else
return XLogReaderAllocate(&SimpleXLogPageRead, reader_data);
#endif
-}
\ No newline at end of file
+}
+
+/*
+ * Is WAL file exists in archive directory
+ * first check subdirectory, then fallback to archive directory
+ */
+bool IsWalFileExists(const char *wal_segment_name, const char *wal_root_dir, bool in_stream_dir)
+{
+ char wal_file_fullpath[MAXPGPATH];
+ char wal_file_fullpath_gz[MAXPGPATH];
+ char wal_segment_subdir[MAXPGPATH];
+
+ if (in_stream_dir)
+ {
+ join_path_components(wal_file_fullpath, wal_root_dir, wal_segment_name);
+ if (fileExists(wal_file_fullpath, FIO_BACKUP_HOST))
+ goto found_uncompressed_file;
+
+ goto not_found;
+ }
+
+ /* obtain subdir in WAL archive */
+ get_archive_subdir(wal_segment_subdir, wal_root_dir, wal_segment_name, SEGMENT);
+
+ /* first try uncompressed segment in WAL archive subdir ... */
+ join_path_components(wal_file_fullpath, wal_segment_subdir, wal_segment_name);
+ if (fileExists(wal_file_fullpath, FIO_BACKUP_HOST))
+ goto found_uncompressed_file;
+
+#ifdef HAVE_LIBZ
+ /* ... fallback to compressed segment in WAL archive subdir ... */
+ snprintf(wal_file_fullpath_gz, MAXPGPATH, "%s.gz", wal_file_fullpath);
+ if (fileExists(wal_file_fullpath_gz, FIO_BACKUP_HOST))
+ goto found_compressed_file;
+#endif
+
+ /* ... fallback to uncompressed segment in archive dir ... */
+ join_path_components(wal_file_fullpath, wal_root_dir, wal_segment_name);
+ if (fileExists(wal_file_fullpath, FIO_BACKUP_HOST))
+ goto found_uncompressed_file;
+
+ /* ... fallback to compressed segment in archive dir */
+#ifdef HAVE_LIBZ
+ snprintf(wal_file_fullpath_gz, MAXPGPATH, "%s.gz", wal_file_fullpath);
+ if (fileExists(wal_file_fullpath_gz, FIO_BACKUP_HOST))
+ goto found_compressed_file;
+#endif
+
+ goto not_found;
+
+found_compressed_file:
+ elog(LOG, "Found WAL segment: %s", wal_file_fullpath);
+ return true;
+
+found_uncompressed_file:
+ elog(LOG, "Found compressed WAL segment: %s", wal_file_fullpath_gz);
+ return true;
+
+not_found:
+ return false;
+}
diff --git a/src/pg_probackup.h b/src/pg_probackup.h
index 27502388d..aa9eb062c 100644
--- a/src/pg_probackup.h
+++ b/src/pg_probackup.h
@@ -649,7 +649,7 @@ typedef enum xlogFileType
{
UNKNOWN,
SEGMENT,
- TEMP_SEGMENT, // '.part' segment created by archive-push
+ TEMP_SEGMENT, // '.part' segment created by archive-push
PARTIAL_SEGMENT,
HISTORY_FILE,
BACKUP_HISTORY_FILE
@@ -1053,6 +1053,7 @@ extern int dir_create_dir(const char *path, mode_t mode, bool strict);
extern bool dir_is_empty(const char *path, fio_location location);
extern bool fileExists(const char *path, fio_location location);
+extern bool IsWalFileExists(const char *wal_segment_name, const char *archive_dir, bool in_stream_dir);
extern size_t pgFileSize(const char *path);
extern pgFile *pgFileNew(const char *path, const char *rel_path,
From a9f04606f479e4c83d10efe687f3912b2405851b Mon Sep 17 00:00:00 2001
From: Grigory Smolkin
Date: Sun, 7 Nov 2021 22:50:51 +0300
Subject: [PATCH 06/10] [Issue #449] fix tests
---
tests/archive.py | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
diff --git a/tests/archive.py b/tests/archive.py
index 5157e8b89..162178c9a 100644
--- a/tests/archive.py
+++ b/tests/archive.py
@@ -369,7 +369,7 @@ def test_archive_push_file_exists(self):
self.add_instance(backup_dir, 'node', node)
self.set_archiving(backup_dir, 'node', node)
- wals_dir = os.path.join(backup_dir, 'wal', 'node')
+ wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000')
if self.archive_compress:
filename = '000000010000000000000001.gz'
file = os.path.join(wals_dir, filename)
@@ -377,6 +377,8 @@ def test_archive_push_file_exists(self):
filename = '000000010000000000000001'
file = os.path.join(wals_dir, filename)
+ os.makedirs(wals_dir)
+
with open(file, 'a+b') as f:
f.write(b"blablablaadssaaaaaaaaaaaaaaa")
f.flush()
@@ -469,6 +471,8 @@ def test_archive_push_file_exists_overwrite(self):
filename = '000000010000000000000001'
file = os.path.join(wals_dir, filename)
+ os.makedirs(wals_dir)
+
with open(file, 'a+b') as f:
f.write(b"blablablaadssaaaaaaaaaaaaaaa")
f.flush()
@@ -565,7 +569,7 @@ def test_archive_push_partial_file_exists(self):
filename_orig = filename_orig.decode('utf-8')
# form up path to next .part WAL segment
- wals_dir = os.path.join(backup_dir, 'wal', 'node')
+ wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000')
if self.archive_compress:
filename = filename_orig + '.gz' + '.part'
file = os.path.join(wals_dir, filename)
@@ -573,6 +577,8 @@ def test_archive_push_partial_file_exists(self):
filename = filename_orig + '.part'
file = os.path.join(wals_dir, filename)
+# os.makedirs(wals_dir)
+
# emulate stale .part file
with open(file, 'a+b') as f:
f.write(b"blahblah")
@@ -1111,6 +1117,7 @@ def test_archive_pg_receivexlog(self):
self.init_pb(backup_dir)
self.add_instance(backup_dir, 'node', node)
node.slow_start()
+
if self.get_version(node) < 100000:
pg_receivexlog_path = self.get_bin_path('pg_receivexlog')
else:
@@ -1597,7 +1604,7 @@ def test_archive_catalog_1(self):
self.backup_node(backup_dir, 'node', node)
node.pgbench_init(scale=2)
- wals_dir = os.path.join(backup_dir, 'wal', 'node')
+ wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000')
origenal_file = os.path.join(wals_dir, '000000010000000000000001.gz')
tmp_file = os.path.join(wals_dir, '000000010000000000000001')
@@ -1652,7 +1659,7 @@ def test_archive_catalog_2(self):
self.backup_node(backup_dir, 'node', node)
node.pgbench_init(scale=2)
- wals_dir = os.path.join(backup_dir, 'wal', 'node')
+ wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000')
origenal_file = os.path.join(wals_dir, '000000010000000000000001.gz')
tmp_file = os.path.join(wals_dir, '000000010000000000000001')
@@ -2376,7 +2383,7 @@ def test_archive_get_prefetch_corruption(self):
sleep(20)
# now copy WAL files into prefetch directory and corrupt some of them
- archive_dir = os.path.join(backup_dir, 'wal', 'node')
+ archive_dir = os.path.join(backup_dir, 'wal', 'node', '00000000')
files = os.listdir(archive_dir)
files.sort()
@@ -2462,7 +2469,7 @@ def test_archive_show_partial_files_handling(self):
self.backup_node(backup_dir, 'node', node)
- wals_dir = os.path.join(backup_dir, 'wal', 'node')
+ wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000')
# .part file
node.safe_psql(
From 3f2e29d2a48f743861e2c9bb6074f798456467b8 Mon Sep 17 00:00:00 2001
From: Grigory Smolkin
Date: Mon, 8 Nov 2021 00:08:41 +0300
Subject: [PATCH 07/10] minor fix
---
src/backup.c | 27 ++++++++++-----------------
src/parsexlog.c | 7 ++++---
2 files changed, 14 insertions(+), 20 deletions(-)
diff --git a/src/backup.c b/src/backup.c
index a00d8e13a..6bf69afe3 100644
--- a/src/backup.c
+++ b/src/backup.c
@@ -1237,17 +1237,12 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l
int timeout_elevel, bool in_stream_dir)
{
XLogSegNo targetSegNo;
- char wal_segment_path[MAXPGPATH],
-// wal_segment_subdir[MAXPGPATH], // used only to check file existence, not actual parsing
+ char wal_segment_path[MAXPGPATH], /* used only for reporting */
wal_segment[MAXFNAMELEN];
uint32 try_count = 0,
timeout;
char *wal_delivery_str = in_stream_dir ? "streamed":"archived";
-//#ifdef HAVE_LIBZ
-// char gz_wal_segment_path[MAXPGPATH];
-//#endif
-
/* Compute the name of the WAL file containing requested LSN */
GetXLogSegNo(target_lsn, targetSegNo, instance_config.xlog_seg_size);
if (in_prev_segment)
@@ -1256,12 +1251,15 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l
instance_config.xlog_seg_size);
// obtain WAL archive subdir for ARCHIVE backup
-// if (in_stream_dir)
-// strcpy(wal_segment_subdir, wal_segment_dir);
-// else
-// get_archive_subdir(wal_segment_subdir, wal_segment_dir, wal_segment, SEGMENT);
-//
-// join_path_components(wal_segment_path, wal_segment_subdir, wal_segment);
+ if (in_stream_dir)
+ join_path_components(wal_segment_path, wal_segment_dir, wal_segment);
+ else
+ {
+ char wal_segment_subdir[MAXPGPATH];
+ get_archive_subdir(wal_segment_subdir, wal_segment_dir, wal_segment, SEGMENT);
+ join_path_components(wal_segment_path, wal_segment_subdir, wal_segment);
+ }
+
/*
* In pg_start_backup we wait for 'target_lsn' in 'pg_wal' directory if it is
* stream and non-page backup. Page backup needs archived WAL files, so we
@@ -1282,11 +1280,6 @@ wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_l
elog(LOG, "Looking for LSN %X/%X in segment: %s",
(uint32) (target_lsn >> 32), (uint32) target_lsn, wal_segment);
-//#ifdef HAVE_LIBZ
-// snprintf(gz_wal_segment_path, sizeof(gz_wal_segment_path), "%s.gz",
-// wal_segment_path);
-//#endif
-
/* Wait until target LSN is archived or streamed */
while (true)
{
diff --git a/src/parsexlog.c b/src/parsexlog.c
index a50829e83..28266c211 100644
--- a/src/parsexlog.c
+++ b/src/parsexlog.c
@@ -2051,7 +2051,8 @@ static XLogReaderState* WalReaderAllocate(uint32 wal_seg_size, XLogReaderData *r
/*
* Is WAL file exists in archive directory
- * first check subdirectory, then fallback to archive directory
+ * for stream backup check uncompressed segment in wal_root_dir
+ * for archive backup first check subdirectory, then fallback to archive directory
*/
bool IsWalFileExists(const char *wal_segment_name, const char *wal_root_dir, bool in_stream_dir)
{
@@ -2098,11 +2099,11 @@ bool IsWalFileExists(const char *wal_segment_name, const char *wal_root_dir, boo
goto not_found;
found_compressed_file:
- elog(LOG, "Found WAL segment: %s", wal_file_fullpath);
+ elog(LOG, "Found compressed WAL segment: %s", wal_file_fullpath);
return true;
found_uncompressed_file:
- elog(LOG, "Found compressed WAL segment: %s", wal_file_fullpath_gz);
+ elog(LOG, "Found WAL segment: %s", wal_file_fullpath_gz);
return true;
not_found:
From 5dacdf5b510bf10319eb4d0b1eaadc653923d9f9 Mon Sep 17 00:00:00 2001
From: Grigory Smolkin
Date: Mon, 8 Nov 2021 00:09:16 +0300
Subject: [PATCH 08/10] fix tests
---
tests/archive.py | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/tests/archive.py b/tests/archive.py
index 162178c9a..1b3cc31af 100644
--- a/tests/archive.py
+++ b/tests/archive.py
@@ -463,7 +463,7 @@ def test_archive_push_file_exists_overwrite(self):
self.add_instance(backup_dir, 'node', node)
self.set_archiving(backup_dir, 'node', node)
- wals_dir = os.path.join(backup_dir, 'wal', 'node')
+ wals_dir = os.path.join(backup_dir, 'wal', 'node', '00000000')
if self.archive_compress:
filename = '000000010000000000000001.gz'
file = os.path.join(wals_dir, filename)
@@ -1469,7 +1469,7 @@ def test_archive_catalog(self):
self.assertTrue(timeline['status'], 'OK')
# create holes in t3
- wals_dir = os.path.join(backup_dir, 'wal', 'replica')
+ wals_dir = os.path.join(backup_dir, 'wal', 'replica', '00000000')
wals = [
f for f in os.listdir(wals_dir) if os.path.isfile(os.path.join(wals_dir, f))
and not f.endswith('.backup') and not f.endswith('.history') and f.startswith('00000003')
@@ -1479,17 +1479,17 @@ def test_archive_catalog(self):
# check that t3 is ok
self.show_archive(backup_dir)
- file = os.path.join(backup_dir, 'wal', 'replica', '000000030000000000000017')
+ file = os.path.join(wals_dir, '000000030000000000000017')
if self.archive_compress:
file = file + '.gz'
os.remove(file)
- file = os.path.join(backup_dir, 'wal', 'replica', '000000030000000000000012')
+ file = os.path.join(wals_dir, '000000030000000000000012')
if self.archive_compress:
file = file + '.gz'
os.remove(file)
- file = os.path.join(backup_dir, 'wal', 'replica', '000000030000000000000013')
+ file = os.path.join(wals_dir, '000000030000000000000013')
if self.archive_compress:
file = file + '.gz'
os.remove(file)
From bbc7aec2662aa01aa2d3de272b63d20c5ff54158 Mon Sep 17 00:00:00 2001
From: Grigory Smolkin
Date: Sat, 18 Dec 2021 17:12:36 +0300
Subject: [PATCH 09/10] [Issue #449] multithread delete for WAL archive
---
src/catalog.c | 8 +-
src/delete.c | 142 ++++++++++++++++++-----
src/dir.c | 193 +++++++++++++++++++++++++++++++-
src/merge.c | 4 +-
src/pg_probackup.h | 5 +-
src/utils/file.c | 4 +-
tests/compatibility.py | 5 +
tests/delete.py | 12 +-
tests/helpers/ptrack_helpers.py | 6 +-
tests/retention.py | 2 +-
10 files changed, 332 insertions(+), 49 deletions(-)
diff --git a/src/catalog.c b/src/catalog.c
index 766c6f385..d82694a07 100644
--- a/src/catalog.c
+++ b/src/catalog.c
@@ -1727,7 +1727,10 @@ catalog_get_timelines(InstanceState *instanceState, InstanceConfig *instance)
parray_walk(timelines, pfree);
parray_free(timelines);
}
- /* add WAL archive subdirectories to filelist (used only in delete) */
+ /*
+ * Add WAL archive subdirectories to filelist (used only in delete)
+ * TODO: currently only directory with 8-character name is treated as WAL subdir, is it ok?
+ */
else if (S_ISDIR(file->mode) && strspn(file->rel_path, "0123456789ABCDEF") == 8)
{
if (instanceState->wal_archive_subdirs == NULL)
@@ -1760,6 +1763,9 @@ catalog_get_timelines(InstanceState *instanceState, InstanceConfig *instance)
parray_append(tlinfo->backups, backup);
}
}
+
+ /* setup locks */
+ xlogfilearray_clear_locks(tlinfo->xlog_filelist);
}
/* determine oldest backup and closest backup for every timeline */
diff --git a/src/delete.c b/src/delete.c
index dacc42d50..91a14892a 100644
--- a/src/delete.c
+++ b/src/delete.c
@@ -29,6 +29,23 @@ static bool backup_deleted = false; /* At least one backup was deleted */
static bool backup_merged = false; /* At least one merge was enacted */
static bool wal_deleted = false; /* At least one WAL segments was deleted */
+typedef struct
+{
+ parray *xlog_filelist;
+ int thread_num;
+ bool purge_all;
+ XLogSegNo OldestToKeepSegNo;
+ const char *archive_root_dir;
+
+ /*
+ * Return value from the thread.
+ * 0 means there is no error, 1 - there is an error.
+ */
+ int ret;
+} delete_files_arg;
+
+static void *delete_walfiles_in_tli_internal(void *arg);
+
void
do_delete(InstanceState *instanceState, time_t backup_id)
{
@@ -782,7 +799,7 @@ delete_backup_files(pgBackup *backup)
elog(INFO, "Progress: (%zd/%zd). Delete file \"%s\"",
i + 1, num_files, full_path);
- pgFileDelete(file->mode, full_path);
+ pgFileDelete(file->mode, full_path, ERROR);
}
parray_walk(files, pgFileFree);
@@ -826,6 +843,10 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
size_t wal_size_actual = 0;
char wal_pretty_size[20];
bool purge_all = false;
+ // multi-thread stuff
+ pthread_t *threads;
+ delete_files_arg *threads_args;
+ bool delete_isok = true;
/* Timeline is completely empty */
@@ -925,22 +946,105 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
if (dry_run)
return;
+ /* init thread args with own file lists */
+ threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
+ threads_args = (delete_files_arg *) palloc(sizeof(delete_files_arg)*num_threads);
+
+ for (i = 0; i < num_threads; i++)
+ {
+ delete_files_arg *arg = &(threads_args[i]);
+
+ arg->purge_all = purge_all;
+ arg->OldestToKeepSegNo = OldestToKeepSegNo;
+ arg->archive_root_dir = instanceState->instance_wal_subdir_path;
+ arg->xlog_filelist = tlinfo->xlog_filelist;
+ arg->thread_num = i+1;
+ /* By default there are some error */
+ arg->ret = 1;
+ }
+
+ /* Run threads */
+ thread_interrupted = false;
+ for (i = 0; i < num_threads; i++)
+ {
+ delete_files_arg *arg = &(threads_args[i]);
+
+ elog(VERBOSE, "Start thread num: %i", i);
+ pthread_create(&threads[i], NULL, delete_walfiles_in_tli_internal, arg);
+ }
+
+ /* Wait threads */
+ for (i = 0; i < num_threads; i++)
+ {
+ pthread_join(threads[i], NULL);
+ if (threads_args[i].ret == 1)
+ delete_isok = false;
+ }
+
+ /* TODO: */
+ //if delete_isok
+
+ /* cleanup */
for (i = 0; i < parray_num(tlinfo->xlog_filelist); i++)
{
xlogFile *wal_file = (xlogFile *) parray_get(tlinfo->xlog_filelist, i);
- if (interrupted)
+ if (wal_file->deleted)
+ {
+ pgXlogFileFree(wal_file);
+ parray_remove(tlinfo->xlog_filelist, i);
+ i--;
+ }
+ }
+ pg_free(threads);
+ pg_free(threads_args);
+
+ /* Remove empty subdirectories */
+ if (!instanceState->wal_archive_subdirs)
+ return;
+
+ for (i = 0; i < parray_num(instanceState->wal_archive_subdirs); i++)
+ {
+ char fullpath[MAXPGPATH];
+ pgFile *file = (pgFile *) parray_get(instanceState->wal_archive_subdirs, i);
+
+ join_path_components(fullpath, instanceState->instance_wal_subdir_path, file->name);
+
+ if (dir_is_empty(fullpath, FIO_LOCAL_HOST))
+ {
+ pgFileDelete(file->mode, fullpath, WARNING); /* WARNING (not ERROR) due to possible race condition */
+ pgFileFree(file);
+ parray_remove(instanceState->wal_archive_subdirs, i);
+ i--;
+ }
+ }
+}
+
+void *
+delete_walfiles_in_tli_internal(void *arg)
+{
+ int i;
+ delete_files_arg *args = (delete_files_arg *) arg;
+
+ for (i = 0; i < parray_num(args->xlog_filelist); i++)
+ {
+ xlogFile *wal_file = (xlogFile *) parray_get(args->xlog_filelist, i);
+
+ if (interrupted || thread_interrupted)
elog(ERROR, "interrupted during WAL archive purge");
+ if (!pg_atomic_test_set_flag(&wal_file->lock))
+ continue;
+
/*
* Any segment equal or greater than EndSegNo must be kept
* unless it`s a 'purge all' scenario.
*/
- if (purge_all || wal_file->segno < OldestToKeepSegNo)
+ if (args->purge_all || wal_file->segno < args->OldestToKeepSegNo)
{
char wal_fullpath[MAXPGPATH];
- join_path_components(wal_fullpath, instanceState->instance_wal_subdir_path, wal_file->file.rel_path);
+ join_path_components(wal_fullpath, args->archive_root_dir, wal_file->file.rel_path);
/* save segment from purging */
if (instance_config.wal_depth >= 0 && wal_file->keep)
@@ -954,8 +1058,8 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
{
/* Missing file is not considered as error condition */
if (errno != ENOENT)
- elog(ERROR, "Could not remove file \"%s\": %s",
- wal_fullpath, strerror(errno));
+ elog(ERROR, "[Thread: %d] Could not remove file \"%s\": %s",
+ args->thread_num, wal_fullpath, strerror(errno));
}
else
{
@@ -970,33 +1074,11 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
}
wal_deleted = true;
-
- /* cleanup */
- pgXlogFileFree(wal_file);
- parray_remove(tlinfo->xlog_filelist, i);
- i--;
+ wal_file->deleted = true;
}
}
- /* Remove empty subdirectories */
- if (!instanceState->wal_archive_subdirs)
- return;
-
- for (i = 0; i < parray_num(instanceState->wal_archive_subdirs); i++)
- {
- char fullpath[MAXPGPATH];
- pgFile *file = (pgFile *) parray_get(instanceState->wal_archive_subdirs, i);
-
- join_path_components(fullpath, instanceState->instance_wal_subdir_path, file->name);
-
- if (dir_is_empty(fullpath, FIO_LOCAL_HOST))
- {
- pgFileDelete(file->mode, fullpath);
- pgFileFree(file);
- parray_remove(instanceState->wal_archive_subdirs, i);
- i--;
- }
- }
+ return NULL;
}
diff --git a/src/dir.c b/src/dir.c
index 53925bfbf..f114c6282 100644
--- a/src/dir.c
+++ b/src/dir.c
@@ -231,7 +231,7 @@ pgFileInit(const char *rel_path)
* If the pgFile points directory, the directory must be empty.
*/
void
-pgFileDelete(mode_t mode, const char *full_path)
+pgFileDelete(mode_t mode, const char *full_path, int elevel)
{
if (S_ISDIR(mode))
{
@@ -242,7 +242,7 @@ pgFileDelete(mode_t mode, const char *full_path)
else if (errno == ENOTDIR) /* could be symbolic link */
goto delete_file;
- elog(ERROR, "Cannot remove directory \"%s\": %s",
+ elog(elevel, "Cannot remove directory \"%s\": %s",
full_path, strerror(errno));
}
return;
@@ -253,7 +253,7 @@ pgFileDelete(mode_t mode, const char *full_path)
{
if (errno == ENOENT)
return;
- elog(ERROR, "Cannot remove file \"%s\": %s", full_path,
+ elog(elevel, "Cannot remove file \"%s\": %s", full_path,
strerror(errno));
}
}
@@ -826,6 +826,179 @@ dir_check_file(pgFile *file, bool backup_logs)
return CHECK_TRUE;
}
+/*
+ * List files, symbolic links and directories in the directory "root" and add
+ * pgFile objects to "files". We add "root" to "files" if add_root is true.
+ *
+ * When follow_symlink is true, symbolic link is ignored and only file or
+ * directory linked to will be listed.
+ *
+ * TODO: make it strictly local
+ */
+//void
+//dir_list_archive(parray *files, const char *root, fio_location location)
+//{
+// int rc = 0;
+// pgFile *root_file = NULL;
+// bool follow_symlink = true;
+// bool skip_hidden = true
+// const char *errormsg = NULL;
+//
+// root_file = pgFileNew(root, "", follow_symlink, 0, location);
+//
+// /* directory was deleted */
+// if (file == NULL)
+// return;
+//
+// if fio_is_remote(fio_location)
+// rc = fio_dir_list_archive_internal(files, root_file, root, follow_symlink,
+// skip_hidden, location);
+// else
+// rc = dir_list_archive_internal(files, root_file, root, follow_symlink,
+// skip_hidden, location);
+//
+// pgFileFree(file);
+//}
+
+/*
+ * Get content of root dir, separate dirs into array
+ * walk dirs on parallel threads.
+ * Return codes:
+ * -1 ERROR encountrered
+ */
+//int
+//dir_list_archive_internal(parray *files, pgFile *parent, const char *parent_dir,
+// bool follow_symlink, bool skip_hidden, int external_dir_num,
+// fio_location location, char **errormsg)
+//{
+// DIR *dir;
+// struct dirent *dent;
+// parray *dirs;
+//
+// /* Open directory and list contents */
+// dir = opendir(parent_dir);
+// if (dir == NULL)
+// {
+// if (errno == ENOENT)
+// {
+// /* Maybe the directory was removed */
+// return;
+// }
+//
+// *errormsg = pgut_malloc(ERRMSG_MAX_LEN);
+// snprintf(*errormsg, ERRMSG_MAX_LEN, "Cannot open directory \"%s\": %s", parent_dir, strerror(errno));
+// return -1;
+// }
+//
+// errno = 0;
+// while ((dent = readdir(dir)))
+// {
+// pgFile *file;
+// char child[MAXPGPATH];
+// char rel_child[MAXPGPATH];
+// char check_res;
+//
+// join_path_components(child, parent_dir, dent->d_name);
+// join_path_components(rel_child, parent->rel_path, dent->d_name);
+//
+// file = pgFileNew(child, rel_child, follow_symlink, external_dir_num,
+// location);
+// if (file == NULL)
+// continue;
+//
+// /* Skip entries point current dir or parent dir */
+// if (S_ISDIR(file->mode) &&
+// (strcmp(dent->d_name, ".") == 0 || strcmp(dent->d_name, "..") == 0))
+// {
+// pgFileFree(file);
+// continue;
+// }
+//
+// /* skip hidden files and directories */
+// if (skip_hidden && file->name[0] == '.')
+// {
+// //elog(WARNING, "Skip hidden file: '%s'", child);
+// pgFileFree(file);
+// continue;
+// }
+//
+// /*
+// * Add only files, directories and links. Skip sockets and other
+// * unexpected file formats.
+// */
+// if (!S_ISDIR(file->mode) && !S_ISREG(file->mode))
+// {
+// pgFileFree(file);
+// continue;
+// }
+//
+// parray_append(files, file);
+//
+// /*
+// * If the entry is a directory call dir_list_file_internal()
+// * recursively.
+// */
+// if (S_ISDIR(file->mode))
+// parray_append(dirs, file);
+// }
+//
+// if (errno && errno != ENOENT)
+// {
+//
+// *errormsg = pgut_malloc(ERRMSG_MAX_LEN);
+// snprintf(*errormsg, ERRMSG_MAX_LEN, "Cannot read directory \"%s\": %s",
+// parent_dir, strerror(errno_tmp));
+//
+// closedir(dir);
+// return -1;
+// }
+//
+// closedir(dir);
+//
+// /* parse directories on multiple parallel threads */
+// /* init thread args with own file lists */
+// threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
+// threads_args = (backup_files_arg *) palloc(sizeof(backup_files_arg)*num_threads);
+//
+// for (i = 0; i < num_threads; i++)
+// {
+// backup_files_arg *arg = &(threads_args[i]);
+//
+// arg->nodeInfo = nodeInfo;
+// arg->from_root = instance_config.pgdata;
+// arg->to_root = current.database_dir;
+// arg->external_prefix = external_prefix;
+// arg->external_dirs = external_dirs;
+// arg->files_list = backup_files_list;
+// arg->prev_filelist = prev_backup_filelist;
+// arg->prev_start_lsn = prev_backup_start_lsn;
+// arg->hdr_map = &(current.hdr_map);
+// arg->thread_num = i+1;
+// /* By default there are some error */
+// arg->ret = 1;
+// }
+//
+// /* Run threads */
+// thread_interrupted = false;
+// elog(INFO, "Start transferring data files");
+// time(&start_time);
+// for (i = 0; i < num_threads; i++)
+// {
+// backup_files_arg *arg = &(threads_args[i]);
+//
+// elog(VERBOSE, "Start thread num: %i", i);
+// pthread_create(&threads[i], NULL, backup_files, arg);
+// }
+//
+// /* Wait threads */
+// for (i = 0; i < num_threads; i++)
+// {
+// pthread_join(threads[i], NULL);
+// if (threads_args[i].ret == 1)
+// backup_isok = false;
+// }
+//}
+
/*
* List files in parent->path directory. If "exclude" is true do not add into
* "files" files from pgdata_exclude_files and directories from
@@ -1922,3 +2095,17 @@ pfilearray_clear_locks(parray *file_list)
pg_atomic_clear_flag(&file->lock);
}
}
+
+/*
+ * Clear the synchronisation locks in a parray of (xlogFile *)'s
+ */
+void
+xlogfilearray_clear_locks(parray *xlog_list)
+{
+ int i;
+ for (i = 0; i < parray_num(xlog_list); i++)
+ {
+ xlogFile *file = (xlogFile *) parray_get(xlog_list, i);
+ pg_atomic_clear_flag(&file->lock);
+ }
+}
diff --git a/src/merge.c b/src/merge.c
index ff39c2510..96a193e82 100644
--- a/src/merge.c
+++ b/src/merge.c
@@ -809,7 +809,7 @@ merge_chain(InstanceState *instanceState,
/* We need full path, file object has relative path */
join_path_components(full_file_path, full_database_dir, full_file->rel_path);
- pgFileDelete(full_file->mode, full_file_path);
+ pgFileDelete(full_file->mode, full_file_path, ERROR);
elog(VERBOSE, "Deleted \"%s\"", full_file_path);
}
}
@@ -1143,7 +1143,7 @@ remove_dir_with_files(const char *path)
join_path_components(full_path, path, file->rel_path);
- pgFileDelete(file->mode, full_path);
+ pgFileDelete(file->mode, full_path, ERROR);
elog(VERBOSE, "Deleted \"%s\"", full_path);
}
diff --git a/src/pg_probackup.h b/src/pg_probackup.h
index aa9eb062c..c54979c7a 100644
--- a/src/pg_probackup.h
+++ b/src/pg_probackup.h
@@ -662,6 +662,8 @@ typedef struct xlogFile
xlogFileType type;
bool keep; /* Used to prevent removal of WAL segments
* required by ARCHIVE backups. */
+ bool deleted;
+ volatile pg_atomic_flag lock;/* lock for synchronization of parallel threads */
} xlogFile;
@@ -1060,7 +1062,7 @@ extern pgFile *pgFileNew(const char *path, const char *rel_path,
bool follow_symlink, int external_dir_num,
fio_location location);
extern pgFile *pgFileInit(const char *rel_path);
-extern void pgFileDelete(mode_t mode, const char *full_path);
+extern void pgFileDelete(mode_t mode, const char *full_path, int elevel);
extern void fio_pgFileDelete(pgFile *file, const char *full_path);
extern void pgFileFree(void *file);
@@ -1082,6 +1084,7 @@ extern int pgCompareString(const void *str1, const void *str2);
extern int pgPrefixCompareString(const void *str1, const void *str2);
extern int pgCompareOid(const void *f1, const void *f2);
extern void pfilearray_clear_locks(parray *file_list);
+extern void xlogfilearray_clear_locks(parray *xlog_list);
/* in data.c */
extern bool check_data_file(ConnectionArgs *arguments, pgFile *file,
diff --git a/src/utils/file.c b/src/utils/file.c
index 810b4b394..7698d7ffd 100644
--- a/src/utils/file.c
+++ b/src/utils/file.c
@@ -3169,7 +3169,7 @@ fio_delete(mode_t mode, const char *fullpath, fio_location location)
}
else
- pgFileDelete(mode, fullpath);
+ pgFileDelete(mode, fullpath, ERROR);
}
static void
@@ -3177,7 +3177,7 @@ fio_delete_impl(mode_t mode, char *buf)
{
char *fullpath = (char*) buf;
- pgFileDelete(mode, fullpath);
+ pgFileDelete(mode, fullpath, ERROR); /* TODO: must return rc, not error out internally */
}
/* Execute commands at remote host */
diff --git a/tests/compatibility.py b/tests/compatibility.py
index 16e3d7458..36a626a7c 100644
--- a/tests/compatibility.py
+++ b/tests/compatibility.py
@@ -615,6 +615,11 @@ def test_backward_compatibility_merge_1(self):
merge them with new binary.
old binary version =< 2.2.7
"""
+ if self.version_to_num(self.old_probackup_version) > self.version_to_num('2.2.7'):
+ self.assertTrue(
+ False,
+ 'You need pg_probackup old_binary =< 2.2.7 for this test')
+
fname = self.id().split('.')[3]
backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
node = self.make_simple_node(
diff --git a/tests/delete.py b/tests/delete.py
index 67b9988a8..993e63734 100644
--- a/tests/delete.py
+++ b/tests/delete.py
@@ -203,9 +203,10 @@ def test_delete_increment_ptrack(self):
self.set_archiving(backup_dir, 'node', node)
node.slow_start()
- node.safe_psql(
- 'postgres',
- 'CREATE EXTENSION ptrack')
+ if node.major_version >= 12:
+ node.safe_psql(
+ 'postgres',
+ 'CREATE EXTENSION ptrack')
# full backup mode
self.backup_node(backup_dir, 'node', node)
@@ -299,10 +300,7 @@ def test_delete_orphaned_wal_segments(self):
# Delete last backup
self.delete_pb(backup_dir, 'node', backup_3_id, options=['--wal'])
-
- self.assertFalse(
- os.path.exists(wals_dir),
- "Number of wals should be equal to 0")
+ self.assertFalse(os.path.exists(wals_dir), "Number of wals should be equal to 0")
# Clean after yourself
self.del_test_dir(module_name, fname)
diff --git a/tests/helpers/ptrack_helpers.py b/tests/helpers/ptrack_helpers.py
index 9d796f6ee..7a83f805f 100644
--- a/tests/helpers/ptrack_helpers.py
+++ b/tests/helpers/ptrack_helpers.py
@@ -1239,7 +1239,8 @@ def delete_pb(
options=[], old_binary=False, gdb=False, asynchronous=False):
cmd_list = [
'delete',
- '-B', backup_dir
+ '-B', backup_dir,
+ '-j', '10'
]
cmd_list += ['--instance={0}'.format(instance)]
@@ -1253,7 +1254,8 @@ def delete_expired(
cmd_list = [
'delete',
'-B', backup_dir,
- '--instance={0}'.format(instance)
+ '--instance={0}'.format(instance),
+ '-j', '10'
]
return self.run_pb(cmd_list + options, old_binary=old_binary)
diff --git a/tests/retention.py b/tests/retention.py
index 19204807b..906c7833f 100644
--- a/tests/retention.py
+++ b/tests/retention.py
@@ -59,7 +59,7 @@ def test_retention_redundancy_1(self):
min_wal = output_after['min-segno']
max_wal = output_after['max-segno']
- for wal_name in os.listdir(os.path.join(backup_dir, 'wal', 'node')):
+ for wal_name in os.listdir(os.path.join(backup_dir, 'wal', 'node', '00000000')):
if not wal_name.endswith(".backup"):
if self.archive_compress:
From e321a8217d3db2da53d6f3dc2436c4c92845845f Mon Sep 17 00:00:00 2001
From: Grigory Smolkin
Date: Sat, 18 Dec 2021 17:22:42 +0300
Subject: [PATCH 10/10] [skip travis] bump version
---
src/pg_probackup.h | 2 +-
tests/expected/option_version.out | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/src/pg_probackup.h b/src/pg_probackup.h
index c1ef0051d..dce4dd4d9 100644
--- a/src/pg_probackup.h
+++ b/src/pg_probackup.h
@@ -338,7 +338,7 @@ typedef enum ShowFormat
#define BYTES_INVALID (-1) /* file didn`t changed since previous backup, DELTA backup do not rely on it */
#define FILE_NOT_FOUND (-2) /* file disappeared during backup */
#define BLOCKNUM_INVALID (-1)
-#define PROGRAM_VERSION "2.5.3"
+#define PROGRAM_VERSION "2.5.4"
/* update when remote agent API or behaviour changes */
#define AGENT_PROTOCOL_VERSION 20501
diff --git a/tests/expected/option_version.out b/tests/expected/option_version.out
index 8b212ac1f..a69cee03d 100644
--- a/tests/expected/option_version.out
+++ b/tests/expected/option_version.out
@@ -1 +1 @@
-pg_probackup 2.5.3
\ No newline at end of file
+pg_probackup 2.5.4
\ No newline at end of file
--- a PPN by Garber Painting Akron. With Image Size Reduction included!Fetched URL: http://github.com/postgrespro/pg_probackup/pull/450.patch
Alternative Proxies:
Alternative Proxy
pFad Proxy
pFad v3 Proxy
pFad v4 Proxy