Skip to content

Commit

Permalink
Merge pull request #552 from aiven/alex-fix-hardcoded-wal-type
Browse files Browse the repository at this point in the history
Fix handling of compressed timeline files on startup [BF-1391]

v
  • Loading branch information
kmichel-aiven authored Aug 19, 2022
2 parents f0d3221 + 7e713c6 commit 78ce24f
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 14 deletions.
6 changes: 4 additions & 2 deletions pghoard/pghoard.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,11 +601,13 @@ def startup_walk_for_missed_files(self):
with open(metadata_path, "r") as fp:
metadata = json.load(fp)

file_type = FileType.Wal if is_xlog else FileType.Timeline

transfer_event = UploadEvent(
file_type=FileType.Wal,
file_type=file_type,
backup_site_name=site,
file_size=os.path.getsize(full_path),
file_path=FileTypePrefixes[FileType.Wal] / filename,
file_path=FileTypePrefixes[file_type] / filename,
source_data=Path(full_path),
callback_queue=None,
metadata=metadata
Expand Down
33 changes: 21 additions & 12 deletions test/test_pghoard.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,27 +529,34 @@ def test_startup_walk_for_missed_uncompressed_files(self):
assert self.pghoard.compression_queue.qsize() == 2
assert self.pghoard.transfer_queue.qsize() == 0

def test_startup_walk_for_missed_uncompressed_files_timeline(self):
@pytest.mark.parametrize(
"file_type, file_name", [(FileType.Wal, "000000010000000000000004"), (FileType.Timeline, "00000002.history")]
)
def test_startup_walk_for_missed_uncompressed_file_type(self, file_type: FileType, file_name: str):
compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site)
uncompressed_wal_path = compressed_wal_path + "_incoming"
with open(os.path.join(uncompressed_wal_path, "00000002.history"), "wb") as fp:
with open(os.path.join(uncompressed_wal_path, file_name), "wb") as fp:
fp.write(b"foo")
self.pghoard.startup_walk_for_missed_files()
assert self.pghoard.compression_queue.qsize() == 1
assert self.pghoard.transfer_queue.qsize() == 0
compress_event = self.pghoard.compression_queue.get(timeout=1.0)
assert compress_event.file_type == FileType.Timeline
assert compress_event.file_type == file_type

def test_startup_walk_for_missed_uncompressed_files_wal(self):
@pytest.mark.parametrize(
"file_type, file_name", [(FileType.Wal, "000000010000000000000005"), (FileType.Timeline, "00000003.history")]
)
def test_startup_walk_for_missed_compressed_file_type(self, file_type: FileType, file_name: str):
compressed_wal_path, _ = self.pghoard.create_backup_site_paths(self.test_site)
uncompressed_wal_path = compressed_wal_path + "_incoming"
with open(os.path.join(uncompressed_wal_path, "000000010000000000000004"), "wb") as fp:
with open(os.path.join(compressed_wal_path, file_name), "wb") as fp:
fp.write(b"foo")
with open(os.path.join(compressed_wal_path, f"{file_name}.metadata"), "wb") as fp:
fp.write(b"{}")
self.pghoard.startup_walk_for_missed_files()
assert self.pghoard.compression_queue.qsize() == 1
assert self.pghoard.transfer_queue.qsize() == 0
compress_event = self.pghoard.compression_queue.get(timeout=1.0)
assert compress_event.file_type == FileType.Wal
assert self.pghoard.compression_queue.qsize() == 0
assert self.pghoard.transfer_queue.qsize() == 1
upload_event = self.pghoard.transfer_queue.get(timeout=1.0)
assert upload_event.file_type == file_type


class TestPGHoardWithPG:
Expand Down Expand Up @@ -597,8 +604,6 @@ def test_pause_on_disk_full(self, db, pghoard_separate_volume, caplog):
# MiB so if logic for automatically suspending pg_receive(xlog|wal) wasn't working the volume
# would certainly fill up and the files couldn't be processed. Now this should work fine.
for _ in range(16):
# Note: do not combine two function call in one select, PG executes it differently and
# sometimes looks like it generates less WAL files than we wanted
switch_wal(conn)
conn.close()

Expand All @@ -625,6 +630,10 @@ def test_surviving_pg_receivewal_hickup(self, db, pghoard):
if pghoard.receivexlogs[pghoard.test_site].is_alive():
pghoard.receivexlogs[pghoard.test_site].join()
del pghoard.receivexlogs[pghoard.test_site]
# stopping the thread is not enough, it's possible that killed receiver will leave incomplete partial files
# around, pghoard is capable of cleaning those up but needs to be restarted, for the test it should be OK
# just to call startup_walk_for_missed_files, so it takes care of cleaning up
pghoard.startup_walk_for_missed_files()

n_xlogs = pghoard.transfer_agent_state[pghoard.test_site]["upload"]["xlog"]["xlogs_since_basebackup"]

Expand Down
7 changes: 7 additions & 0 deletions test/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,16 @@ def switch_wal(connection):
cur = connection.cursor()
# Force allocating a XID, otherwise if there was no activity we will
# stay on the same WAL
# Note: do not combine two function call in one select, PG executes it differently and
# sometimes looks like it generates less WAL files than we wanted
cur.execute("SELECT txid_current()")
if connection.server_version >= 100000:
cur.execute("SELECT pg_switch_wal()")
else:
cur.execute("SELECT pg_switch_xlog()")
# This should fix flaky tests, which expect a specific number of WAL files which never arrive.
# Quite often the last WAL would not be finalized by walreceiver unless there is some extra activity after
# switching, the bug should be fixed in PG 15
# https://github.com/postgres/postgres/commit/596ba75cb11173a528c6b6ec0142a282e42b69ec
cur.execute("SELECT txid_current()")
cur.close()

0 comments on commit 78ce24f

Please sign in to comment.