Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-19384 optimize logic for parent child relationship to be independent #141

Merged
merged 20 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 104 additions & 82 deletions tap_stripe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@
'transfers': {'type': 'transfer.*', 'object': ['transfer']},
'disputes': {'type': 'charge.dispute.*', 'object': ['dispute']},
'products': {'type': 'product.*', 'object': ['product']},
'invoice_line_items': {'type': 'invoice.*', 'object': ['line_item']},
'subscription_items': {'type': 'customer.subscription.*', 'object': ['subscription_item']},
'payout_transactions': {'type': 'payout.*', 'object': ['transfer', 'payout']},
# Cannot find evidence of these streams having events associated:
# subscription_items - appears on subscriptions events
# balance_transactions - seems to be immutable
# payouts - these are called transfers with an event type of payout.*
}

# Some fields are not available by default with latest API version so
Expand Down Expand Up @@ -495,6 +496,7 @@ def evaluate_start_time_based_on_lookback(stream_name, replication_key, lookback
lookback_evaluated_time = bookmark - lookback_window
return lookback_evaluated_time
return start_date

def get_bookmark_for_sub_stream(stream_name):
"""
Get the bookmark for the child-stream based on the parent's replication key.
Expand Down Expand Up @@ -547,25 +549,52 @@ def convert_dict_to_stripe_object(record):

# pylint: disable=too-many-locals
# pylint: disable=too-many-statements
def sync_stream(stream_name):
def sync_stream(stream_name, is_sub_stream=False):
karanpanchal-crest marked this conversation as resolved.
Show resolved Hide resolved
"""
Sync each stream, looking for newly created records. Updates are captured by events stream.

:param
stream_name - Name of the stream
is_sub_stream - Check whether the function is called via the parent stream (only parent/both are selected)
dbshah1212 marked this conversation as resolved.
Show resolved Hide resolved
or when called through only child stream i.e. when parent is not selected.
"""
LOGGER.info("Started syncing stream %s", stream_name)

stream_metadata = metadata.to_map(Context.get_catalog_entry(stream_name)['metadata'])
stream_field_whitelist = json.loads(Context.config.get('whitelist_map', '{}')).get(stream_name)
dbshah1212 marked this conversation as resolved.
Show resolved Hide resolved

extraction_time = singer.utils.now()
replication_key = metadata.get(stream_metadata, (), 'valid-replication-keys')[0]
if is_sub_stream:
# We need to change the parent, when only child is selected, hence need to change

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@namrata270998 can you please rewrite this comment as follows

Suggested change
# We need to change the parent, when only child is selected, hence need to change
# We need to get the parent data first for syncing the child streams. Hence, changing stream_name to parent stream when only child is selected

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the comment

# stream_name to its parent.
stream_name = PARENT_STREAM_MAP.get(stream_name)
replication_key = STREAM_REPLICATION_KEY.get(stream_name)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@namrata270998 Optimization and Readability:

if is_sub_stream:
    sub_stream_name = stream_name
    stream_name = PARENT_STREAM_MAP.get(stream_name)

replication_key = STREAM_REPLICATION_KEY.get(stream_name)

You can remove the code at L578

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Updated the code
  • Also, we cannot remove L578: if both parent and child are selected, we'll need the sub_stream_name for fetching the child records.

else:
replication_key = STREAM_REPLICATION_KEY.get(stream_name)

# Invoice Items bookmarks on `date`, but queries on `created`
filter_key = 'created' if stream_name == 'invoice_items' else replication_key

sub_stream_name = SUB_STREAMS.get(stream_name)

# Get bookmark for the stream
stream_bookmark = get_bookmark_for_stream(stream_name, replication_key)

bookmark = stream_bookmark

# If there is a sub-stream and its selected, get its bookmark (or the start date if no bookmark)
should_sync_sub_stream = sub_stream_name and Context.is_selected(sub_stream_name)

# Get the bookmark of the sub_stream if it is selected
if should_sync_sub_stream:
sub_stream_bookmark = get_bookmark_for_sub_stream(sub_stream_name)

# If both the parent and child streams are selected, get the minimum bookmark value
if not is_sub_stream:
dbshah1212 marked this conversation as resolved.
Show resolved Hide resolved
bookmark = min(stream_bookmark, sub_stream_bookmark)
# Set the substream bookmark to None (when only parent is selected)
else:
sub_stream_bookmark = None

with Transformer(singer.UNIX_SECONDS_INTEGER_DATETIME_PARSING) as transformer:
end_time = dt_to_epoch(utils.now())

Expand Down Expand Up @@ -620,8 +649,8 @@ def sync_stream(stream_name):
stream_obj_created = rec[replication_key]
rec['updated'] = stream_obj_created

# sync stream if object is greater than or equal to the bookmark
if stream_obj_created >= stream_bookmark:
# sync stream if object is greater than or equal to the bookmark and if parent is selected
if stream_obj_created >= stream_bookmark and not is_sub_stream:
dbshah1212 marked this conversation as resolved.
Show resolved Hide resolved
rec = transformer.transform(rec,
Context.get_catalog_entry(stream_name)['schema'],
stream_metadata)
Expand All @@ -638,12 +667,20 @@ def sync_stream(stream_name):

Context.new_counts[stream_name] += 1

# Update stream/sub-streams bookmarks as stop window
if stop_window > stream_bookmark:
# sync sub streams if it is selected and the parent object is greater than its bookmark
if should_sync_sub_stream and stream_obj_created > sub_stream_bookmark:
sync_sub_stream(sub_stream_name, stream_obj)

# Update stream bookmark as stop window when parent stream is selected
if not is_sub_stream and stop_window > stream_bookmark:
dbshah1212 marked this conversation as resolved.
Show resolved Hide resolved
stream_bookmark = stop_window
# Write bookmark for the stream
write_bookmark_for_stream(stream_name, replication_key, stream_bookmark)

# Update sub-stream bookmark as stop window when child stream is selected
if should_sync_sub_stream and stop_window > sub_stream_bookmark:
namrata270998 marked this conversation as resolved.
Show resolved Hide resolved
sub_stream_bookmark = stop_window
write_bookmark_for_stream(sub_stream_name, replication_key, sub_stream_bookmark)

singer.write_state(Context.state)

# update window for next iteration
Expand Down Expand Up @@ -677,7 +714,14 @@ def get_object_list_iterator(object_list):
# we are in a cycle.
INITIAL_SUB_STREAM_OBJECT_LIST_LENGTH = 10

def write_substream_records(sub_stream_name, parent_obj, updates=False):
def is_parent_selected(sub_stream_name):
    """
    Report whether the parent of the given child stream is selected.

    The parent stream name is looked up in PARENT_STREAM_MAP and the result
    of Context.is_selected for that name is returned unchanged.
    """
    return Context.is_selected(PARENT_STREAM_MAP.get(sub_stream_name))

def sync_sub_stream(sub_stream_name, parent_obj, updates=False):
"""
Given a parent object, retrieve its values for the specified substream.
"""
Expand Down Expand Up @@ -820,52 +864,6 @@ def write_substream_records(sub_stream_name, parent_obj, updates=False):
Context.new_counts[sub_stream_name] += 1


def sync_sub_stream(child_stream, bookmark_value):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous branch of fix_parent_child_relationship is already merged into crest-master, which had different code. However, this function now has different code and is present at L724.

"""
Get the parent records based on the bookmark and corresponding child stream records
"""
# Get the parent stream of the stream
parent_stream = PARENT_STREAM_MAP[child_stream]
# Get the replication key for the stream
parent_replication_key = STREAM_REPLICATION_KEY[parent_stream]

end_time = dt_to_epoch(utils.now())

window_size = float(Context.config.get('date_window_size', DEFAULT_DATE_WINDOW_SIZE))

if DEFAULT_DATE_WINDOW_SIZE != window_size:
LOGGER.info('Using non-default date window size of %.2f',window_size)
start_window = bookmark_value

# NB: We observed records coming through newest->oldest and so
# date-windowing was added and the tap only bookmarks after it has
# gotten through a date window
while start_window < end_time:
stop_window = dt_to_epoch(epoch_to_dt(start_window) + timedelta(days=window_size))
# cut off the last window at the end time
if stop_window > end_time:
stop_window = end_time

# Get the parent records for the child-streams to loop on it and fetch the child records.
for parent_obj in paginate(
STREAM_SDK_OBJECTS[parent_stream]['sdk_object'],
parent_replication_key,
start_window,
stop_window,
parent_stream,
STREAM_SDK_OBJECTS[parent_stream].get('request_args')):
write_substream_records(child_stream, parent_obj)

# Update sub-streams bookmarks as stop window
if stop_window > bookmark_value:
bookmark_value = stop_window
# Write bookmark for the stream
write_bookmark_for_stream(child_stream, parent_replication_key, bookmark_value)
singer.write_state(Context.state)

# update window for next iteration
start_window = stop_window

def should_sync_event(events_obj, object_type, id_to_created_map):
"""Checks to ensure the event's underlying object has an id and that the id_to_created_map
contains an entry for that id. Returns true the first time an id should be added to the map
Expand Down Expand Up @@ -900,20 +898,51 @@ def recursive_to_dict(some_obj):
# Else just return
return some_obj

def sync_event_updates(stream_name, bookmark_value):
def sync_event_updates(stream_name, is_sub_stream):
dbshah1212 marked this conversation as resolved.
Show resolved Hide resolved
'''
Get updates via events endpoint

look at 'events update' bookmark and pull events after that

:param
stream_name - Name of the stream
is_sub_stream - Check whether the function is called via the parent stream (only parent/both are selected)
or when called through only child stream i.e. when parent is not selected.
'''
LOGGER.info("Started syncing event based updates")

date_window_size = 60 * 60 * 24 # Seconds in a day

sub_stream_name = None
if stream_name in PARENT_STREAM_MAP.keys():
if is_sub_stream:
# Need to update the stream_name to its parent's stream name as for child we need to fetch the parents first
sub_stream_name = stream_name
stream_name = PARENT_STREAM_MAP[stream_name]
stream_name = PARENT_STREAM_MAP.get(sub_stream_name)

sub_stream_name = SUB_STREAMS.get(stream_name)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@namrata270998 This line overwrites L918. Why is this line needed explicitly?
If this function is called by a sub-stream, we assign a value to this variable at L918. Otherwise, we keep it as None by default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not keep it as None by default: if both parent and child are selected, we'll need the sub_stream_name for fetching the child records.

Copy link

@karanpanchal-crest karanpanchal-crest Jun 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Can you remove L919, as it would not have any use? It will always get overridden.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the line as it will be overridden.


parent_bookmark_value = singer.get_bookmark(Context.state,
stream_name + '_events',
'updates_created') or \
int(utils.strptime_to_utc(Context.config["start_date"]).timestamp())

# Get the bookmark value of the sub_stream if its selected and present
if sub_stream_name:
sub_stream_bookmark_value = singer.get_bookmark(Context.state,
sub_stream_name + '_events',
'updates_created') or \
int(utils.strptime_to_utc(Context.config["start_date"]).timestamp())

# If only child stream is selected, update bookmark to sub-stream bookmark value
if is_sub_stream:
dbshah1212 marked this conversation as resolved.
Show resolved Hide resolved
bookmark_value = sub_stream_bookmark_value

elif sub_stream_name and Context.is_selected(sub_stream_name):
# Get the minimum bookmark value from parent and child streams if both are selected.
bookmark_value = min(parent_bookmark_value, sub_stream_bookmark_value)

# Update the bookmark to parent bookmark value, if child is not selected
else:
bookmark_value = parent_bookmark_value

max_created = bookmark_value
date_window_start = max_created
Expand Down Expand Up @@ -980,7 +1009,8 @@ def sync_event_updates(stream_name, bookmark_value):

if events_obj.created >= bookmark_value:
if rec.get('id') is not None:
if sub_stream_name is None:
# Write parent records only when the parent is selected
if not is_sub_stream:
singer.write_record(stream_name,
rec,
time_extracted=extraction_time)
Expand All @@ -990,9 +1020,10 @@ def sync_event_updates(stream_name, bookmark_value):
if events_obj.get('type', '').endswith('.deleted'):
continue

if sub_stream_name:
# Write child stream records only when the child stream is selected
if sub_stream_name and Context.is_selected(sub_stream_name):
if event_resource_obj:
write_substream_records(sub_stream_name,
sync_sub_stream(sub_stream_name,
event_resource_obj,
updates=True)
if events_obj.created > max_created:
Expand All @@ -1002,13 +1033,15 @@ def sync_event_updates(stream_name, bookmark_value):
# cannot bookmark until the entire page is processed
date_window_start = date_window_end
date_window_end = date_window_end + date_window_size
if sub_stream_name is None:
# Write the parent bookmark value only when the parent is selected
karanpanchal-crest marked this conversation as resolved.
Show resolved Hide resolved
if not is_sub_stream:
singer.write_bookmark(Context.state,
stream_name + '_events',
'updates_created',
max_created)
singer.write_state(Context.state)
else:
# Write the child bookmark value only when the child is selected
karanpanchal-crest marked this conversation as resolved.
Show resolved Hide resolved
if sub_stream_name and Context.is_selected(sub_stream_name):
singer.write_bookmark(Context.state,
sub_stream_name + '_events',
'updates_created',
Expand All @@ -1034,24 +1067,13 @@ def sync():
# Loop over streams in catalog
for catalog_entry in Context.catalog['streams']:
stream_name = catalog_entry['tap_stream_id']
# Sync records for stream
if Context.is_selected(stream_name):
if not Context.is_sub_stream(stream_name): # Run the sync for parent-streams
sync_stream(stream_name)
# This prevents us from retrieving 'events.events'
# Run the sync for only parent streams/only child streams/both parent-child streams
karanpanchal-crest marked this conversation as resolved.
Show resolved Hide resolved
if not Context.is_sub_stream(stream_name) or not is_parent_selected(stream_name):
sync_stream(stream_name, Context.is_sub_stream(stream_name))
# This prevents us from retrieving immutable stream events.
if STREAM_TO_TYPE_FILTER.get(stream_name):
bookmark_value = get_bookmark_for_stream(stream_name + '_events', 'updates_created')
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, in the older merged branch, we were fetching parent records twice when both parent and child were selected. The code has been updated and optimized, hence this condition was removed.

sync_event_updates(stream_name, bookmark_value)

else: # Run the sync for child-streams independently
bookmark_value = get_bookmark_for_sub_stream(stream_name)
sync_sub_stream(stream_name, bookmark_value)
# Get the child-stream's events bookmark if present
events_bookmark = singer.get_bookmark(Context.state, stream_name + '_events', 'updates_created')
# Use the child-stream's event bookmark if present, else use the original child-stream's bookmark
if events_bookmark:
bookmark_value = events_bookmark
sync_event_updates(stream_name, bookmark_value) # Run the sync mode for fetching events for the child-streams independently
sync_event_updates(stream_name, Context.is_sub_stream(stream_name))

@utils.handle_top_exception(LOGGER)
def main():
Expand Down
8 changes: 6 additions & 2 deletions tests/test_all_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@
'invoice_line_items': set(),
'invoices': {
'test_clock',
'application'
'application',
'rendering_options'
},
'payment_intents': {
'amount_details'
Expand Down Expand Up @@ -179,7 +180,10 @@
'coupons': {
'percent_off', # BUG_9720 | Decimal('67') != Decimal('66.6') (value is changing in duplicate records)
},
'customers': set(),
'customers': {
# missing subfield 'rendering_options'
'invoice_settings'
},
'subscriptions': {
# BUG_12478 | missing subfields in coupon where coupon is subfield within discount
# BUG_12478 | missing subfields on discount ['checkout_session', 'id', 'invoice', 'invoice_item', 'promotion_code']
Expand Down
Loading