Skip to content

Commit

Permalink
code changes to run historical sync when event update date is older t…
Browse files Browse the repository at this point in the history
…han 30 days
  • Loading branch information
rdeshmukh15 committed May 6, 2023
1 parent 3538fab commit d0e038b
Showing 1 changed file with 29 additions and 4 deletions.
33 changes: 29 additions & 4 deletions tap_stripe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -988,11 +988,14 @@ def sync_event_updates(stream_name, is_sub_stream):
bookmark_value = parent_bookmark_value

# Start sync for event updates record from the last 30 days before if bookmark/start_date is older than 30 days.
max_created = int(max(bookmark_value, (epoch_to_dt(sync_start_time) - timedelta(days=30)).timestamp()))
if max_created != bookmark_value:
LOGGER.warning("Provided start_date or current bookmark for event updates is older than 30 days.")
LOGGER.warning("The Stripe Event API returns data for the last 30 days only. So, syncing event data from 30 days only.")
max_event_start_date = (epoch_to_dt(sync_start_time) - timedelta(days=30)).timestamp()
max_created = int(max(bookmark_value, max_event_start_date))
start_date = int(utils.strptime_to_utc(Context.config["start_date"]).timestamp())

if max_created != bookmark_value and bookmark_value != start_date:
reset_bookmark_for_event_updates(is_sub_stream, stream_name, sub_stream_name, start_date, max_event_start_date)
raise Exception("Provided current bookmark date for event updates is older than 30 days. Hence, resetting the bookmark date of respective parent/child stream to start date.")

date_window_start = max_created
date_window_end = max_created + events_update_date_window_size
stop_paging = False
Expand Down Expand Up @@ -1114,6 +1117,28 @@ def write_bookmark_for_event_updates(is_sub_stream, stream_name, sub_stream_name

singer.write_state(Context.state)

def reset_bookmark_for_event_updates(is_sub_stream, stream_name, sub_stream_name, start_date, max_event_start_date):
"""
Reset bookmark for parent and child streams to start date and clear the bookmark date for event updates.
"""
# Write the parent bookmark value only when the parent is selected
if not is_sub_stream:
singer.write_bookmark(Context.state,
stream_name,
STREAM_REPLICATION_KEY.get(stream_name),
start_date)
Context.state.get("bookmarks").pop(stream_name + '_events', None)

# Write the child bookmark value only when the child is selected
if sub_stream_name and Context.is_selected(sub_stream_name):
singer.write_bookmark(Context.state,
sub_stream_name,
STREAM_REPLICATION_KEY.get(sub_stream_name),
start_date)
Context.state.get("bookmarks").pop(sub_stream_name + '_events', None)

singer.write_state(Context.state)


def sync():
"""
Expand Down

0 comments on commit d0e038b

Please sign in to comment.