From 69e73954f0f116ef51353a919e9184c4f5d83b25 Mon Sep 17 00:00:00 2001
From: Louis Pieterse <45560107+louis-pie@users.noreply.github.com>
Date: Fri, 18 Oct 2019 12:25:04 +0100
Subject: [PATCH] handle error if state file not available (#30)

---
 setup.py                                    |  2 +-
 .../sync_strategies/logical_replication.py  | 27 ++++++++++++-------
 2 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/setup.py b/setup.py
index a2269f61..1e2cab6d 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@
     long_description = f.read()
 
 setup(name='pipelinewise-tap-postgres',
-      version='1.1.8',
+      version='1.1.9',
       description='Singer.io tap for extracting data from PostgreSQL - PipelineWise compatible',
       long_description=long_description,
       long_description_content_type='text/markdown',
diff --git a/tap_postgres/sync_strategies/logical_replication.py b/tap_postgres/sync_strategies/logical_replication.py
index c0333301..b8bbf568 100644
--- a/tap_postgres/sync_strategies/logical_replication.py
+++ b/tap_postgres/sync_strategies/logical_replication.py
@@ -374,7 +374,8 @@ def locate_replication_slot(conn_info):
 
 
 def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
-    lsn_comitted = min([get_bookmark(state, s['tap_stream_id'], 'lsn') for s in logical_streams])
+    state_comitted = state
+    lsn_comitted = min([get_bookmark(state_comitted, s['tap_stream_id'], 'lsn') for s in logical_streams])
     start_lsn = lsn_comitted
     lsn_to_flush = None
     time_extracted = utils.now()
@@ -451,7 +452,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
                 lsn_received_timestamp = datetime.datetime.utcnow()
                 lsn_processed_count = lsn_processed_count + 1
                 if lsn_processed_count >= UPDATE_BOOKMARK_PERIOD:
-                    LOGGER.info("{} : Updating bookmarks for all streams to lsn = {} ({})".format(datetime.datetime.utcnow(), lsn_last_processed, int_to_lsn(lsn_last_processed)))
+                    # LOGGER.info("{} : Updating bookmarks for all streams to lsn = {} ({})".format(datetime.datetime.utcnow(), lsn_last_processed, int_to_lsn(lsn_last_processed)))
                     for s in logical_streams:
                         state = singer.write_bookmark(state, s['tap_stream_id'], 'lsn', lsn_last_processed)
                     singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
@@ -468,13 +469,21 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
                         datetime.datetime.utcnow(), int_to_lsn(lsn_last_processed), lsn_received_timestamp))
                     cur.send_feedback()
                 else:
-                    # Read lsn_comitted currently captured in state file on disk
-                    lsn_comitted = min([get_bookmark(utils.load_json(state_file), s['tap_stream_id'], 'lsn') for s in logical_streams])
-                    lsn_to_flush = lsn_comitted
-                    if lsn_currently_processing < lsn_to_flush: lsn_to_flush = lsn_currently_processing
-                    LOGGER.info("{} : Confirming write up to {}, flush to {} (last message received was {} at {})".format(
-                        datetime.datetime.utcnow(), int_to_lsn(lsn_to_flush), int_to_lsn(lsn_to_flush), int_to_lsn(lsn_last_processed), lsn_received_timestamp))
-                    cur.send_feedback(write_lsn=lsn_to_flush, flush_lsn=lsn_to_flush, reply=True)
+                    # Read lsn_comitted from state.json and feed back to source server
+                    try:
+                        state_comitted_file = open(state_file)
+                    except:
+                        LOGGER.warning("{} : Unable to open {}".format(datetime.datetime.utcnow(), state_file))
+                    else:
+                        state_comitted = json.load(state_comitted_file)
+                    finally:
+                        lsn_comitted = min([get_bookmark(state_comitted, s['tap_stream_id'], 'lsn') for s in logical_streams])
+                        lsn_to_flush = lsn_comitted
+                        if lsn_currently_processing < lsn_to_flush: lsn_to_flush = lsn_currently_processing
+                        LOGGER.info("{} : Confirming write up to {}, flush to {} (last message received was {} at {})".format(
+                            datetime.datetime.utcnow(), int_to_lsn(lsn_to_flush), int_to_lsn(lsn_to_flush), int_to_lsn(lsn_last_processed), lsn_received_timestamp))
+                        cur.send_feedback(write_lsn=lsn_to_flush, flush_lsn=lsn_to_flush, reply=True)
+
             poll_timestamp = datetime.datetime.utcnow()
 
     # Close replication connection and cursor