From d8a089438c88104af1527c2b03d0fd2601291387 Mon Sep 17 00:00:00 2001 From: Alexander Olekhnovich Date: Thu, 18 Aug 2022 10:52:15 +0200 Subject: [PATCH 1/4] Fix handling of compressed timeline files on startup Removed hardcoded WAL file type from upload event. --- pghoard/pghoard.py | 6 ++++-- test/test_pghoard.py | 27 +++++++++++++++++---------- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/pghoard/pghoard.py b/pghoard/pghoard.py index 3e5b9c75..13c439d7 100644 --- a/pghoard/pghoard.py +++ b/pghoard/pghoard.py @@ -601,11 +601,13 @@ def startup_walk_for_missed_files(self): with open(metadata_path, "r") as fp: metadata = json.load(fp) + file_type = FileType.Wal if is_xlog else FileType.Timeline + transfer_event = UploadEvent( - file_type=FileType.Wal, + file_type=file_type, backup_site_name=site, file_size=os.path.getsize(full_path), - file_path=FileTypePrefixes[FileType.Wal] / filename, + file_path=FileTypePrefixes[file_type] / filename, source_data=Path(full_path), callback_queue=None, metadata=metadata diff --git a/test/test_pghoard.py b/test/test_pghoard.py index 8cc20bc5..69788ae8 100644 --- a/test/test_pghoard.py +++ b/test/test_pghoard.py @@ -529,27 +529,34 @@ def test_startup_walk_for_missed_uncompressed_files(self): assert self.pghoard.compression_queue.qsize() == 2 assert self.pghoard.transfer_queue.qsize() == 0 - def test_startup_walk_for_missed_uncompressed_files_timeline(self): + @pytest.mark.parametrize( + "file_type, file_name", [(FileType.Wal, "000000010000000000000004"), (FileType.Timeline, "00000002.history")] + ) + def test_startup_walk_for_missed_uncompressed_file_type(self, file_type: FileType, file_name: str): compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site) uncompressed_wal_path = compressed_wal_path + "_incoming" - with open(os.path.join(uncompressed_wal_path, "00000002.history"), "wb") as fp: + with open(os.path.join(uncompressed_wal_path, file_name), "wb") as fp: fp.write(b"foo") self.pghoard.startup_walk_for_missed_files() assert self.pghoard.compression_queue.qsize() == 1 assert self.pghoard.transfer_queue.qsize() == 0 compress_event = self.pghoard.compression_queue.get(timeout=1.0) - assert compress_event.file_type == FileType.Timeline + assert compress_event.file_type == file_type - def test_startup_walk_for_missed_uncompressed_files_wal(self): + @pytest.mark.parametrize( + "file_type, file_name", [(FileType.Wal, "000000010000000000000005"), (FileType.Timeline, "00000003.history")] + ) + def test_startup_walk_for_missed_compressed_file_type(self, file_type: FileType, file_name: str): compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site) - uncompressed_wal_path = compressed_wal_path + "_incoming" - with open(os.path.join(uncompressed_wal_path, "000000010000000000000004"), "wb") as fp: + with open(os.path.join(compressed_wal_path, file_name), "wb") as fp: fp.write(b"foo") + with open(os.path.join(compressed_wal_path, f"{file_name}.metadata"), "wb") as fp: + fp.write(b"{}") self.pghoard.startup_walk_for_missed_files() - assert self.pghoard.compression_queue.qsize() == 1 - assert self.pghoard.transfer_queue.qsize() == 0 - compress_event = self.pghoard.compression_queue.get(timeout=1.0) - assert compress_event.file_type == FileType.Wal + assert self.pghoard.compression_queue.qsize() == 0 + assert self.pghoard.transfer_queue.qsize() == 1 + upload_event = self.pghoard.transfer_queue.get(timeout=1.0) + assert upload_event.file_type == file_type class TestPGHoardWithPG: From 2610a301579f1960fa62146b770bb675610b47ff Mon Sep 17 00:00:00 2001 From: Alexander Olekhnovich Date: Thu, 18 Aug 2022 16:42:59 +0200 Subject: [PATCH 2/4] Fix for receivewal hickup test When test stops receivexlogs thread, the subprocess of receivexlog after being killed might leave incomplete .partial files, which make the restarted receivexlog stop working. Usually those incomplete files are truncated by pghoard after restart, so this workaround should just make test less flaky. Ideally the receivexlogs thread should do that cleanup in case of this kind of problems. --- test/test_pghoard.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/test_pghoard.py b/test/test_pghoard.py index 69788ae8..968aa6d8 100644 --- a/test/test_pghoard.py +++ b/test/test_pghoard.py @@ -632,6 +632,10 @@ def test_surviving_pg_receivewal_hickup(self, db, pghoard): if pghoard.receivexlogs[pghoard.test_site].is_alive(): pghoard.receivexlogs[pghoard.test_site].join() del pghoard.receivexlogs[pghoard.test_site] + # stopping the thread is not enough, it's possible that killed receiver will leave incomplete partial files + # around, pghoard is capable of cleaning those up but needs to be restarted, for the test it should be OK + # just to call startup_walk_for_missed_files, so it takes care of cleaning up + pghoard.startup_walk_for_missed_files() n_xlogs = pghoard.transfer_agent_state[pghoard.test_site]["upload"]["xlog"]["xlogs_since_basebackup"] From 55a4886d8e88252368d98257f1e297e7c6864a31 Mon Sep 17 00:00:00 2001 From: Alexander Olekhnovich Date: Thu, 18 Aug 2022 17:28:07 +0200 Subject: [PATCH 3/4] Move the comment to a proper place --- test/test_pghoard.py | 2 -- test/util.py | 2 ++ 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_pghoard.py b/test/test_pghoard.py index 968aa6d8..3bd5a101 100644 --- a/test/test_pghoard.py +++ b/test/test_pghoard.py @@ -604,8 +604,6 @@ def test_pause_on_disk_full(self, db, pghoard_separate_volume, caplog): # MiB so if logic for automatically suspending pg_receive(xlog|wal) wasn't working the volume # would certainly fill up and the files couldn't be processed. Now this should work fine. for _ in range(16): - # Note: do not combine two function call in one select, PG executes it differently and - # sometimes looks like it generates less WAL files than we wanted switch_wal(conn) conn.close() diff --git a/test/util.py b/test/util.py index 39939647..d3f0df2e 100644 --- a/test/util.py +++ b/test/util.py @@ -24,6 +24,8 @@ def switch_wal(connection): cur = connection.cursor() # Force allocating a XID, otherwise if there was no activity we will # stay on the same WAL + # Note: do not combine two function call in one select, PG executes it differently and + # sometimes looks like it generates less WAL files than we wanted cur.execute("SELECT txid_current()") if connection.server_version >= 100000: cur.execute("SELECT pg_switch_wal()") From 7e713c6c0b2ab89cdafb4b428ec248fa7556b15b Mon Sep 17 00:00:00 2001 From: Alexander Olekhnovich Date: Fri, 19 Aug 2022 09:39:05 +0200 Subject: [PATCH 4/4] Fix for flaky test_pause_on_disk_full --- test/util.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/test/util.py b/test/util.py index d3f0df2e..7c49e3a9 100644 --- a/test/util.py +++ b/test/util.py @@ -31,4 +31,9 @@ def switch_wal(connection): cur.execute("SELECT pg_switch_wal()") else: cur.execute("SELECT pg_switch_xlog()") + # This should fix flaky tests, which expect a specific number of WAL files which never arrive. + # Quite often the last WAL would not be finalized by walreceiver unless there is some extra activity after + # switching, the bug should be fixed in PG 15 + # https://github.com/postgres/postgres/commit/596ba75cb11173a528c6b6ec0142a282e42b69ec + cur.execute("SELECT txid_current()") cur.close()