Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
remove LOG_BASED stream bookmarks from state if it has been de-select…
Browse files Browse the repository at this point in the history
…ed (#28)
  • Loading branch information
louis-pie authored Oct 11, 2019
1 parent ddc46ce commit 92bcb15
Showing 1 changed file with 20 additions and 3 deletions.
23 changes: 20 additions & 3 deletions tap_postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,23 @@ def sync_logical_streams(conn_config, logical_streams, state, end_lsn, state_fil
LOGGER.info("Pure Logical Replication upto lsn %s for (%s)", end_lsn, list(map(lambda s: s['tap_stream_id'], logical_streams)))
logical_streams = list(map(lambda s: logical_replication.add_automatic_properties(s, conn_config), logical_streams))

# Remove LOG_BASED stream bookmarks from state if it has been de-selected
# This is to avoid sending very old starting and flushing positions to source
selected_streams = set()
for s in logical_streams:
selected_streams.add("{}".format(s['tap_stream_id']))

new_state = dict(currently_syncing = state['currently_syncing'], bookmarks = {})

for stream, bookmark in state['bookmarks'].items():
if bookmark == {}:
new_state['bookmarks'][stream] = bookmark
elif (bookmark['last_replication_method'] != 'LOG_BASED'):
new_state['bookmarks'][stream] = bookmark
elif stream in selected_streams:
new_state['bookmarks'][stream] = bookmark
state = new_state

state = logical_replication.sync_tables(conn_config, logical_streams, state, end_lsn, state_file)

return state
Expand Down Expand Up @@ -686,8 +703,8 @@ def parse_args(required_config_keys):
Parses the command-line arguments mentioned in the SPEC and the
BEST_PRACTICES documents:
-c,--config Config file
-s,--state State file
-c,--config config file
-s,--state state file
-d,--discover Run in discover mode
-p,--properties Properties file: DEPRECATED, please use --catalog instead
--catalog Catalog file
Expand All @@ -705,7 +722,7 @@ def parse_args(required_config_keys):

parser.add_argument(
'-s', '--state',
help='State file')
help='state file')

parser.add_argument(
'-p', '--properties',
Expand Down

0 comments on commit 92bcb15

Please sign in to comment.