Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
handle error if state file not available (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
louis-pie authored Oct 18, 2019
1 parent ca03971 commit 69e7395
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
long_description = f.read()

setup(name='pipelinewise-tap-postgres',
version='1.1.8',
version='1.1.9',
description='Singer.io tap for extracting data from PostgreSQL - PipelineWise compatible',
long_description=long_description,
long_description_content_type='text/markdown',
Expand Down
27 changes: 18 additions & 9 deletions tap_postgres/sync_strategies/logical_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,8 @@ def locate_replication_slot(conn_info):


def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
lsn_comitted = min([get_bookmark(state, s['tap_stream_id'], 'lsn') for s in logical_streams])
state_comitted = state
lsn_comitted = min([get_bookmark(state_comitted, s['tap_stream_id'], 'lsn') for s in logical_streams])
start_lsn = lsn_comitted
lsn_to_flush = None
time_extracted = utils.now()
Expand Down Expand Up @@ -451,7 +452,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
lsn_received_timestamp = datetime.datetime.utcnow()
lsn_processed_count = lsn_processed_count + 1
if lsn_processed_count >= UPDATE_BOOKMARK_PERIOD:
LOGGER.info("{} : Updating bookmarks for all streams to lsn = {} ({})".format(datetime.datetime.utcnow(), lsn_last_processed, int_to_lsn(lsn_last_processed)))
# LOGGER.info("{} : Updating bookmarks for all streams to lsn = {} ({})".format(datetime.datetime.utcnow(), lsn_last_processed, int_to_lsn(lsn_last_processed)))
for s in logical_streams:
state = singer.write_bookmark(state, s['tap_stream_id'], 'lsn', lsn_last_processed)
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
Expand All @@ -468,13 +469,21 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
datetime.datetime.utcnow(), int_to_lsn(lsn_last_processed), lsn_received_timestamp))
cur.send_feedback()
else:
# Read lsn_comitted currently captured in state file on disk
lsn_comitted = min([get_bookmark(utils.load_json(state_file), s['tap_stream_id'], 'lsn') for s in logical_streams])
lsn_to_flush = lsn_comitted
if lsn_currently_processing < lsn_to_flush: lsn_to_flush = lsn_currently_processing
LOGGER.info("{} : Confirming write up to {}, flush to {} (last message received was {} at {})".format(
datetime.datetime.utcnow(), int_to_lsn(lsn_to_flush), int_to_lsn(lsn_to_flush), int_to_lsn(lsn_last_processed), lsn_received_timestamp))
cur.send_feedback(write_lsn=lsn_to_flush, flush_lsn=lsn_to_flush, reply=True)
# Read lsn_comitted from state.json and feeback to source server
try:
state_comitted_file = open(state_file)
except:
LOGGER.warning("{} : Unable to open {}".format(datetime.datetime.utcnow(), state_file))
else:
state_comitted = json.load(state_comitted_file)
finally:
lsn_comitted = min([get_bookmark(state_comitted, s['tap_stream_id'], 'lsn') for s in logical_streams])
lsn_to_flush = lsn_comitted
if lsn_currently_processing < lsn_to_flush: lsn_to_flush = lsn_currently_processing
LOGGER.info("{} : Confirming write up to {}, flush to {} (last message received was {} at {})".format(
datetime.datetime.utcnow(), int_to_lsn(lsn_to_flush), int_to_lsn(lsn_to_flush), int_to_lsn(lsn_last_processed), lsn_received_timestamp))
cur.send_feedback(write_lsn=lsn_to_flush, flush_lsn=lsn_to_flush, reply=True)

poll_timestamp = datetime.datetime.utcnow()

# Close replication connection and cursor
Expand Down

0 comments on commit 69e7395

Please sign in to comment.