TDL-19384 optimize logic for parent child relationship to be independent (#141)

* initial commit with optimized logic for parent child streams

* fixed issues in the invoices and invoice_line_items sync

* fixed pylint

* updated unittests

* TDL-19384: Added Sub Stream check

* fixed unittests

* resolved PR comments and fixed CCI issues

* updated invoice_line_item to line_item

* Updated the code comments

* fixed typo

* updated code comments and added more unittests

* resolved pylint

* removed unwanted comments

* resolved PR comments

* resolved PR comments

* resolved PR review comments

* added integration test for only parent

* resolved PR comments

* skipped some newly generated fields from all_fields

* added failing field as missing nested field

Co-authored-by: dbshah1212 <[email protected]>
namrata270998 and dbshah1212 authored Jun 22, 2022
1 parent 84208e0 commit 93e447b
Showing 7 changed files with 280 additions and 116 deletions.
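In short, this commit lets the child streams (invoice_line_items, subscription_items, payout_transactions) sync even when their parents are not selected. As a minimal sketch of the bookmark-selection rule the reworked sync_stream applies — using hypothetical stand-in names, not the tap's actual helpers:

    # Sketch only: which bookmark seeds the date window for a sync run.
    def pick_bookmark(parent_bm, child_bm, parent_selected, child_selected):
        if parent_selected and child_selected:
            # Both selected: start from the older bookmark so neither misses records.
            return min(parent_bm, child_bm)
        if child_selected:
            # Child only: parent records are still fetched, but only to drive the child.
            return child_bm
        # Parent only.
        return parent_bm

    # e.g. both selected, child lags behind the parent:
    assert pick_bookmark(1655000000, 1654000000, True, True) == 1654000000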
8 changes: 8 additions & 0 deletions .circleci/config.yml
@@ -222,6 +222,14 @@ workflows:
file: configurable_lookback_window
requires:
- 'Testing all_fields'
- run_integration_test:
name: 'Testing parent_child_independent'
context:
- circleci-user
- tier-1-tap-user
file: parent_child_independent
requires:
- 'Testing all_fields'
- run_integration_test:
name: 'Testing automatic_payout_transactions'
context:
191 changes: 108 additions & 83 deletions tap_stripe/__init__.py
@@ -85,10 +85,11 @@
'transfers': {'type': 'transfer.*', 'object': ['transfer']},
'disputes': {'type': 'charge.dispute.*', 'object': ['dispute']},
'products': {'type': 'product.*', 'object': ['product']},
'invoice_line_items': {'type': 'invoice.*', 'object': ['line_item']},
'subscription_items': {'type': 'customer.subscription.*', 'object': ['subscription_item']},
'payout_transactions': {'type': 'payout.*', 'object': ['transfer', 'payout']},
# Cannot find evidence of these streams having events associated:
# subscription_items - appears on subscriptions events
# balance_transactions - seems to be immutable
# payouts - these are called transfers with an event type of payout.*
}

# Some fields are not available by default with latest API version so
@@ -495,6 +496,7 @@ def evaluate_start_time_based_on_lookback(stream_name, replication_key, lookback
lookback_evaluated_time = bookmark - lookback_window
return lookback_evaluated_time
return start_date

def get_bookmark_for_sub_stream(stream_name):
"""
Get the bookmark for the child-stream based on the parent's replication key.
@@ -547,25 +549,53 @@ def convert_dict_to_stripe_object(record):

# pylint: disable=too-many-locals
# pylint: disable=too-many-statements
def sync_stream(stream_name):
def sync_stream(stream_name, is_sub_stream=False):
"""
Sync each stream, looking for newly created records. Updates are captured by the events stream.
:param
stream_name - Name of the stream
is_sub_stream - Whether the function is called via the parent stream (only the parent, or both parent and child, are selected)
or via the child stream alone, i.e. when the parent is not selected.
"""
LOGGER.info("Started syncing stream %s", stream_name)

stream_metadata = metadata.to_map(Context.get_catalog_entry(stream_name)['metadata'])
stream_field_whitelist = json.loads(Context.config.get('whitelist_map', '{}')).get(stream_name)

extraction_time = singer.utils.now()
replication_key = metadata.get(stream_metadata, (), 'valid-replication-keys')[0]

if is_sub_stream:
# We need to get the parent data first for syncing the child streams. Hence,
# changing stream_name to the parent stream when only the child is selected.
stream_name = PARENT_STREAM_MAP.get(stream_name)

replication_key = STREAM_REPLICATION_KEY.get(stream_name)

# Invoice Items bookmarks on `date`, but queries on `created`
filter_key = 'created' if stream_name == 'invoice_items' else replication_key

sub_stream_name = SUB_STREAMS.get(stream_name)

# Get bookmark for the stream
stream_bookmark = get_bookmark_for_stream(stream_name, replication_key)

bookmark = stream_bookmark

# If there is a sub-stream and it's selected, get its bookmark (or the start date if no bookmark)
should_sync_sub_stream = sub_stream_name and Context.is_selected(sub_stream_name)

# Get the bookmark of the sub_stream if it is selected
if should_sync_sub_stream:
sub_stream_bookmark = get_bookmark_for_sub_stream(sub_stream_name)
bookmark = sub_stream_bookmark

# If both the parent and child streams are selected, get the minimum bookmark value
if not is_sub_stream:
bookmark = min(stream_bookmark, sub_stream_bookmark)
# Set the substream bookmark to None (when only parent is selected)
else:
sub_stream_bookmark = None

with Transformer(singer.UNIX_SECONDS_INTEGER_DATETIME_PARSING) as transformer:
end_time = dt_to_epoch(utils.now())

@@ -620,8 +650,8 @@ def sync_stream(stream_name):
stream_obj_created = rec[replication_key]
rec['updated'] = stream_obj_created

# sync stream if object is greater than or equal to the bookmark
if stream_obj_created >= stream_bookmark:
# sync stream if object is greater than or equal to the bookmark and if parent is selected
if stream_obj_created >= stream_bookmark and not is_sub_stream:
rec = transformer.transform(rec,
Context.get_catalog_entry(stream_name)['schema'],
stream_metadata)
@@ -638,12 +668,20 @@ def sync_stream(stream_name):

Context.new_counts[stream_name] += 1

# Update stream/sub-streams bookmarks as stop window
if stop_window > stream_bookmark:
# sync the sub-stream if it is selected and the parent object is greater than its bookmark
if should_sync_sub_stream and stream_obj_created > sub_stream_bookmark:
sync_sub_stream(sub_stream_name, stream_obj)

# Update stream bookmark as stop window when parent stream is selected
if not is_sub_stream and stop_window > stream_bookmark:
stream_bookmark = stop_window
# Write bookmark for the stream
write_bookmark_for_stream(stream_name, replication_key, stream_bookmark)

# Update sub-stream bookmark as stop window when child stream is selected
if should_sync_sub_stream and stop_window > sub_stream_bookmark:
sub_stream_bookmark = stop_window
write_bookmark_for_stream(sub_stream_name, replication_key, sub_stream_bookmark)

singer.write_state(Context.state)

# update window for next iteration
@@ -677,7 +715,14 @@ def get_object_list_iterator(object_list):
# we are in a cycle.
INITIAL_SUB_STREAM_OBJECT_LIST_LENGTH = 10

def write_substream_records(sub_stream_name, parent_obj, updates=False):
def is_parent_selected(sub_stream_name):
"""
Given a child stream, check if the parent is selected.
"""
parent_stream = PARENT_STREAM_MAP.get(sub_stream_name)
return Context.is_selected(parent_stream)

def sync_sub_stream(sub_stream_name, parent_obj, updates=False):
"""
Given a parent object, retrieve its values for the specified substream.
"""
@@ -820,52 +865,6 @@ def write_substream_records(sub_stream_name, parent_obj, updates=False):
Context.new_counts[sub_stream_name] += 1


def sync_sub_stream(child_stream, bookmark_value):
"""
Get the parent records based on the bookmark and corresponding child stream records
"""
# Get the parent stream of the stream
parent_stream = PARENT_STREAM_MAP[child_stream]
# Get the replication key for the stream
parent_replication_key = STREAM_REPLICATION_KEY[parent_stream]

end_time = dt_to_epoch(utils.now())

window_size = float(Context.config.get('date_window_size', DEFAULT_DATE_WINDOW_SIZE))

if DEFAULT_DATE_WINDOW_SIZE != window_size:
LOGGER.info('Using non-default date window size of %.2f',window_size)
start_window = bookmark_value

# NB: We observed records coming through newest->oldest and so
# date-windowing was added and the tap only bookmarks after it has
# gotten through a date window
while start_window < end_time:
stop_window = dt_to_epoch(epoch_to_dt(start_window) + timedelta(days=window_size))
# cut off the last window at the end time
if stop_window > end_time:
stop_window = end_time

# Get the parent records for the child-streams to loop on it and fetch the child records.
for parent_obj in paginate(
STREAM_SDK_OBJECTS[parent_stream]['sdk_object'],
parent_replication_key,
start_window,
stop_window,
parent_stream,
STREAM_SDK_OBJECTS[parent_stream].get('request_args')):
write_substream_records(child_stream, parent_obj)

# Update sub-streams bookmarks as stop window
if stop_window > bookmark_value:
bookmark_value = stop_window
# Write bookmark for the stream
write_bookmark_for_stream(child_stream, parent_replication_key, bookmark_value)
singer.write_state(Context.state)

# update window for next iteration
start_window = stop_window

def should_sync_event(events_obj, object_type, id_to_created_map):
"""Checks to ensure the event's underlying object has an id and that the id_to_created_map
contains an entry for that id. Returns true the first time an id should be added to the map
@@ -900,20 +899,51 @@ def recursive_to_dict(some_obj):
# Else just return
return some_obj

def sync_event_updates(stream_name, bookmark_value):
def sync_event_updates(stream_name, is_sub_stream):
'''
Get updates via events endpoint
look at 'events update' bookmark and pull events after that
:param
stream_name - Name of the stream
is_sub_stream - Whether the function is called via the parent stream (only the parent, or both, are selected)
or via the child stream alone, i.e. when the parent is not selected.
'''
LOGGER.info("Started syncing event based updates")

date_window_size = 60 * 60 * 24 # Seconds in a day

sub_stream_name = None
if stream_name in PARENT_STREAM_MAP.keys():
sub_stream_name = stream_name
stream_name = PARENT_STREAM_MAP[stream_name]
if is_sub_stream:
# We need to get the parent data first for syncing the child streams. Hence,
# changing stream_name to the parent stream when only the child is selected.
stream_name = PARENT_STREAM_MAP.get(stream_name)

sub_stream_name = SUB_STREAMS.get(stream_name)

parent_bookmark_value = singer.get_bookmark(Context.state,
stream_name + '_events',
'updates_created') or \
int(utils.strptime_to_utc(Context.config["start_date"]).timestamp())

# Get the bookmark value of the sub_stream if it's selected and present
if sub_stream_name:
sub_stream_bookmark_value = singer.get_bookmark(Context.state,
sub_stream_name + '_events',
'updates_created') or \
int(utils.strptime_to_utc(Context.config["start_date"]).timestamp())

# If only child stream is selected, update bookmark to sub-stream bookmark value
if is_sub_stream:
bookmark_value = sub_stream_bookmark_value

elif sub_stream_name and Context.is_selected(sub_stream_name):
# Get the minimum bookmark value from parent and child streams if both are selected.
bookmark_value = min(parent_bookmark_value, sub_stream_bookmark_value)

# Update the bookmark to parent bookmark value, if child is not selected
else:
bookmark_value = parent_bookmark_value

max_created = bookmark_value
date_window_start = max_created
@@ -980,7 +1010,8 @@ def sync_event_updates(stream_name, bookmark_value):

if events_obj.created >= bookmark_value:
if rec.get('id') is not None:
if sub_stream_name is None:
# Write parent records only when the parent is selected
if not is_sub_stream:
singer.write_record(stream_name,
rec,
time_extracted=extraction_time)
@@ -990,9 +1021,10 @@ def sync_event_updates(stream_name, bookmark_value):
if events_obj.get('type', '').endswith('.deleted'):
continue

if sub_stream_name:
# Write child stream records only when the child stream is selected
if sub_stream_name and Context.is_selected(sub_stream_name):
if event_resource_obj:
write_substream_records(sub_stream_name,
sync_sub_stream(sub_stream_name,
event_resource_obj,
updates=True)
if events_obj.created > max_created:
@@ -1002,13 +1034,17 @@ def sync_event_updates(stream_name, bookmark_value):
# cannot bookmark until the entire page is processed
date_window_start = date_window_end
date_window_end = date_window_end + date_window_size
if sub_stream_name is None:

# Write the parent bookmark value only when the parent is selected
if not is_sub_stream:
singer.write_bookmark(Context.state,
stream_name + '_events',
'updates_created',
max_created)
singer.write_state(Context.state)
else:

# Write the child bookmark value only when the child is selected
if sub_stream_name and Context.is_selected(sub_stream_name):
singer.write_bookmark(Context.state,
sub_stream_name + '_events',
'updates_created',
Expand All @@ -1034,24 +1070,13 @@ def sync():
# Loop over streams in catalog
for catalog_entry in Context.catalog['streams']:
stream_name = catalog_entry['tap_stream_id']
# Sync records for stream
if Context.is_selected(stream_name):
if not Context.is_sub_stream(stream_name): # Run the sync for parent-streams
sync_stream(stream_name)
# This prevents us from retrieving 'events.events'
# Run the sync for only parent streams/only child streams/both parent-child streams
if not Context.is_sub_stream(stream_name) or not is_parent_selected(stream_name):
sync_stream(stream_name, Context.is_sub_stream(stream_name))
# This prevents us from retrieving immutable stream events.
if STREAM_TO_TYPE_FILTER.get(stream_name):
bookmark_value = get_bookmark_for_stream(stream_name + '_events', 'updates_created')
sync_event_updates(stream_name, bookmark_value)

else: # Run the sync for child-streams independently
bookmark_value = get_bookmark_for_sub_stream(stream_name)
sync_sub_stream(stream_name, bookmark_value)
# Get the child-stream's events bookmark if present
events_bookmark = singer.get_bookmark(Context.state, stream_name + '_events', 'updates_created')
# Use the child-stream's event bookmark if present, else use the original child-stream's bookmark
if events_bookmark:
bookmark_value = events_bookmark
sync_event_updates(stream_name, bookmark_value) # Run the sync mode for fetching events for the child-streams independently
sync_event_updates(stream_name, Context.is_sub_stream(stream_name))

@utils.handle_top_exception(LOGGER)
def main():
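The reworked sync() above reduces to a single dispatch rule: call sync_stream directly for every selected parent, and for a selected child only when its parent is not selected (otherwise the parent's run emits the child's records inline). A runnable sketch of that rule, with hypothetical names:

    # Sketch: does a selected stream get its own sync_stream call?
    def runs_directly(is_child, parent_selected):
        # Parents always run; children run only when their parent is not selected.
        return (not is_child) or (not parent_selected)

    assert runs_directly(is_child=False, parent_selected=False)    # parent selected alone
    assert runs_directly(is_child=True, parent_selected=False)     # child selected alone
    assert not runs_directly(is_child=True, parent_selected=True)  # child synced via parent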
20 changes: 16 additions & 4 deletions tests/test_all_fields.py
@@ -60,10 +60,16 @@
},
'subscription_items': set(),
'plans': set(),
'invoice_line_items': set(),
'invoice_line_items': {
'amount_excluding_tax',
'unit_amount_excluding_tax'
},
'invoices': {
'test_clock',
'application'
'application',
'rendering_options',
'total_excluding_tax',
'subtotal_excluding_tax'
},
'payment_intents': {
'amount_details'
@@ -179,7 +185,10 @@
'coupons': {
'percent_off', # BUG_9720 | Decimal('67') != Decimal('66.6') (value is changing in duplicate records)
},
'customers': set(),
'customers': {
# missing subfield 'rendering_options'
'invoice_settings'
},
'subscriptions': {
# BUG_12478 | missing subfields in coupon where coupon is subfield within discount
# BUG_12478 | missing subfields on discount ['checkout_session', 'id', 'invoice', 'invoice_item', 'promotion_code']
@@ -214,7 +223,10 @@
# missing subfield ['payment_method']
'last_payment_error'
},
'invoice_line_items': set()
'invoice_line_items': {
# missing subfield ['custom_unit_amount']
'price'
}
# 'invoice_line_items': { # TODO This is a test issue that prevents us from consistently passing
# 'unique_line_item_id',
# 'invoice_item',
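The test changes above grow the known-missing field sets for streams whose records gained new Stripe API fields (e.g. rendering_options, total_excluding_tax). A toy sketch of how such skip-sets are commonly applied in an all_fields assertion — the names here are illustrative, since the full test file is not shown:

    # Toy data: drop known-missing fields before asserting field coverage.
    KNOWN_MISSING_FIELDS = {'invoices': {'rendering_options', 'total_excluding_tax'}}
    expected_fields = {'id', 'total', 'rendering_options', 'total_excluding_tax'}
    actual_fields = {'id', 'total'}
    fields_to_verify = expected_fields - KNOWN_MISSING_FIELDS.get('invoices', set())
    assert fields_to_verify.issubset(actual_fields)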
(Diffs for the remaining 4 changed files not shown.)
