Skip to content

Commit

Permalink
Add WAL synchronization wait before replica check
Browse files Browse the repository at this point in the history
This commit introduces a new feature to ensure WAL (Write-Ahead Log) synchronization
between the primary and standby before performing the replica check. Key changes include:

1.Add a new method wait_for_wal_sync() to poll and wait for WAL sync
2.Implement WAL sync check using pg_current_wal_lsn() and pg_stat_replication
Call wait_for_wal_sync() before running the replica check

This enhancement improves the reliability of the replica check by ensuring
that all changes have been replicated before comparison, reducing false
positives due to replication lag.
  • Loading branch information
yjhjstz authored and my-ship-it committed Oct 10, 2024
1 parent f26f4ab commit 6687050
Showing 1 changed file with 44 additions and 0 deletions.
44 changes: 44 additions & 0 deletions gpcontrib/gp_replica_check/gp_replica_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import subprocess
import threading
import pipes # for shell-quoting, pipes.quote()
import time

class ReplicaCheck(threading.Thread):
def __init__(self, segrow, datname, relation_types):
Expand All @@ -58,7 +59,50 @@ def __str__(self):
Mirror Data Directory Location: %s' % (self.getName(), self.host, self.port, self.datname,
self.ploc, self.mloc)

def wait_for_wal_sync(self):
cmd = "PGOPTIONS='-c gp_role=utility' psql -h %s -p %s -d %s -t -A -c \"SELECT pg_current_wal_lsn() AS master_wal, replay_lsn AS standby_wal, pg_current_wal_lsn() = replay_lsn AS are_equal FROM pg_stat_replication;\"" % (self.host, self.port, pipes.quote(self.datname))
while True:
try:
output = subprocess.check_output(cmd, shell=True).decode().strip().split("\n")
print(f"Debug - Full output: {output}") # Debug print

if not output:
print("No output received from psql command.")
time.sleep(5)
continue

# With -t and -A options, we should get only one line of data
data_row = output[0].split("|")
if len(data_row) != 3:
print(f"Unexpected data row format. Data row: {data_row}")
time.sleep(5)
continue

master_wal = data_row[0].strip()
standby_wal = data_row[1].strip()
are_equal = data_row[2].strip().lower() == "t"

print(f"Debug - Parsed values: master_wal={master_wal}, standby_wal={standby_wal}, are_equal={are_equal}") # Debug print

if are_equal:
print("WAL sync achieved.")
break
else:
print(f"Waiting for WAL sync. Current status: master={master_wal}, standby={standby_wal}")
except subprocess.CalledProcessError as e:
with self.lock:
print(f"Error executing command: {e.cmd}")
print(f"Return code: {e.returncode}")
print(f"Output: {e.output}")
except Exception as e:
print(f"Unexpected error in wait_for_wal_sync: {str(e)}")

# Add a small delay before the next attempt
time.sleep(5)


def run(self):
self.wait_for_wal_sync();
cmd = '''PGOPTIONS='-c gp_role=utility' psql -h %s -p %s -c "select * from gp_replica_check('%s', '%s', '%s')" %s''' % (self.host, self.port,
self.ploc, self.mloc,
self.relation_types,
Expand Down

0 comments on commit 6687050

Please sign in to comment.