From 92bcb1566b7529cf6463cbc4c2544e89ce69e783 Mon Sep 17 00:00:00 2001 From: Louis Pieterse <45560107+louis-pie@users.noreply.github.com> Date: Fri, 11 Oct 2019 15:15:56 +0100 Subject: [PATCH] remove LOG_BASED stream bookmarks from state if it has been de-selected (#28) --- tap_postgres/__init__.py | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index 24524e37..1d0b2a13 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -586,6 +586,23 @@ def sync_logical_streams(conn_config, logical_streams, state, end_lsn, state_fil LOGGER.info("Pure Logical Replication upto lsn %s for (%s)", end_lsn, list(map(lambda s: s['tap_stream_id'], logical_streams))) logical_streams = list(map(lambda s: logical_replication.add_automatic_properties(s, conn_config), logical_streams)) + # Remove LOG_BASED stream bookmarks from state if it has been de-selected + # This is to avoid sending very old starting and flushing positions to source + selected_streams = set() + for s in logical_streams: + selected_streams.add("{}".format(s['tap_stream_id'])) + + new_state = dict(currently_syncing = state['currently_syncing'], bookmarks = {}) + + for stream, bookmark in state['bookmarks'].items(): + if bookmark == {}: + new_state['bookmarks'][stream] = bookmark + elif (bookmark['last_replication_method'] != 'LOG_BASED'): + new_state['bookmarks'][stream] = bookmark + elif stream in selected_streams: + new_state['bookmarks'][stream] = bookmark + state = new_state + state = logical_replication.sync_tables(conn_config, logical_streams, state, end_lsn, state_file) return state @@ -686,8 +703,8 @@ def parse_args(required_config_keys): Parses the command-line arguments mentioned in the SPEC and the BEST_PRACTICES documents: - -c,--config Config file - -s,--state State file + -c,--config config file + -s,--state state file -d,--discover Run in discover mode -p,--properties Properties file: DEPRECATED, please use --catalog instead --catalog Catalog file @@ -705,7 +722,7 @@ def parse_args(required_config_keys): parser.add_argument( '-s', '--state', - help='State file') + help='state file') parser.add_argument( '-p', '--properties',