From 233eaa8415164fc78e311279e5b5954688f276b3 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Thu, 9 Jun 2022 19:23:17 +0000 Subject: [PATCH 01/20] initial commit with optimized logic for parent child streams --- tap_stripe/__init__.py | 154 +++++++++++++++++++++-------------------- 1 file changed, 78 insertions(+), 76 deletions(-) diff --git a/tap_stripe/__init__.py b/tap_stripe/__init__.py index c604105d..12a49bb3 100755 --- a/tap_stripe/__init__.py +++ b/tap_stripe/__init__.py @@ -85,6 +85,9 @@ 'transfers': {'type': 'transfer.*', 'object': ['transfer']}, 'disputes': {'type': 'charge.dispute.*', 'object': ['dispute']}, 'products': {'type': 'product.*', 'object': ['product']}, + 'invoice_line_items': {'type': 'invoice.*', 'object': ['invoice']}, + 'subscription_items': {'type': 'customer.subscription.*', 'object': ['subscription']}, + 'payout_transactions': {'type': 'payout.*', 'object': ['transfer', 'payout']}, # Cannot find evidence of these streams having events associated: # subscription_items - appears on subscriptions events # balance_transactions - seems to be immutable @@ -495,6 +498,7 @@ def evaluate_start_time_based_on_lookback(stream_name, replication_key, lookback lookback_evaluated_time = bookmark - lookback_window return lookback_evaluated_time return start_date + def get_bookmark_for_sub_stream(stream_name): """ Get the bookmark for the child-stream based on the parent's replication key. @@ -557,15 +561,38 @@ def sync_stream(stream_name): stream_field_whitelist = json.loads(Context.config.get('whitelist_map', '{}')).get(stream_name) extraction_time = singer.utils.now() - replication_key = metadata.get(stream_metadata, (), 'valid-replication-keys')[0] + should_sync_parent_stream = True + + if Context.is_sub_stream(stream_name): + replication_key = STREAM_REPLICATION_KEY.get(PARENT_STREAM_MAP.get(stream_name)) + sub_stream_name = stream_name + stream_name = PARENT_STREAM_MAP.get(stream_name) + should_sync_parent_stream = is_parent_selected(sub_stream_name) + else: + replication_key = metadata.get(stream_metadata, (), 'valid-replication-keys')[0] + # Invoice Items bookmarks on `date`, but queries on `created` filter_key = 'created' if stream_name == 'invoice_items' else replication_key + sub_stream_name = SUB_STREAMS.get(stream_name) + # Get bookmark for the stream stream_bookmark = get_bookmark_for_stream(stream_name, replication_key) - bookmark = stream_bookmark + # If there is a sub-stream and its selected, get its bookmark (or the start date if no bookmark) + should_sync_sub_stream = sub_stream_name and Context.is_selected(sub_stream_name) + + if should_sync_sub_stream and should_sync_parent_stream: + sub_stream_bookmark = get_bookmark_for_sub_stream(sub_stream_name) + + # if there is a sub stream, set bookmark to sub stream's bookmark + # since we know it must be earlier than the stream's bookmark + if sub_stream_bookmark != stream_bookmark: + bookmark = min(stream_bookmark, sub_stream_bookmark) + else: + bookmark = stream_bookmark + with Transformer(singer.UNIX_SECONDS_INTEGER_DATETIME_PARSING) as transformer: end_time = dt_to_epoch(utils.now()) @@ -621,7 +648,7 @@ def sync_stream(stream_name): rec['updated'] = stream_obj_created # sync stream if object is greater than or equal to the bookmark - if stream_obj_created >= stream_bookmark: + if stream_obj_created >= stream_bookmark and should_sync_parent_stream: rec = transformer.transform(rec, Context.get_catalog_entry(stream_name)['schema'], stream_metadata) @@ -638,11 +665,26 @@ def sync_stream(stream_name): Context.new_counts[stream_name] += 1 + # sync sub streams if its selected and the parent object + # is greater than its bookmark + if should_sync_sub_stream and stream_obj_created > sub_stream_bookmark: + sync_sub_stream(sub_stream_name, stream_obj) + # Update stream/sub-streams bookmarks as stop window if stop_window > stream_bookmark: stream_bookmark = stop_window - # Write bookmark for the stream - write_bookmark_for_stream(stream_name, replication_key, stream_bookmark) + singer.write_bookmark(Context.state, + stream_name, + replication_key, + stream_bookmark) + + # the sub stream bookmarks on its parent + if should_sync_sub_stream and stop_window > sub_stream_bookmark: + sub_stream_bookmark = stop_window + singer.write_bookmark(Context.state, + sub_stream_name, + replication_key, + sub_stream_bookmark) singer.write_state(Context.state) @@ -677,10 +719,18 @@ def get_object_list_iterator(object_list): # we are in a cycle. INITIAL_SUB_STREAM_OBJECT_LIST_LENGTH = 10 -def write_substream_records(sub_stream_name, parent_obj, updates=False): +def is_parent_selected(sub_stream_name): + """ + Given a child stream, check if the parent is selected. + """ + parent_stream = PARENT_STREAM_MAP.get(sub_stream_name) + return Context.is_selected(parent_stream) + +def sync_sub_stream(sub_stream_name, parent_obj, updates=False): """ Given a parent object, retrieve its values for the specified substream. """ + # LOGGER.info(f'>>>>> {sub_stream_name}') extraction_time = singer.utils.now() if sub_stream_name == "invoice_line_items": @@ -820,52 +870,6 @@ def write_substream_records(sub_stream_name, parent_obj, updates=False): Context.new_counts[sub_stream_name] += 1 -def sync_sub_stream(child_stream, bookmark_value): - """ - Get the parent records based on the bookmark and corresponding child stream records - """ - # Get the parent stream of the stream - parent_stream = PARENT_STREAM_MAP[child_stream] - # Get the replication key for the stream - parent_replication_key = STREAM_REPLICATION_KEY[parent_stream] - - end_time = dt_to_epoch(utils.now()) - - window_size = float(Context.config.get('date_window_size', DEFAULT_DATE_WINDOW_SIZE)) - - if DEFAULT_DATE_WINDOW_SIZE != window_size: - LOGGER.info('Using non-default date window size of %.2f',window_size) - start_window = bookmark_value - - # NB: We observed records coming through newest->oldest and so - # date-windowing was added and the tap only bookmarks after it has - # gotten through a date window - while start_window < end_time: - stop_window = dt_to_epoch(epoch_to_dt(start_window) + timedelta(days=window_size)) - # cut off the last window at the end time - if stop_window > end_time: - stop_window = end_time - - # Get the parent records for the child-streams to loop on it and fetch the child records. - for parent_obj in paginate( - STREAM_SDK_OBJECTS[parent_stream]['sdk_object'], - parent_replication_key, - start_window, - stop_window, - parent_stream, - STREAM_SDK_OBJECTS[parent_stream].get('request_args')): - write_substream_records(child_stream, parent_obj) - - # Update sub-streams bookmarks as stop window - if stop_window > bookmark_value: - bookmark_value = stop_window - # Write bookmark for the stream - write_bookmark_for_stream(child_stream, parent_replication_key, bookmark_value) - singer.write_state(Context.state) - - # update window for next iteration - start_window = stop_window - def should_sync_event(events_obj, object_type, id_to_created_map): """Checks to ensure the event's underlying object has an id and that the id_to_created_map contains an entry for that id. Returns true the first time an id should be added to the map @@ -910,11 +914,6 @@ def sync_event_updates(stream_name, bookmark_value): date_window_size = 60 * 60 * 24 # Seconds in a day - sub_stream_name = None - if stream_name in PARENT_STREAM_MAP.keys(): - sub_stream_name = stream_name - stream_name = PARENT_STREAM_MAP[stream_name] - max_created = bookmark_value date_window_start = max_created date_window_end = max_created + date_window_size @@ -923,6 +922,14 @@ def sync_event_updates(stream_name, bookmark_value): # Create a map to hold relate event object ids to timestamps updated_object_timestamps = {} + + should_sync_parent = True + sub_stream_name = SUB_STREAMS.get(stream_name) + + if Context.is_sub_stream(stream_name): + sub_stream_name = stream_name + should_sync_parent = is_parent_selected(stream_name) + stream_name = PARENT_STREAM_MAP.get(sub_stream_name) while not stop_paging: extraction_time = singer.utils.now() @@ -980,19 +987,19 @@ def sync_event_updates(stream_name, bookmark_value): if events_obj.created >= bookmark_value: if rec.get('id') is not None: - if sub_stream_name is None: + if should_sync_parent: singer.write_record(stream_name, rec, time_extracted=extraction_time) Context.updated_counts[stream_name] += 1 - # Delete events should be synced but not their subobjects - if events_obj.get('type', '').endswith('.deleted'): - continue + # Delete events should be synced but not their subobjects + if events_obj.get('type', '').endswith('.deleted'): + continue - if sub_stream_name: + if sub_stream_name and Context.is_selected(sub_stream_name): if event_resource_obj: - write_substream_records(sub_stream_name, + sync_sub_stream(sub_stream_name, event_resource_obj, updates=True) if events_obj.created > max_created: @@ -1002,13 +1009,13 @@ def sync_event_updates(stream_name, bookmark_value): # cannot bookmark until the entire page is processed date_window_start = date_window_end date_window_end = date_window_end + date_window_size - if sub_stream_name is None: + if sub_stream_name is None or should_sync_parent: singer.write_bookmark(Context.state, stream_name + '_events', 'updates_created', max_created) singer.write_state(Context.state) - else: + if sub_stream_name and Context.is_selected(sub_stream_name): singer.write_bookmark(Context.state, sub_stream_name + '_events', 'updates_created', @@ -1036,23 +1043,18 @@ def sync(): stream_name = catalog_entry['tap_stream_id'] # Sync records for stream if Context.is_selected(stream_name): - if not Context.is_sub_stream(stream_name): # Run the sync for parent-streams + if Context.is_selected(stream_name) and not Context.is_sub_stream(stream_name) or not is_parent_selected(stream_name): # Run the sync for parent-streams sync_stream(stream_name) # This prevents us from retrieving 'events.events' if STREAM_TO_TYPE_FILTER.get(stream_name): bookmark_value = get_bookmark_for_stream(stream_name + '_events', 'updates_created') + if Context.is_sub_stream(stream_name) and is_parent_selected(stream_name): + parent_stream = PARENT_STREAM_MAP.get(stream_name) + parent_bookmark = get_bookmark_for_stream(parent_stream + '_events', 'updates_created') + bookmark_value = min(bookmark_value, parent_bookmark) + # substream_bookmark = singer.get_bookmark(Context.state, stream_name + '_events', 'updates_created') sync_event_updates(stream_name, bookmark_value) - else: # Run the sync for child-streams independently - bookmark_value = get_bookmark_for_sub_stream(stream_name) - sync_sub_stream(stream_name, bookmark_value) - # Get the child-stream's events bookmark if present - events_bookmark = singer.get_bookmark(Context.state, stream_name + '_events', 'updates_created') - # Use the child-stream's event bookmark if present, else use the original child-stream's bookmark - if events_bookmark: - bookmark_value = events_bookmark - sync_event_updates(stream_name, bookmark_value) # Run the sync mode for fetching events for the child-streams independently - @utils.handle_top_exception(LOGGER) def main(): # Parse command line arguments From 31f986d0c058cee53605bd35ece56696665dcd8e Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Fri, 10 Jun 2022 12:30:58 +0000 Subject: [PATCH 02/20] fixed issues in the invoices and invoice_line_items sync --- tap_stripe/__init__.py | 97 ++++++++++++++++++++++-------------------- 1 file changed, 51 insertions(+), 46 deletions(-) diff --git a/tap_stripe/__init__.py b/tap_stripe/__init__.py index 12a49bb3..343a4bdb 100755 --- a/tap_stripe/__init__.py +++ b/tap_stripe/__init__.py @@ -202,6 +202,12 @@ def is_sub_stream(cls, stream_name): if stream_name == sub_stream_id: return True return False + @classmethod + def is_parent_stream(cls, stream_name): + for stream_id in PARENT_STREAM_MAP.values(): + if stream_name == stream_id: + return True + return False @classmethod def print_counts(cls): @@ -551,7 +557,7 @@ def convert_dict_to_stripe_object(record): # pylint: disable=too-many-locals # pylint: disable=too-many-statements -def sync_stream(stream_name): +def sync_stream(stream_name, is_sub_stream=False): """ Sync each stream, looking for newly created records. Updates are captured by events stream. """ @@ -561,15 +567,14 @@ def sync_stream(stream_name): stream_field_whitelist = json.loads(Context.config.get('whitelist_map', '{}')).get(stream_name) extraction_time = singer.utils.now() - should_sync_parent_stream = True - - if Context.is_sub_stream(stream_name): - replication_key = STREAM_REPLICATION_KEY.get(PARENT_STREAM_MAP.get(stream_name)) + if is_sub_stream: + # As this function expecting stream name as parent name hence changing values sub_stream_name = stream_name stream_name = PARENT_STREAM_MAP.get(stream_name) - should_sync_parent_stream = is_parent_selected(sub_stream_name) + replication_key = STREAM_REPLICATION_KEY.get(stream_name) else: - replication_key = metadata.get(stream_metadata, (), 'valid-replication-keys')[0] + # replication_key = metadata.get(stream_metadata, (), 'valid-replication-keys')[0] + replication_key = STREAM_REPLICATION_KEY.get(stream_name) # Invoice Items bookmarks on `date`, but queries on `created` filter_key = 'created' if stream_name == 'invoice_items' else replication_key @@ -583,15 +588,13 @@ def sync_stream(stream_name): # If there is a sub-stream and its selected, get its bookmark (or the start date if no bookmark) should_sync_sub_stream = sub_stream_name and Context.is_selected(sub_stream_name) - if should_sync_sub_stream and should_sync_parent_stream: + if should_sync_sub_stream: sub_stream_bookmark = get_bookmark_for_sub_stream(sub_stream_name) - # if there is a sub stream, set bookmark to sub stream's bookmark - # since we know it must be earlier than the stream's bookmark - if sub_stream_bookmark != stream_bookmark: + if not is_sub_stream: bookmark = min(stream_bookmark, sub_stream_bookmark) else: - bookmark = stream_bookmark + sub_stream_bookmark = None with Transformer(singer.UNIX_SECONDS_INTEGER_DATETIME_PARSING) as transformer: end_time = dt_to_epoch(utils.now()) @@ -648,7 +651,7 @@ def sync_stream(stream_name): rec['updated'] = stream_obj_created # sync stream if object is greater than or equal to the bookmark - if stream_obj_created >= stream_bookmark and should_sync_parent_stream: + if stream_obj_created >= stream_bookmark and not is_sub_stream: rec = transformer.transform(rec, Context.get_catalog_entry(stream_name)['schema'], stream_metadata) @@ -671,20 +674,14 @@ def sync_stream(stream_name): sync_sub_stream(sub_stream_name, stream_obj) # Update stream/sub-streams bookmarks as stop window - if stop_window > stream_bookmark: + if not is_sub_stream and stop_window > stream_bookmark: stream_bookmark = stop_window - singer.write_bookmark(Context.state, - stream_name, - replication_key, - stream_bookmark) + write_bookmark_for_stream(stream_name, replication_key, stream_bookmark) # the sub stream bookmarks on its parent if should_sync_sub_stream and stop_window > sub_stream_bookmark: sub_stream_bookmark = stop_window - singer.write_bookmark(Context.state, - sub_stream_name, - replication_key, - sub_stream_bookmark) + write_bookmark_for_stream(sub_stream_name, replication_key, sub_stream_bookmark) singer.write_state(Context.state) @@ -730,7 +727,6 @@ def sync_sub_stream(sub_stream_name, parent_obj, updates=False): """ Given a parent object, retrieve its values for the specified substream. """ - # LOGGER.info(f'>>>>> {sub_stream_name}') extraction_time = singer.utils.now() if sub_stream_name == "invoice_line_items": @@ -904,7 +900,7 @@ def recursive_to_dict(some_obj): # Else just return return some_obj -def sync_event_updates(stream_name, bookmark_value): +def sync_event_updates(stream_name, is_sub_stream): ''' Get updates via events endpoint @@ -913,6 +909,29 @@ def sync_event_updates(stream_name, bookmark_value): LOGGER.info("Started syncing event based updates") date_window_size = 60 * 60 * 24 # Seconds in a day + + if is_sub_stream: + sub_stream_name = stream_name + stream_name = PARENT_STREAM_MAP.get(sub_stream_name) + + sub_stream_name = SUB_STREAMS.get(stream_name) + + parent_bookmark_value = singer.get_bookmark(Context.state, + stream_name + '_events', + 'updates_created') or \ + int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) + + sub_stream_bookmark_value = parent_bookmark_value = singer.get_bookmark(Context.state, + sub_stream_name + '_events', + 'updates_created') or \ + int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) + + if is_sub_stream: + bookmark_value = sub_stream_bookmark_value + elif sub_stream_name and Context.is_selected(sub_stream_name): + bookmark_value = min(parent_bookmark_value,sub_stream_bookmark_value) + else: + bookmark_value = parent_bookmark_value max_created = bookmark_value date_window_start = max_created @@ -922,14 +941,6 @@ def sync_event_updates(stream_name, bookmark_value): # Create a map to hold relate event object ids to timestamps updated_object_timestamps = {} - - should_sync_parent = True - sub_stream_name = SUB_STREAMS.get(stream_name) - - if Context.is_sub_stream(stream_name): - sub_stream_name = stream_name - should_sync_parent = is_parent_selected(stream_name) - stream_name = PARENT_STREAM_MAP.get(sub_stream_name) while not stop_paging: extraction_time = singer.utils.now() @@ -987,15 +998,15 @@ def sync_event_updates(stream_name, bookmark_value): if events_obj.created >= bookmark_value: if rec.get('id') is not None: - if should_sync_parent: + if not is_sub_stream: singer.write_record(stream_name, rec, time_extracted=extraction_time) Context.updated_counts[stream_name] += 1 - # Delete events should be synced but not their subobjects - if events_obj.get('type', '').endswith('.deleted'): - continue + # Delete events should be synced but not their subobjects + if events_obj.get('type', '').endswith('.deleted'): + continue if sub_stream_name and Context.is_selected(sub_stream_name): if event_resource_obj: @@ -1009,7 +1020,7 @@ def sync_event_updates(stream_name, bookmark_value): # cannot bookmark until the entire page is processed date_window_start = date_window_end date_window_end = date_window_end + date_window_size - if sub_stream_name is None or should_sync_parent: + if not is_sub_stream: singer.write_bookmark(Context.state, stream_name + '_events', 'updates_created', @@ -1043,17 +1054,11 @@ def sync(): stream_name = catalog_entry['tap_stream_id'] # Sync records for stream if Context.is_selected(stream_name): - if Context.is_selected(stream_name) and not Context.is_sub_stream(stream_name) or not is_parent_selected(stream_name): # Run the sync for parent-streams - sync_stream(stream_name) + if not Context.is_sub_stream(stream_name) or not is_parent_selected(stream_name): # Run the sync for parent-streams + sync_stream(stream_name, Context.is_sub_stream(stream_name)) # This prevents us from retrieving 'events.events' if STREAM_TO_TYPE_FILTER.get(stream_name): - bookmark_value = get_bookmark_for_stream(stream_name + '_events', 'updates_created') - if Context.is_sub_stream(stream_name) and is_parent_selected(stream_name): - parent_stream = PARENT_STREAM_MAP.get(stream_name) - parent_bookmark = get_bookmark_for_stream(parent_stream + '_events', 'updates_created') - bookmark_value = min(bookmark_value, parent_bookmark) - # substream_bookmark = singer.get_bookmark(Context.state, stream_name + '_events', 'updates_created') - sync_event_updates(stream_name, bookmark_value) + sync_event_updates(stream_name, Context.is_sub_stream(stream_name)) @utils.handle_top_exception(LOGGER) def main(): From 8575c9dfafbc06738c3a2ef3c02074e29aba0b40 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Fri, 10 Jun 2022 13:01:52 +0000 Subject: [PATCH 03/20] fixed pylint --- tap_stripe/__init__.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tap_stripe/__init__.py b/tap_stripe/__init__.py index 343a4bdb..0b956e0e 100755 --- a/tap_stripe/__init__.py +++ b/tap_stripe/__init__.py @@ -571,7 +571,7 @@ def sync_stream(stream_name, is_sub_stream=False): # As this function expecting stream name as parent name hence changing values sub_stream_name = stream_name stream_name = PARENT_STREAM_MAP.get(stream_name) - replication_key = STREAM_REPLICATION_KEY.get(stream_name) + replication_key = STREAM_REPLICATION_KEY.get(stream_name) else: # replication_key = metadata.get(stream_metadata, (), 'valid-replication-keys')[0] replication_key = STREAM_REPLICATION_KEY.get(stream_name) @@ -909,23 +909,23 @@ def sync_event_updates(stream_name, is_sub_stream): LOGGER.info("Started syncing event based updates") date_window_size = 60 * 60 * 24 # Seconds in a day - + if is_sub_stream: sub_stream_name = stream_name stream_name = PARENT_STREAM_MAP.get(sub_stream_name) - + sub_stream_name = SUB_STREAMS.get(stream_name) parent_bookmark_value = singer.get_bookmark(Context.state, stream_name + '_events', 'updates_created') or \ int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) - + sub_stream_bookmark_value = parent_bookmark_value = singer.get_bookmark(Context.state, sub_stream_name + '_events', 'updates_created') or \ int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) - + if is_sub_stream: bookmark_value = sub_stream_bookmark_value elif sub_stream_name and Context.is_selected(sub_stream_name): From f29e24fa13d1bfc9f966f11148f8375c78d562fb Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Fri, 10 Jun 2022 14:54:38 +0000 Subject: [PATCH 04/20] updated unittests --- tap_stripe/__init__.py | 3 +- tests/unittests/test_child_bookmarks.py | 48 +++++++------------------ 2 files changed, 14 insertions(+), 37 deletions(-) diff --git a/tap_stripe/__init__.py b/tap_stripe/__init__.py index 0b956e0e..5aef4ca6 100755 --- a/tap_stripe/__init__.py +++ b/tap_stripe/__init__.py @@ -929,7 +929,7 @@ def sync_event_updates(stream_name, is_sub_stream): if is_sub_stream: bookmark_value = sub_stream_bookmark_value elif sub_stream_name and Context.is_selected(sub_stream_name): - bookmark_value = min(parent_bookmark_value,sub_stream_bookmark_value) + bookmark_value = min(parent_bookmark_value, sub_stream_bookmark_value) else: bookmark_value = parent_bookmark_value @@ -1038,6 +1038,7 @@ def sync(): """ The sync function called for the sync mode. """ + LOGGER.info("in sync") # Write all schemas and init count to 0 for catalog_entry in Context.catalog['streams']: stream_name = catalog_entry["tap_stream_id"] diff --git a/tests/unittests/test_child_bookmarks.py b/tests/unittests/test_child_bookmarks.py index 59d0b1e4..2035bd05 100644 --- a/tests/unittests/test_child_bookmarks.py +++ b/tests/unittests/test_child_bookmarks.py @@ -1,8 +1,9 @@ import unittest from unittest import mock -from tap_stripe import DEFAULT_DATE_WINDOW_SIZE, Context, sync, stripe, dt_to_epoch, utils +from tap_stripe import Context, sync, stripe, dt_to_epoch, utils class TestParentChildBookmarking(unittest.TestCase): + @mock.patch('tap_stripe.is_parent_selected', return_value=False) @mock.patch('tap_stripe.paginate') @mock.patch('tap_stripe.Context.is_sub_stream', return_value=[True]) @mock.patch('tap_stripe.singer.write_schema') @@ -11,7 +12,7 @@ class TestParentChildBookmarking(unittest.TestCase): @mock.patch('tap_stripe.sync_event_updates') @mock.patch('tap_stripe.Context.get_catalog_entry') @mock.patch('tap_stripe.utils.now') - def test_child_bookmarking(self, mock_now, mock_get_catalog_entry, mock_event_updates, mock_to_map, mock_is_selected, mock_write_schema, mock_is_sub_stream, mock_paginate): + def test_child_bookmarking(self, mock_now, mock_get_catalog_entry, mock_event_updates, mock_to_map, mock_is_selected, mock_write_schema, mock_is_sub_stream, mock_paginate, mock_is_parent_selected): ''' Verify that state is updated with parent's bookmark after syncing the child. ''' @@ -23,13 +24,14 @@ def test_child_bookmarking(self, mock_now, mock_get_catalog_entry, mock_event_up mock_get_catalog_entry.return_value = {'tap_stream_id': 'invoices', 'schema': {}, 'key_properties': [], 'metadata': [{"valid-replication-keys": ["created"]}]} # metadata.to_map return value mock_to_map.return_value = {(): {'table-key-properties': ['id'], 'selected': True, 'forced-replication-method': 'INCREMENTAL', 'valid-replication-keys': ['created']}} - invoice_line_items_ts = 1641137533 - Context.state = {"bookmarks": {"invoices": {"date": 1645716195}, "invoice_line_items": {"date": invoice_line_items_ts}}} + invoice_line_items_ts = 1641137533 # 02-01-2022T03:32:13Z + Context.state = {"bookmarks": {"invoices": {"date": 1641137533}, "invoice_line_items": {"date": invoice_line_items_ts}}} sync() stop_window = dt_to_epoch(now_time) # Verify that the paginate function is called with the child stream bookmark mock_paginate.assert_called_with(stripe.Invoice, 'created', invoice_line_items_ts, stop_window, 'invoices', None) + @mock.patch('tap_stripe.is_parent_selected', return_value=False) @mock.patch('tap_stripe.paginate') @mock.patch('tap_stripe.Context.is_sub_stream', return_value=[True]) @mock.patch('tap_stripe.singer.write_schema') @@ -38,10 +40,11 @@ def test_child_bookmarking(self, mock_now, mock_get_catalog_entry, mock_event_up @mock.patch('tap_stripe.Context.get_catalog_entry') @mock.patch('tap_stripe.utils.now') @mock.patch('tap_stripe.sync_sub_stream') + @mock.patch('tap_stripe.sync_stream') @mock.patch('tap_stripe.sync_event_updates') - def test_sync_event_updates_when_events_bookmark_present(self, mock_sync_event_updates, sync_sub_stream, mock_now, mock_get_catalog_entry, mock_to_map, mock_is_selected, mock_write_schema, mock_is_sub_stream, mock_paginate): + def test_sync_event_updates_when_events_bookmark_present(self, mock_sync_event_updates, mock_sync_stream, mock_sync_sub_stream, mock_now, mock_get_catalog_entry, mock_to_map, mock_is_selected, mock_write_schema, mock_is_sub_stream, mock_paginate, mock_is_parent_selected): ''' - Verify that state is updated with parent's bookmark after syncing the child. + Verify that event_updates is called for child stream. ''' # mocked now time now_time = utils.strptime_with_tz('2022-01-31 16:17:40.948019+00:00') @@ -51,36 +54,9 @@ def test_sync_event_updates_when_events_bookmark_present(self, mock_sync_event_u mock_get_catalog_entry.return_value = {'tap_stream_id': 'invoices', 'schema': {}, 'key_properties': [], 'metadata': [{"valid-replication-keys": ["created"]}]} # metadata.to_map return value mock_to_map.return_value = {(): {'table-key-properties': ['id'], 'selected': True, 'forced-replication-method': 'INCREMENTAL', 'valid-replication-keys': ['created']}} - invoice_line_items_ts = 1641137533 + invoice_line_items_ts = 1641137533 # 02-01-2022T03:32:13Z events_ts = 1645716195 Context.state = {"bookmarks": {"invoice_line_items_events": {"updates_created": events_ts}, "invoice_line_items": {"date": invoice_line_items_ts}}} sync() - # Verify that the sync_event_updates function is called with the events bookmark - mock_sync_event_updates.assert_called_with('invoice_line_items', events_ts) - - @mock.patch('tap_stripe.paginate') - @mock.patch('tap_stripe.Context.is_sub_stream', return_value=[True]) - @mock.patch('tap_stripe.singer.write_schema') - @mock.patch('tap_stripe.Context.is_selected', return_value=[True]) - @mock.patch('tap_stripe.metadata.to_map') - @mock.patch('tap_stripe.Context.get_catalog_entry') - @mock.patch('tap_stripe.utils.now') - @mock.patch('tap_stripe.sync_sub_stream') - @mock.patch('tap_stripe.sync_event_updates') - def test_sync_event_updates_when_events_bookmark_not_present(self, mock_sync_event_updates, sync_sub_stream, mock_now, mock_get_catalog_entry, mock_to_map, mock_is_selected, mock_write_schema, mock_is_sub_stream, mock_paginate): - ''' - Verify that state is updated with parent's bookmark after syncing the child. - ''' - # mocked now time - now_time = utils.strptime_with_tz('2022-01-31 16:17:40.948019+00:00') - mock_now.return_value = now_time - # catalog passed in the context - Context.catalog = {'streams': [{'tap_stream_id': 'invoice_line_items', 'schema': {}, 'key_properties': [], 'metadata': []}]} - mock_get_catalog_entry.return_value = {'tap_stream_id': 'invoices', 'schema': {}, 'key_properties': [], 'metadata': [{"valid-replication-keys": ["created"]}]} - # metadata.to_map return value - mock_to_map.return_value = {(): {'table-key-properties': ['id'], 'selected': True, 'forced-replication-method': 'INCREMENTAL', 'valid-replication-keys': ['created']}} - invoice_line_items_ts = 1641137533 - Context.state = {"bookmarks": {"invoice_line_items": {"date": invoice_line_items_ts}}} - sync() - # Verify that the sync_event_updates function is called with the child stream bookmark - mock_sync_event_updates.assert_called_with('invoice_line_items', invoice_line_items_ts) \ No newline at end of file + # Verify that the sync_event_updates function is called with the is_subStream parameter True + mock_sync_event_updates.assert_called_with('invoice_line_items', [True]) From ba84008dac6451af6b1f9306c26f2d4574de7544 Mon Sep 17 00:00:00 2001 From: dbshah1212 Date: Fri, 10 Jun 2022 22:30:51 +0530 Subject: [PATCH 05/20] TDL-19384: Added Sub Stream check --- tap_stripe/__init__.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tap_stripe/__init__.py b/tap_stripe/__init__.py index 5aef4ca6..599d4219 100755 --- a/tap_stripe/__init__.py +++ b/tap_stripe/__init__.py @@ -569,7 +569,6 @@ def sync_stream(stream_name, is_sub_stream=False): extraction_time = singer.utils.now() if is_sub_stream: # As this function expecting stream name as parent name hence changing values - sub_stream_name = stream_name stream_name = PARENT_STREAM_MAP.get(stream_name) replication_key = STREAM_REPLICATION_KEY.get(stream_name) else: @@ -921,10 +920,11 @@ def sync_event_updates(stream_name, is_sub_stream): 'updates_created') or \ int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) - sub_stream_bookmark_value = parent_bookmark_value = singer.get_bookmark(Context.state, - sub_stream_name + '_events', - 'updates_created') or \ - int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) + if sub_stream_name: + sub_stream_bookmark_value = parent_bookmark_value = singer.get_bookmark(Context.state, + sub_stream_name + '_events', + 'updates_created') or \ + int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) if is_sub_stream: bookmark_value = sub_stream_bookmark_value From 043b7cb9a9550413024d26e1bf774235a1aa2efa Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Fri, 10 Jun 2022 18:26:48 +0000 Subject: [PATCH 06/20] fixed unittests --- tap_stripe/__init__.py | 1 - tests/unittests/test_invoice_line_item_id.py | 14 +++++++------- tests/unittests/test_lookback_window.py | 6 +++--- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/tap_stripe/__init__.py b/tap_stripe/__init__.py index 599d4219..32228b5b 100755 --- a/tap_stripe/__init__.py +++ b/tap_stripe/__init__.py @@ -1038,7 +1038,6 @@ def sync(): """ The sync function called for the sync mode. """ - LOGGER.info("in sync") # Write all schemas and init count to 0 for catalog_entry in Context.catalog['streams']: stream_name = catalog_entry["tap_stream_id"] diff --git a/tests/unittests/test_invoice_line_item_id.py b/tests/unittests/test_invoice_line_item_id.py index ffcacf07..d0b5fb40 100644 --- a/tests/unittests/test_invoice_line_item_id.py +++ b/tests/unittests/test_invoice_line_item_id.py @@ -49,7 +49,7 @@ def test_no_events_updates(self, mocked_new_counts, mocked_updated_counts, mocke ] # function call when 'updates=False' - tap_stripe.write_substream_records("invoice_line_items", MockInvoice(lines), False) + tap_stripe.sync_sub_stream("invoice_line_items", MockInvoice(lines), False) # expected data expected_record = { @@ -84,7 +84,7 @@ def test_no_unique_id(self, mocked_new_counts, mocked_updated_counts, mocked_get ] # function call - tap_stripe.write_substream_records("invoice_line_items", MockInvoice(lines), True) + tap_stripe.sync_sub_stream("invoice_line_items", MockInvoice(lines), True) # expected data expected_record = { @@ -119,7 +119,7 @@ def test_no_updates_and_unique_id(self, mocked_new_counts, mocked_updated_counts ] # function call with 'updates=False' - tap_stripe.write_substream_records("invoice_line_items", MockInvoice(lines), False) + tap_stripe.sync_sub_stream("invoice_line_items", MockInvoice(lines), False) # expected data expected_record = { @@ -154,7 +154,7 @@ def test_invoiceitem_with_invoice_item(self, mocked_new_counts, mocked_updated_c ] # function call with updates - tap_stripe.write_substream_records("invoice_line_items", MockInvoice(lines), True) + tap_stripe.sync_sub_stream("invoice_line_items", MockInvoice(lines), True) # expected data expected_record = { @@ -191,7 +191,7 @@ def test_invoiceitem_without_invoice_item(self, mocked_new_counts, mocked_update ] # function call with updates - tap_stripe.write_substream_records("invoice_line_items", MockInvoice(lines), True) + tap_stripe.sync_sub_stream("invoice_line_items", MockInvoice(lines), True) # expected data expected_record = { @@ -228,7 +228,7 @@ def test_subscription_without_subscription(self, mocked_new_counts, mocked_updat ] # function call with updates - tap_stripe.write_substream_records("invoice_line_items", MockInvoice(lines), True) + tap_stripe.sync_sub_stream("invoice_line_items", MockInvoice(lines), True) # expected data expected_record = { @@ -265,7 +265,7 @@ def test_subscription_with_subscription(self, mocked_new_counts, mocked_updated_ ] # function call with updates - tap_stripe.write_substream_records("invoice_line_items", MockInvoice(lines), True) + tap_stripe.sync_sub_stream("invoice_line_items", MockInvoice(lines), True) # expected data expected_record = { diff --git a/tests/unittests/test_lookback_window.py b/tests/unittests/test_lookback_window.py index 4b9d48dc..031a43bf 100644 --- a/tests/unittests/test_lookback_window.py +++ b/tests/unittests/test_lookback_window.py @@ -15,12 +15,12 @@ def to_dict_recursive(cls): bookmark_time = 1645046000 # epoch bookmark time -@mock.patch("tap_stripe.reduce_foreign_keys", return_value = {"date": 16452804585}) -@mock.patch("tap_stripe.convert_dict_to_stripe_object", return_value = {"date": "2022-02-17T00:00:00"}) +@mock.patch("tap_stripe.reduce_foreign_keys", return_value = {"created": 16452804585}) +@mock.patch("tap_stripe.convert_dict_to_stripe_object", return_value = {"created": "2022-02-17T00:00:00"}) @mock.patch("tap_stripe.paginate", return_value = [MockClass()]) @mock.patch("tap_stripe.Context.get_catalog_entry") @mock.patch("tap_stripe.singer.metadata.to_map") -@mock.patch("tap_stripe.singer.metadata.get", return_value = ["date"]) +@mock.patch("tap_stripe.singer.metadata.get", return_value = ["created"]) @mock.patch("tap_stripe.epoch_to_dt") @mock.patch("tap_stripe.dt_to_epoch", side_effect = [1645056000, 1645056000, 1647647700]) # epoch timestamps @mock.patch("tap_stripe.sync_sub_stream") From 04bf4275f8945f99bb319ac7c156c04d0222d3cf Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Mon, 13 Jun 2022 08:40:49 +0000 Subject: [PATCH 07/20] resolved PR comments and fix cci issues --- tap_stripe/__init__.py | 15 +++++++++------ tests/test_all_fields.py | 8 ++++++-- tests/unittests/test_child_bookmarks.py | 2 +- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/tap_stripe/__init__.py b/tap_stripe/__init__.py index 32228b5b..4403847e 100755 --- a/tap_stripe/__init__.py +++ b/tap_stripe/__init__.py @@ -85,13 +85,12 @@ 'transfers': {'type': 'transfer.*', 'object': ['transfer']}, 'disputes': {'type': 'charge.dispute.*', 'object': ['dispute']}, 'products': {'type': 'product.*', 'object': ['product']}, - 'invoice_line_items': {'type': 'invoice.*', 'object': ['invoice']}, - 'subscription_items': {'type': 'customer.subscription.*', 'object': ['subscription']}, + 'invoice_line_items': {'type': 'invoice.*', 'object': ['invoice_line_item']}, + 'subscription_items': {'type': 'customer.subscription.*', 'object': ['subscription_item']}, 'payout_transactions': {'type': 'payout.*', 'object': ['transfer', 'payout']}, # Cannot find evidence of these streams having events associated: # subscription_items - appears on subscriptions events # balance_transactions - seems to be immutable - # payouts - these are called transfers with an event type of payout.* } # Some fields are not available by default with latest API version so @@ -568,11 +567,11 @@ def sync_stream(stream_name, is_sub_stream=False): extraction_time = singer.utils.now() if is_sub_stream: - # As this function expecting stream name as parent name hence changing values + # We need to change the parent, when only child is selected, hence need to change + # stream_name to its parent. stream_name = PARENT_STREAM_MAP.get(stream_name) replication_key = STREAM_REPLICATION_KEY.get(stream_name) else: - # replication_key = metadata.get(stream_metadata, (), 'valid-replication-keys')[0] replication_key = STREAM_REPLICATION_KEY.get(stream_name) # Invoice Items bookmarks on `date`, but queries on `created` @@ -910,6 +909,8 @@ def sync_event_updates(stream_name, is_sub_stream): date_window_size = 60 * 60 * 24 # Seconds in a day if is_sub_stream: + # We need to change the parent, when only child is selected, hence need to change + # stream_name to its parent. sub_stream_name = stream_name stream_name = PARENT_STREAM_MAP.get(sub_stream_name) @@ -920,6 +921,7 @@ def sync_event_updates(stream_name, is_sub_stream): 'updates_created') or \ int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) + # Get the bookmark value of the sub_stream if its selected and present if sub_stream_name: sub_stream_bookmark_value = parent_bookmark_value = singer.get_bookmark(Context.state, sub_stream_name + '_events', @@ -929,6 +931,7 @@ def sync_event_updates(stream_name, is_sub_stream): if is_sub_stream: bookmark_value = sub_stream_bookmark_value elif sub_stream_name and Context.is_selected(sub_stream_name): + # Get the minimum bookmark value from parent and child streams if both are selected. bookmark_value = min(parent_bookmark_value, sub_stream_bookmark_value) else: bookmark_value = parent_bookmark_value @@ -1056,7 +1059,7 @@ def sync(): if Context.is_selected(stream_name): if not Context.is_sub_stream(stream_name) or not is_parent_selected(stream_name): # Run the sync for parent-streams sync_stream(stream_name, Context.is_sub_stream(stream_name)) - # This prevents us from retrieving 'events.events' + # This prevents us from retrieving immutable stream events. if STREAM_TO_TYPE_FILTER.get(stream_name): sync_event_updates(stream_name, Context.is_sub_stream(stream_name)) diff --git a/tests/test_all_fields.py b/tests/test_all_fields.py index 5390f780..b6a9694a 100644 --- a/tests/test_all_fields.py +++ b/tests/test_all_fields.py @@ -63,7 +63,8 @@ 'invoice_line_items': set(), 'invoices': { 'test_clock', - 'application' + 'application', + 'rendering_options' }, 'payment_intents': { 'amount_details' @@ -179,7 +180,10 @@ 'coupons': { 'percent_off', # BUG_9720 | Decimal('67') != Decimal('66.6') (value is changing in duplicate records) }, - 'customers': set(), + 'customers': { + # missing subfield 'rendering_options + 'invoice_settings' + }, 'subscriptions': { # BUG_12478 | missing subfields in coupon where coupon is subfield within discount # BUG_12478 | missing subfields on discount ['checkout_session', 'id', 'invoice', 'invoice_item', 'promotion_code'] diff --git a/tests/unittests/test_child_bookmarks.py b/tests/unittests/test_child_bookmarks.py index 2035bd05..730ed688 100644 --- a/tests/unittests/test_child_bookmarks.py +++ b/tests/unittests/test_child_bookmarks.py @@ -14,7 +14,7 @@ class TestParentChildBookmarking(unittest.TestCase): @mock.patch('tap_stripe.utils.now') def test_child_bookmarking(self, mock_now, mock_get_catalog_entry, mock_event_updates, mock_to_map, mock_is_selected, mock_write_schema, mock_is_sub_stream, mock_paginate, mock_is_parent_selected): ''' - Verify that state is updated with parent's bookmark after syncing the child. + Verify that the paginate function is called with the child stream bookmark ''' # mocked now time now_time = utils.strptime_with_tz('2022-01-31 16:17:40.948019+00:00') From b96f328cebeafdcb4519f69b6b433e572ca9cbf2 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Mon, 13 Jun 2022 08:56:08 +0000 Subject: [PATCH 08/20] updated invoice_line_item to line_item --- tap_stripe/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_stripe/__init__.py b/tap_stripe/__init__.py index 4403847e..8f24d7c0 100755 --- a/tap_stripe/__init__.py +++ b/tap_stripe/__init__.py @@ -85,7 +85,7 @@ 'transfers': {'type': 'transfer.*', 'object': ['transfer']}, 'disputes': {'type': 'charge.dispute.*', 'object': ['dispute']}, 'products': {'type': 'product.*', 'object': ['product']}, - 'invoice_line_items': {'type': 'invoice.*', 'object': ['invoice_line_item']}, + 'invoice_line_items': {'type': 'invoice.*', 'object': ['line_item']}, 'subscription_items': {'type': 'customer.subscription.*', 'object': ['subscription_item']}, 'payout_transactions': {'type': 'payout.*', 'object': ['transfer', 'payout']}, # Cannot find evidence of these streams having events associated: From 65337175e35e0899e27f8c7ca9e14618437b6a69 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Mon, 13 Jun 2022 10:46:29 +0000 Subject: [PATCH 09/20] Updated the code comments --- tap_stripe/__init__.py | 43 ++++++++++++++++--------- tests/unittests/test_child_bookmarks.py | 2 +- 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/tap_stripe/__init__.py b/tap_stripe/__init__.py index 8f24d7c0..7a64665d 100755 --- a/tap_stripe/__init__.py +++ b/tap_stripe/__init__.py @@ -559,6 +559,9 @@ def convert_dict_to_stripe_object(record): def sync_stream(stream_name, is_sub_stream=False): """ Sync each stream, looking for newly created records. Updates are captured by events stream. + + :param is_sub_stream - Check whether the funciton is called via the parent stream(only parent/both ar selected) + or when called through only child stream i.e. when parent is not selected. """ LOGGER.info("Started syncing stream %s", stream_name) @@ -586,9 +589,11 @@ def sync_stream(stream_name, is_sub_stream=False): # If there is a sub-stream and its selected, get its bookmark (or the start date if no bookmark) should_sync_sub_stream = sub_stream_name and Context.is_selected(sub_stream_name) + # Get the bookmark of the sub_stream if it is selected if should_sync_sub_stream: sub_stream_bookmark = get_bookmark_for_sub_stream(sub_stream_name) + # If both the parent and child streams are selected, get the minimum bookmark value if not is_sub_stream: bookmark = min(stream_bookmark, sub_stream_bookmark) else: @@ -648,7 +653,7 @@ def sync_stream(stream_name, is_sub_stream=False): stream_obj_created = rec[replication_key] rec['updated'] = stream_obj_created - # sync stream if object is greater than or equal to the bookmark + # sync stream if object is greater than or equal to the bookmark and if only parent is selected if stream_obj_created >= stream_bookmark and not is_sub_stream: rec = transformer.transform(rec, Context.get_catalog_entry(stream_name)['schema'], @@ -666,17 +671,16 @@ def sync_stream(stream_name, is_sub_stream=False): Context.new_counts[stream_name] += 1 - # sync sub streams if its selected and the parent object - # is greater than its bookmark + # sync sub streams if it is selected and the parent object is greater than its bookmark if should_sync_sub_stream and stream_obj_created > sub_stream_bookmark: sync_sub_stream(sub_stream_name, stream_obj) - # Update stream/sub-streams bookmarks as stop window + # Update stream bookmark as stop window when parent stream is selected if not is_sub_stream and stop_window > stream_bookmark: stream_bookmark = stop_window write_bookmark_for_stream(stream_name, replication_key, stream_bookmark) - # the sub stream bookmarks on its parent + # Update sub-stream bookmark as stop window when child stream is selected if should_sync_sub_stream and stop_window > sub_stream_bookmark: sub_stream_bookmark = stop_window write_bookmark_for_stream(sub_stream_name, replication_key, sub_stream_bookmark) @@ -909,30 +913,33 @@ def sync_event_updates(stream_name, is_sub_stream): date_window_size = 60 * 60 * 24 # Seconds in a day if is_sub_stream: - # We need to change the parent, when only child is selected, hence need to change - # stream_name to its parent. + # Need to update the stream_name to its parent's stream name as for child we need to fetch the parents first sub_stream_name = stream_name stream_name = PARENT_STREAM_MAP.get(sub_stream_name) sub_stream_name = SUB_STREAMS.get(stream_name) parent_bookmark_value = singer.get_bookmark(Context.state, - stream_name + '_events', - 'updates_created') or \ - int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) + stream_name + '_events', + 'updates_created') or \ + int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) # Get the bookmark value of the sub_stream if its selected and present if sub_stream_name: - sub_stream_bookmark_value = parent_bookmark_value = singer.get_bookmark(Context.state, - sub_stream_name + '_events', - 'updates_created') or \ - int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) + sub_stream_bookmark_value = singer.get_bookmark(Context.state, + sub_stream_name + '_events', + 'updates_created') or \ + int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) + # If only child stream is selected, update bookmark to sub-stream bookmark value if is_sub_stream: bookmark_value = sub_stream_bookmark_value + elif sub_stream_name and Context.is_selected(sub_stream_name): # Get the minimum bookmark value from parent and child streams if both are selected. bookmark_value = min(parent_bookmark_value, sub_stream_bookmark_value) + + # Update the bookmark to parent bpokmark value, if child is not selected else: bookmark_value = parent_bookmark_value @@ -1001,6 +1008,7 @@ def sync_event_updates(stream_name, is_sub_stream): if events_obj.created >= bookmark_value: if rec.get('id') is not None: + # Write parent records only when the parent is selected if not is_sub_stream: singer.write_record(stream_name, rec, @@ -1011,6 +1019,7 @@ def sync_event_updates(stream_name, is_sub_stream): if events_obj.get('type', '').endswith('.deleted'): continue + # Write child stream records only when the child stream is selected if sub_stream_name and Context.is_selected(sub_stream_name): if event_resource_obj: sync_sub_stream(sub_stream_name, @@ -1023,12 +1032,14 @@ def sync_event_updates(stream_name, is_sub_stream): # cannot bookmark until the entire page is processed date_window_start = date_window_end date_window_end = date_window_end + date_window_size + # Write the parent bookmark value only when the parent is selected if not is_sub_stream: singer.write_bookmark(Context.state, stream_name + '_events', 'updates_created', max_created) singer.write_state(Context.state) + # Write the child bookmark value only when the child is selected if sub_stream_name and Context.is_selected(sub_stream_name): singer.write_bookmark(Context.state, sub_stream_name + '_events', @@ -1055,9 +1066,9 @@ def sync(): # Loop over streams in catalog for catalog_entry in Context.catalog['streams']: stream_name = catalog_entry['tap_stream_id'] - # Sync records for stream if Context.is_selected(stream_name): - if not Context.is_sub_stream(stream_name) or not is_parent_selected(stream_name): # Run the sync for parent-streams + # Run the sync for only parent streams/only child streams/both parent-child streams + if not Context.is_sub_stream(stream_name) or not is_parent_selected(stream_name): sync_stream(stream_name, Context.is_sub_stream(stream_name)) # This prevents us from retrieving immutable stream events. if STREAM_TO_TYPE_FILTER.get(stream_name): diff --git a/tests/unittests/test_child_bookmarks.py b/tests/unittests/test_child_bookmarks.py index 730ed688..eaf87920 100644 --- a/tests/unittests/test_child_bookmarks.py +++ b/tests/unittests/test_child_bookmarks.py @@ -58,5 +58,5 @@ def test_sync_event_updates_when_events_bookmark_present(self, mock_sync_event_u events_ts = 1645716195 Context.state = {"bookmarks": {"invoice_line_items_events": {"updates_created": events_ts}, "invoice_line_items": {"date": invoice_line_items_ts}}} sync() - # Verify that the sync_event_updates function is called with the is_subStream parameter True + # Verify that the sync_event_updates function is called with the is_sub_stream parameter True mock_sync_event_updates.assert_called_with('invoice_line_items', [True]) From f394dc2965bf7f685c899f0238187371d4f84999 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Mon, 13 Jun 2022 10:56:40 +0000 Subject: [PATCH 10/20] fixed typo --- tap_stripe/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tap_stripe/__init__.py b/tap_stripe/__init__.py index 7a64665d..3ac15659 100755 --- a/tap_stripe/__init__.py +++ b/tap_stripe/__init__.py @@ -560,7 +560,7 @@ def sync_stream(stream_name, is_sub_stream=False): """ Sync each stream, looking for newly created records. Updates are captured by events stream. - :param is_sub_stream - Check whether the funciton is called via the parent stream(only parent/both ar selected) + :param is_sub_stream - Check whether the function is called via the parent stream(only parent/both ar selected) or when called through only child stream i.e. when parent is not selected. """ LOGGER.info("Started syncing stream %s", stream_name) @@ -907,6 +907,9 @@ def sync_event_updates(stream_name, is_sub_stream): Get updates via events endpoint look at 'events update' bookmark and pull events after that + + :param is_sub_stream - Check whether the function is called via the parent stream(only parent/both ar selected) + or when called through only child stream i.e. when parent is not selected. ''' LOGGER.info("Started syncing event based updates") From 7998129b40220c2dbdd6cf2b60ad6234bfc52596 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Mon, 13 Jun 2022 12:46:54 +0000 Subject: [PATCH 11/20] updated code comments and added more unittests --- tap_stripe/__init__.py | 20 ++--- tests/unittests/test_child_bookmarks.py | 111 +++++++++++++++++++++++- 2 files changed, 117 insertions(+), 14 deletions(-) diff --git a/tap_stripe/__init__.py b/tap_stripe/__init__.py index 3ac15659..bf84db65 100755 --- a/tap_stripe/__init__.py +++ b/tap_stripe/__init__.py @@ -89,7 +89,6 @@ 'subscription_items': {'type': 'customer.subscription.*', 'object': ['subscription_item']}, 'payout_transactions': {'type': 'payout.*', 'object': ['transfer', 'payout']}, # Cannot find evidence of these streams having events associated: - # subscription_items - appears on subscriptions events # balance_transactions - seems to be immutable } @@ -201,12 +200,6 @@ def is_sub_stream(cls, stream_name): if stream_name == sub_stream_id: return True return False - @classmethod - def is_parent_stream(cls, stream_name): - for stream_id in PARENT_STREAM_MAP.values(): - if stream_name == stream_id: - return True - return False @classmethod def print_counts(cls): @@ -560,7 +553,9 @@ def sync_stream(stream_name, is_sub_stream=False): """ Sync each stream, looking for newly created records. Updates are captured by events stream. - :param is_sub_stream - Check whether the function is called via the parent stream(only parent/both ar selected) + :param + stream_name - Name of the stream + is_sub_stream - Check whether the function is called via the parent stream(only parent/both ar selected) or when called through only child stream i.e. when parent is not selected. """ LOGGER.info("Started syncing stream %s", stream_name) @@ -596,6 +591,7 @@ def sync_stream(stream_name, is_sub_stream=False): # If both the parent and child streams are selected, get the minimum bookmark value if not is_sub_stream: bookmark = min(stream_bookmark, sub_stream_bookmark) + # Set the substream bookmark to None (when only parent is selected) else: sub_stream_bookmark = None @@ -653,7 +649,7 @@ def sync_stream(stream_name, is_sub_stream=False): stream_obj_created = rec[replication_key] rec['updated'] = stream_obj_created - # sync stream if object is greater than or equal to the bookmark and if only parent is selected + # sync stream if object is greater than or equal to the bookmark and if parent is selected if stream_obj_created >= stream_bookmark and not is_sub_stream: rec = transformer.transform(rec, Context.get_catalog_entry(stream_name)['schema'], @@ -908,7 +904,9 @@ def sync_event_updates(stream_name, is_sub_stream): look at 'events update' bookmark and pull events after that - :param is_sub_stream - Check whether the function is called via the parent stream(only parent/both ar selected) + :param + stream_name - Name of the stream + is_sub_stream - Check whether the function is called via the parent stream(only parent/both ar selected) or when called through only child stream i.e. when parent is not selected. ''' LOGGER.info("Started syncing event based updates") @@ -942,7 +940,7 @@ def sync_event_updates(stream_name, is_sub_stream): # Get the minimum bookmark value from parent and child streams if both are selected. bookmark_value = min(parent_bookmark_value, sub_stream_bookmark_value) - # Update the bookmark to parent bpokmark value, if child is not selected + # Update the bookmark to parent bookmark value, if child is not selected else: bookmark_value = parent_bookmark_value diff --git a/tests/unittests/test_child_bookmarks.py b/tests/unittests/test_child_bookmarks.py index eaf87920..f03f766d 100644 --- a/tests/unittests/test_child_bookmarks.py +++ b/tests/unittests/test_child_bookmarks.py @@ -1,6 +1,6 @@ import unittest from unittest import mock -from tap_stripe import Context, sync, stripe, dt_to_epoch, utils +from tap_stripe import Context, sync, stripe, dt_to_epoch, utils, is_parent_selected class TestParentChildBookmarking(unittest.TestCase): @mock.patch('tap_stripe.is_parent_selected', return_value=False) @@ -12,7 +12,10 @@ class TestParentChildBookmarking(unittest.TestCase): @mock.patch('tap_stripe.sync_event_updates') @mock.patch('tap_stripe.Context.get_catalog_entry') @mock.patch('tap_stripe.utils.now') - def test_child_bookmarking(self, mock_now, mock_get_catalog_entry, mock_event_updates, mock_to_map, mock_is_selected, mock_write_schema, mock_is_sub_stream, mock_paginate, mock_is_parent_selected): + def test_child_bookmarking(self, mock_now, mock_get_catalog_entry, + mock_event_updates, mock_to_map, mock_is_selected, + mock_write_schema, mock_is_sub_stream, + mock_paginate, mock_is_parent_selected): ''' Verify that the paginate function is called with the child stream bookmark ''' @@ -42,7 +45,10 @@ def test_child_bookmarking(self, mock_now, mock_get_catalog_entry, mock_event_up @mock.patch('tap_stripe.sync_sub_stream') @mock.patch('tap_stripe.sync_stream') @mock.patch('tap_stripe.sync_event_updates') - def test_sync_event_updates_when_events_bookmark_present(self, mock_sync_event_updates, mock_sync_stream, mock_sync_sub_stream, mock_now, mock_get_catalog_entry, mock_to_map, mock_is_selected, mock_write_schema, mock_is_sub_stream, mock_paginate, mock_is_parent_selected): + def test_sync_event_updates_when_events_bookmark_present(self, mock_sync_event_updates, mock_sync_stream, + mock_sync_sub_stream, mock_now, mock_get_catalog_entry, + mock_to_map, mock_is_selected, mock_write_schema, + mock_is_sub_stream, mock_paginate, mock_is_parent_selected): ''' Verify that event_updates is called for child stream. ''' @@ -60,3 +66,102 @@ def test_sync_event_updates_when_events_bookmark_present(self, mock_sync_event_u sync() # Verify that the sync_event_updates function is called with the is_sub_stream parameter True mock_sync_event_updates.assert_called_with('invoice_line_items', [True]) + + @mock.patch('tap_stripe.is_parent_selected', return_value=False) + @mock.patch('tap_stripe.paginate') + @mock.patch('tap_stripe.Context.is_sub_stream', return_value=[True]) + @mock.patch('tap_stripe.singer.write_schema') + @mock.patch('tap_stripe.Context.is_selected', return_value=[True]) + @mock.patch('tap_stripe.metadata.to_map') + @mock.patch('tap_stripe.Context.get_catalog_entry') + @mock.patch('tap_stripe.utils.now') + @mock.patch('tap_stripe.sync_sub_stream') + @mock.patch('tap_stripe.sync_event_updates') + @mock.patch('tap_stripe.get_bookmark_for_sub_stream') + def test_sync_event_updates_when_substream_bookmark_present(self, mock_get_bookmark_for_sub_stream, mock_sync_event_updates, + mock_sync_sub_stream, mock_now, mock_get_catalog_entry, + mock_to_map, mock_is_selected, mock_write_schema, + mock_is_sub_stream, mock_paginate, mock_is_parent_selected): + ''' + Verify that get_bookmark_for_sub_stream() is called only when the child stream is selected. + ''' + # mocked now time + now_time = utils.strptime_with_tz('2022-01-31 16:17:40.948019+00:00') + mock_get_bookmark_for_sub_stream.return_value = 1641137550 + mock_now.return_value = now_time + # catalog passed in the context + Context.catalog = {'streams': [{'tap_stream_id': 'invoice_line_items', 'schema': {}, 'key_properties': [], 'metadata': []}]} + Context.config = {'start_date': "2019-06-20 16:17:40.948019+00:00"} + mock_get_catalog_entry.return_value = {'tap_stream_id': 'invoices', 'schema': {}, 'key_properties': [], 'metadata': [{"valid-replication-keys": ["created"]}]} + # metadata.to_map return value + mock_to_map.return_value = {(): {'table-key-properties': ['id'], 'selected': True, 'forced-replication-method': 'INCREMENTAL', 'valid-replication-keys': ['created']}} + invoice_line_items_ts = 1641137533 # 02-01-2022T03:32:13Z + events_ts = 1645716195 + Context.state = {"bookmarks": {"invoice_line_items_events": {"updates_created": events_ts}, "invoice_line_items": {"date": invoice_line_items_ts}}} + sync() + # Verify for substream get_bookmark_for_sub_stream is called with stream name + mock_get_bookmark_for_sub_stream.assert_called_with("invoice_line_items") + + @mock.patch('tap_stripe.reduce_foreign_keys', return_value = {"created": 1561047480}) + @mock.patch('tap_stripe.convert_dict_to_stripe_object') + @mock.patch('tap_stripe.is_parent_selected', return_value=False) + @mock.patch('tap_stripe.paginate', return_value = [mock.Mock()]) + @mock.patch('tap_stripe.Context.is_sub_stream', return_value=False) + @mock.patch('tap_stripe.singer.write_schema') + @mock.patch('tap_stripe.Context.is_selected', return_value=[True]) + @mock.patch('tap_stripe.metadata.to_map') + @mock.patch('tap_stripe.Context.get_catalog_entry') + @mock.patch('tap_stripe.utils.now') + @mock.patch('tap_stripe.sync_sub_stream') + @mock.patch('tap_stripe.sync_event_updates') + @mock.patch('tap_stripe.get_bookmark_for_sub_stream') + @mock.patch('tap_stripe.singer.write_record') + def test_sync_event_updates_for_parent_stream(self, mock_write_record, mock_get_bookmark, + mock_sync_event_updates, mock_sync_sub_stream, + mock_now, mock_get_catalog_entry, + mock_to_map, mock_is_selected, mock_write_schema, + mock_is_sub_stream, mock_paginate, mock_is_parent_selected, + mock_convert_dict, mock_reduce_foreign_keys): + ''' + Verify that when only the parent stream is selected, write_record() is called for the parent stream. + ''' + # mocked now time + now_time = utils.strptime_with_tz('2022-01-31 16:17:40.948019+00:00') + mock_get_bookmark.return_value = 1641137550 + mock_now.return_value = now_time + # catalog passed in the context + Context.catalog = {'streams': [{'tap_stream_id': 'invoices', 'schema': {}, 'key_properties': [], 'metadata': []}]} + Context.config = {'start_date': "2019-06-20 16:17:40.948019+00:00"} + mock_get_catalog_entry.return_value = {'tap_stream_id': 'invoices', 'schema': {}, 'key_properties': [], 'metadata': [{"valid-replication-keys": ["created"]}]} + # metadata.to_map return value + mock_to_map.return_value = {(): {'table-key-properties': ['id'], 'selected': True, 'forced-replication-method': 'INCREMENTAL', 'valid-replication-keys': ['created']}} + invoice_line_items_ts = 1641137533 # 02-01-2022T03:32:13Z + events_ts = 1645716195 + Context.state = {"bookmarks": {"invoice_line_items_events": {"updates_created": events_ts}, "invoice_line_items": {"date": invoice_line_items_ts}}} + sync() + + # Verify that one record is being written + self.assertEqual(1, mock_write_record.call_count) + + # @mock.patch('tap_stripe.Context.is_selected', return_value=True) + @mock.patch('tap_stripe.metadata.to_map') + @mock.patch('tap_stripe.Context.get_catalog_entry') + def test_is_parent_selected_when_parent_is_selected(self, mock_get_catalog_entry, mock_to_map): + """ + Verify that the is_parent_selected() returns True when the parent stream is also selected. + """ + mock_get_catalog_entry.return_value = {'tap_stream_id': 'subscriptions', 'schema': {}, 'key_properties': [], 'metadata': [{"valid-replication-keys": ["created"], "selected": True}]} + mock_to_map.return_value = {(): {'table-key-properties': ['id'], 'selected': True, 'forced-replication-method': 'INCREMENTAL', 'valid-replication-keys': ['created']}} + parent_selected = is_parent_selected('subscription_items') + self.assertTrue(parent_selected) + + @mock.patch('tap_stripe.metadata.to_map') + @mock.patch('tap_stripe.Context.get_catalog_entry') + def test_is_parent_selected_when_parent_is_not_selected(self, mock_get_catalog_entry, mock_to_map): + """ + Verify that the is_parent_selected() returns False when the parent stream is not selected. + """ + mock_get_catalog_entry.return_value = {'tap_stream_id': 'subscriptions', 'schema': {}, 'key_properties': [], 'metadata': [{"valid-replication-keys": ["created"], "selected": False}]} + mock_to_map.return_value = {(): {'table-key-properties': ['id'], 'selected': False, 'forced-replication-method': 'INCREMENTAL', 'valid-replication-keys': ['created']}} + parent_selected = is_parent_selected('subscription_items') + self.assertFalse(parent_selected) From 014d414be3869710deb0f652e7bb048f8c92e5b0 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Mon, 13 Jun 2022 12:58:52 +0000 Subject: [PATCH 12/20] resolved pylint --- tap_stripe/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_stripe/__init__.py b/tap_stripe/__init__.py index bf84db65..e0594c1c 100755 --- a/tap_stripe/__init__.py +++ b/tap_stripe/__init__.py @@ -553,7 +553,7 @@ def sync_stream(stream_name, is_sub_stream=False): """ Sync each stream, looking for newly created records. Updates are captured by events stream. - :param + :param stream_name - Name of the stream is_sub_stream - Check whether the function is called via the parent stream(only parent/both ar selected) or when called through only child stream i.e. when parent is not selected. From e1851fc9986f6ae9ee2e10b9b95710a038c93700 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Mon, 13 Jun 2022 13:19:58 +0000 Subject: [PATCH 13/20] removed uwanted comments --- tests/unittests/test_child_bookmarks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/unittests/test_child_bookmarks.py b/tests/unittests/test_child_bookmarks.py index f03f766d..4e56c720 100644 --- a/tests/unittests/test_child_bookmarks.py +++ b/tests/unittests/test_child_bookmarks.py @@ -143,7 +143,6 @@ def test_sync_event_updates_for_parent_stream(self, mock_write_record, mock_get_ # Verify that one record is being written self.assertEqual(1, mock_write_record.call_count) - # @mock.patch('tap_stripe.Context.is_selected', return_value=True) @mock.patch('tap_stripe.metadata.to_map') @mock.patch('tap_stripe.Context.get_catalog_entry') def test_is_parent_selected_when_parent_is_selected(self, mock_get_catalog_entry, mock_to_map): @@ -153,6 +152,7 @@ def test_is_parent_selected_when_parent_is_selected(self, mock_get_catalog_entry mock_get_catalog_entry.return_value = {'tap_stream_id': 'subscriptions', 'schema': {}, 'key_properties': [], 'metadata': [{"valid-replication-keys": ["created"], "selected": True}]} mock_to_map.return_value = {(): {'table-key-properties': ['id'], 'selected': True, 'forced-replication-method': 'INCREMENTAL', 'valid-replication-keys': ['created']}} parent_selected = is_parent_selected('subscription_items') + # verify that the parent_selected returns True self.assertTrue(parent_selected) @mock.patch('tap_stripe.metadata.to_map') @@ -164,4 +164,5 @@ def test_is_parent_selected_when_parent_is_not_selected(self, mock_get_catalog_e mock_get_catalog_entry.return_value = {'tap_stream_id': 'subscriptions', 'schema': {}, 'key_properties': [], 'metadata': [{"valid-replication-keys": ["created"], "selected": False}]} mock_to_map.return_value = {(): {'table-key-properties': ['id'], 'selected': False, 'forced-replication-method': 'INCREMENTAL', 'valid-replication-keys': ['created']}} parent_selected = is_parent_selected('subscription_items') + # verify that the parent_selected returns False self.assertFalse(parent_selected) From e6e3e976bcc7d86df4952e80c6e33a65576f27bc Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Mon, 13 Jun 2022 15:12:30 +0000 Subject: [PATCH 14/20] resolved PR comments --- tests/unittests/test_child_bookmarks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unittests/test_child_bookmarks.py b/tests/unittests/test_child_bookmarks.py index 4e56c720..6a80e9c4 100644 --- a/tests/unittests/test_child_bookmarks.py +++ b/tests/unittests/test_child_bookmarks.py @@ -145,7 +145,7 @@ def test_sync_event_updates_for_parent_stream(self, mock_write_record, mock_get_ @mock.patch('tap_stripe.metadata.to_map') @mock.patch('tap_stripe.Context.get_catalog_entry') - def test_is_parent_selected_when_parent_is_selected(self, mock_get_catalog_entry, mock_to_map): + def test_is_parent_selected_when_child_is_selected(self, mock_get_catalog_entry, mock_to_map): """ Verify that the is_parent_selected() returns True when the parent stream is also selected. """ From 8966675cd520fa5af9cb693b33ff25dabd010f83 Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Tue, 14 Jun 2022 06:29:41 +0000 Subject: [PATCH 15/20] resolved PR comments --- tap_stripe/__init__.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tap_stripe/__init__.py b/tap_stripe/__init__.py index e0594c1c..5b3dd83f 100755 --- a/tap_stripe/__init__.py +++ b/tap_stripe/__init__.py @@ -555,8 +555,8 @@ def sync_stream(stream_name, is_sub_stream=False): :param stream_name - Name of the stream - is_sub_stream - Check whether the function is called via the parent stream(only parent/both ar selected) - or when called through only child stream i.e. when parent is not selected. + is_sub_stream - Check whether the function is called via the parent stream(only parent/both are selected) + or when called through only child stream i.e. when parent is not selected. """ LOGGER.info("Started syncing stream %s", stream_name) @@ -906,8 +906,8 @@ def sync_event_updates(stream_name, is_sub_stream): :param stream_name - Name of the stream - is_sub_stream - Check whether the function is called via the parent stream(only parent/both ar selected) - or when called through only child stream i.e. when parent is not selected. + is_sub_stream - Check whether the function is called via the parent stream(only parent/both are selected) + or when called through only child stream i.e. when parent is not selected. ''' LOGGER.info("Started syncing event based updates") From dc7f81383b3336128eb6d7b3bc94c5e5136f7acd Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Tue, 14 Jun 2022 12:31:12 +0000 Subject: [PATCH 16/20] resolved PR review comments --- tap_stripe/__init__.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tap_stripe/__init__.py b/tap_stripe/__init__.py index 5b3dd83f..f38168d1 100755 --- a/tap_stripe/__init__.py +++ b/tap_stripe/__init__.py @@ -555,7 +555,7 @@ def sync_stream(stream_name, is_sub_stream=False): :param stream_name - Name of the stream - is_sub_stream - Check whether the function is called via the parent stream(only parent/both are selected) + is_sub_stream - Check whether the function is called via the parent stream(only parent/both parent-child are selected) or when called through only child stream i.e. when parent is not selected. """ LOGGER.info("Started syncing stream %s", stream_name) @@ -564,13 +564,14 @@ def sync_stream(stream_name, is_sub_stream=False): stream_field_whitelist = json.loads(Context.config.get('whitelist_map', '{}')).get(stream_name) extraction_time = singer.utils.now() + if is_sub_stream: - # We need to change the parent, when only child is selected, hence need to change - # stream_name to its parent. + # We need to get the parent data first for syncing the child streams. Hence, + # changing stream_name to parent stream when only child is selected. + sub_stream_name = stream_name stream_name = PARENT_STREAM_MAP.get(stream_name) - replication_key = STREAM_REPLICATION_KEY.get(stream_name) - else: - replication_key = STREAM_REPLICATION_KEY.get(stream_name) + + replication_key = STREAM_REPLICATION_KEY.get(stream_name) # Invoice Items bookmarks on `date`, but queries on `created` filter_key = 'created' if stream_name == 'invoice_items' else replication_key @@ -1033,6 +1034,7 @@ def sync_event_updates(stream_name, is_sub_stream): # cannot bookmark until the entire page is processed date_window_start = date_window_end date_window_end = date_window_end + date_window_size + # Write the parent bookmark value only when the parent is selected if not is_sub_stream: singer.write_bookmark(Context.state, @@ -1040,6 +1042,7 @@ def sync_event_updates(stream_name, is_sub_stream): 'updates_created', max_created) singer.write_state(Context.state) + # Write the child bookmark value only when the child is selected if sub_stream_name and Context.is_selected(sub_stream_name): singer.write_bookmark(Context.state, From 8fa18e3c54a1e14164e456df1cc1ba938fa96b4e Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Tue, 14 Jun 2022 13:56:15 +0000 Subject: [PATCH 17/20] added integration test for only parent --- .circleci/config.yml | 8 ++++++ tap_stripe/__init__.py | 7 +++-- tests/test_parent_child_independent.py | 37 ++++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 4 deletions(-) create mode 100644 tests/test_parent_child_independent.py diff --git a/.circleci/config.yml b/.circleci/config.yml index b2706d76..2cdd4174 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -222,6 +222,14 @@ workflows: file: configurable_lookback_window requires: - 'Testing all_fields' + - run_integration_test: + name: 'Testing parent_child_independent' + context: + - circleci-user + - tier-1-tap-user + file: parent_child_independent + requires: + - 'Testing all_fields' - run_integration_test: name: 'Testing automatic_payout_transactions' context: diff --git a/tap_stripe/__init__.py b/tap_stripe/__init__.py index f38168d1..820c89fe 100755 --- a/tap_stripe/__init__.py +++ b/tap_stripe/__init__.py @@ -568,7 +568,6 @@ def sync_stream(stream_name, is_sub_stream=False): if is_sub_stream: # We need to get the parent data first for syncing the child streams. Hence, # changing stream_name to parent stream when only child is selected. - sub_stream_name = stream_name stream_name = PARENT_STREAM_MAP.get(stream_name) replication_key = STREAM_REPLICATION_KEY.get(stream_name) @@ -915,9 +914,9 @@ def sync_event_updates(stream_name, is_sub_stream): date_window_size = 60 * 60 * 24 # Seconds in a day if is_sub_stream: - # Need to update the stream_name to its parent's stream name as for child we need to fetch the parents first - sub_stream_name = stream_name - stream_name = PARENT_STREAM_MAP.get(sub_stream_name) + # We need to get the parent data first for syncing the child streams. Hence, + # changing stream_name to parent stream when only child is selected. + stream_name = PARENT_STREAM_MAP.get(stream_name) sub_stream_name = SUB_STREAMS.get(stream_name) diff --git a/tests/test_parent_child_independent.py b/tests/test_parent_child_independent.py new file mode 100644 index 00000000..f03730c6 --- /dev/null +++ b/tests/test_parent_child_independent.py @@ -0,0 +1,37 @@ +from tap_tester import runner, connections +from base import BaseTapTest + +class ParentChildIndependentTest(BaseTapTest): + + def name(self): + return "tt_stripe_parent_child_test" + + def test_child_streams(self): + """ + Test case to verify that tap is working fine if only first level child streams are selected + """ + # select child streams only and run the test + child_streams = {"invoice_line_items", "subscription_items", "payout_transactions"} + self.run_test(child_streams) + + def run_test(self, streams): + """ + Testing that tap is working fine if only child streams are selected + - Verify that if only child streams are selected then only child stream are replicated. + """ + # instantiate connection + conn_id = connections.ensure_connection(self) + + # run check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + self.perform_and_verify_table_and_field_selection(conn_id, found_catalogs, streams_to_select=streams) + + # run initial sync + record_count_by_stream = self.run_and_verify_sync(conn_id) + synced_records = runner.get_records_from_target_output() + + # Verify no unexpected streams were replicated + synced_stream_names = set(synced_records.keys()) + self.assertSetEqual(streams, synced_stream_names) \ No newline at end of file From 8794589834fec19828a0c533dfc2cf5684f8d8bf Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Wed, 15 Jun 2022 12:29:17 +0000 Subject: [PATCH 18/20] resolved PR comments --- tap_stripe/__init__.py | 1 + tests/unittests/test_child_bookmarks.py | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/tap_stripe/__init__.py b/tap_stripe/__init__.py index 820c89fe..571c97d4 100755 --- a/tap_stripe/__init__.py +++ b/tap_stripe/__init__.py @@ -587,6 +587,7 @@ def sync_stream(stream_name, is_sub_stream=False): # Get the bookmark of the sub_stream if it is selected if should_sync_sub_stream: sub_stream_bookmark = get_bookmark_for_sub_stream(sub_stream_name) + bookmark = sub_stream_bookmark # If both the parent and child streams are selected, get the minimum bookmark value if not is_sub_stream: diff --git a/tests/unittests/test_child_bookmarks.py b/tests/unittests/test_child_bookmarks.py index 6a80e9c4..dc92be5c 100644 --- a/tests/unittests/test_child_bookmarks.py +++ b/tests/unittests/test_child_bookmarks.py @@ -7,7 +7,7 @@ class TestParentChildBookmarking(unittest.TestCase): @mock.patch('tap_stripe.paginate') @mock.patch('tap_stripe.Context.is_sub_stream', return_value=[True]) @mock.patch('tap_stripe.singer.write_schema') - @mock.patch('tap_stripe.Context.is_selected', return_value=[True]) + @mock.patch('tap_stripe.Context.is_selected', return_value=True) @mock.patch('tap_stripe.metadata.to_map') @mock.patch('tap_stripe.sync_event_updates') @mock.patch('tap_stripe.Context.get_catalog_entry') @@ -20,7 +20,7 @@ def test_child_bookmarking(self, mock_now, mock_get_catalog_entry, Verify that the paginate function is called with the child stream bookmark ''' # mocked now time - now_time = utils.strptime_with_tz('2022-01-31 16:17:40.948019+00:00') + now_time = utils.strptime_with_tz('2022-02-01 15:32:13.000000+00:00') mock_now.return_value = now_time # catalog passed in the context Context.catalog = {'streams': [{'tap_stream_id': 'invoice_line_items', 'schema': {}, 'key_properties': [], 'metadata': []}]} @@ -28,9 +28,9 @@ def test_child_bookmarking(self, mock_now, mock_get_catalog_entry, # metadata.to_map return value mock_to_map.return_value = {(): {'table-key-properties': ['id'], 'selected': True, 'forced-replication-method': 'INCREMENTAL', 'valid-replication-keys': ['created']}} invoice_line_items_ts = 1641137533 # 02-01-2022T03:32:13Z - Context.state = {"bookmarks": {"invoices": {"date": 1641137533}, "invoice_line_items": {"date": invoice_line_items_ts}}} + Context.state = {"bookmarks": {"invoices": {"date": 1641137539}, "invoice_line_items": {"date": invoice_line_items_ts}}} sync() - stop_window = dt_to_epoch(now_time) + stop_window = invoice_line_items_ts + (30 * 24 * 60 * 60) # Verify that the paginate function is called with the child stream bookmark mock_paginate.assert_called_with(stripe.Invoice, 'created', invoice_line_items_ts, stop_window, 'invoices', None) From c6b5c6690a0bf602bfa6458d008fabf2b5bc315d Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Wed, 15 Jun 2022 12:52:45 +0000 Subject: [PATCH 19/20] skipped some newly generated fields from all_fields --- tests/test_all_fields.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/test_all_fields.py b/tests/test_all_fields.py index b6a9694a..093f6956 100644 --- a/tests/test_all_fields.py +++ b/tests/test_all_fields.py @@ -60,11 +60,16 @@ }, 'subscription_items': set(), 'plans': set(), - 'invoice_line_items': set(), + 'invoice_line_items': { + 'amount_excluding_tax', + 'unit_amount_excluding_tax' + }, 'invoices': { 'test_clock', 'application', - 'rendering_options' + 'rendering_options', + 'total_excluding_tax', + 'subtotal_excluding_tax' }, 'payment_intents': { 'amount_details' From 5a37a461e0e8c0b1b52f8150991a18d047151b6b Mon Sep 17 00:00:00 2001 From: namrata270998 Date: Wed, 15 Jun 2022 13:27:03 +0000 Subject: [PATCH 20/20] added failing field as missing nested field --- tests/test_all_fields.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_all_fields.py b/tests/test_all_fields.py index 093f6956..57fb4105 100644 --- a/tests/test_all_fields.py +++ b/tests/test_all_fields.py @@ -223,7 +223,10 @@ # missing subfield ['payment_method'] 'last_payment_error' }, - 'invoice_line_items': set() + 'invoice_line_items': { + # missing subfield ['custom_unit_amount] + 'price' + } # 'invoice_line_items': { # TODO This is a test issue that prevents us from consistently passing # 'unique_line_item_id', # 'invoice_item',