TDL-19384 optimize logic for parent child relationship to be independent #141

Merged 20 commits on Jun 22, 2022
Changes from 6 commits
159 changes: 83 additions & 76 deletions tap_stripe/__init__.py
@@ -85,6 +85,9 @@
'transfers': {'type': 'transfer.*', 'object': ['transfer']},
'disputes': {'type': 'charge.dispute.*', 'object': ['dispute']},
'products': {'type': 'product.*', 'object': ['product']},
'invoice_line_items': {'type': 'invoice.*', 'object': ['invoice']},
'subscription_items': {'type': 'customer.subscription.*', 'object': ['subscription']},
'payout_transactions': {'type': 'payout.*', 'object': ['transfer', 'payout']},
# Cannot find evidence of these streams having events associated:
# subscription_items - appears on subscriptions events
# balance_transactions - seems to be immutable
@@ -199,6 +202,12 @@ def is_sub_stream(cls, stream_name):
if stream_name == sub_stream_id:
return True
return False
@classmethod
def is_parent_stream(cls, stream_name):
for stream_id in PARENT_STREAM_MAP.values():
if stream_name == stream_id:
return True
return False

@classmethod
def print_counts(cls):
@@ -495,6 +504,7 @@ def evaluate_start_time_based_on_lookback(stream_name, replication_key, lookback
lookback_evaluated_time = bookmark - lookback_window
return lookback_evaluated_time
return start_date

def get_bookmark_for_sub_stream(stream_name):
"""
Get the bookmark for the child-stream based on the parent's replication key.
@@ -547,7 +557,7 @@ def convert_dict_to_stripe_object(record):

# pylint: disable=too-many-locals
# pylint: disable=too-many-statements
def sync_stream(stream_name):
def sync_stream(stream_name, is_sub_stream=False):
"""
Sync each stream, looking for newly created records. Updates are captured by events stream.
"""
@@ -557,15 +567,34 @@ def sync_stream(stream_name):
stream_field_whitelist = json.loads(Context.config.get('whitelist_map', '{}')).get(stream_name)

extraction_time = singer.utils.now()
replication_key = metadata.get(stream_metadata, (), 'valid-replication-keys')[0]
if is_sub_stream:
# This function expects the parent stream name, hence changing the values
stream_name = PARENT_STREAM_MAP.get(stream_name)
replication_key = STREAM_REPLICATION_KEY.get(stream_name)

@namrata270998 Optimization and Readability:

if is_sub_stream:
    sub_stream_name = stream_name
    stream_name = PARENT_STREAM_MAP.get(stream_name)

replication_key = STREAM_REPLICATION_KEY.get(stream_name)

You can remove the code at L578

Contributor Author:

  • Updated the code.
  • Also, we cannot remove L578: if both the parent and the child are selected, we still need the sub_stream_name for fetching the child records.
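
A rough sketch of the bookmark handling this reply refers to, pieced together from the diff below (simplified, not the verbatim implementation): when both the parent and its child are selected, the parent loop starts from the older of the two bookmarks, and the child's name is still needed to emit child records per parent object.

sub_stream_name = SUB_STREAMS.get(stream_name)
should_sync_sub_stream = sub_stream_name and Context.is_selected(sub_stream_name)

stream_bookmark = get_bookmark_for_stream(stream_name, replication_key)
bookmark = stream_bookmark

if should_sync_sub_stream:
    sub_stream_bookmark = get_bookmark_for_sub_stream(sub_stream_name)
    if not is_sub_stream:
        # both parent and child selected: start the window loop from the older bookmark
        bookmark = min(stream_bookmark, sub_stream_bookmark)

Taking the minimum is what lets a freshly selected child backfill from its own bookmark even when the parent's bookmark is already further ahead.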

else:
# replication_key = metadata.get(stream_metadata, (), 'valid-replication-keys')[0]
replication_key = STREAM_REPLICATION_KEY.get(stream_name)

# Invoice Items bookmarks on `date`, but queries on `created`
filter_key = 'created' if stream_name == 'invoice_items' else replication_key

sub_stream_name = SUB_STREAMS.get(stream_name)

# Get bookmark for the stream
stream_bookmark = get_bookmark_for_stream(stream_name, replication_key)

bookmark = stream_bookmark

# If there is a sub-stream and it's selected, get its bookmark (or the start date if no bookmark)
should_sync_sub_stream = sub_stream_name and Context.is_selected(sub_stream_name)

if should_sync_sub_stream:
sub_stream_bookmark = get_bookmark_for_sub_stream(sub_stream_name)

if not is_sub_stream:
bookmark = min(stream_bookmark, sub_stream_bookmark)
else:
sub_stream_bookmark = None

with Transformer(singer.UNIX_SECONDS_INTEGER_DATETIME_PARSING) as transformer:
end_time = dt_to_epoch(utils.now())

@@ -621,7 +650,7 @@ def sync_stream(stream_name):
rec['updated'] = stream_obj_created

# sync stream if object is greater than or equal to the bookmark
if stream_obj_created >= stream_bookmark:
if stream_obj_created >= stream_bookmark and not is_sub_stream:
rec = transformer.transform(rec,
Context.get_catalog_entry(stream_name)['schema'],
stream_metadata)
@@ -638,12 +667,21 @@ def sync_stream(stream_name):

Context.new_counts[stream_name] += 1

# sync sub streams if it's selected and the parent object
# is greater than its bookmark
if should_sync_sub_stream and stream_obj_created > sub_stream_bookmark:
sync_sub_stream(sub_stream_name, stream_obj)

# Update stream/sub-streams bookmarks as stop window
if stop_window > stream_bookmark:
if not is_sub_stream and stop_window > stream_bookmark:
stream_bookmark = stop_window
# Write bookmark for the stream
write_bookmark_for_stream(stream_name, replication_key, stream_bookmark)

# the sub stream bookmarks on its parent
if should_sync_sub_stream and stop_window > sub_stream_bookmark:
sub_stream_bookmark = stop_window
write_bookmark_for_stream(sub_stream_name, replication_key, sub_stream_bookmark)

singer.write_state(Context.state)

# update window for next iteration
@@ -677,7 +715,14 @@ def get_object_list_iterator(object_list):
# we are in a cycle.
INITIAL_SUB_STREAM_OBJECT_LIST_LENGTH = 10

def write_substream_records(sub_stream_name, parent_obj, updates=False):
def is_parent_selected(sub_stream_name):
"""
Given a child stream, check if the parent is selected.
"""
parent_stream = PARENT_STREAM_MAP.get(sub_stream_name)
return Context.is_selected(parent_stream)

def sync_sub_stream(sub_stream_name, parent_obj, updates=False):
"""
Given a parent object, retrieve its values for the specified substream.
"""
@@ -820,52 +865,6 @@ def write_substream_records(sub_stream_name, parent_obj, updates=False):
Context.new_counts[sub_stream_name] += 1


def sync_sub_stream(child_stream, bookmark_value):
Contributor Author:

The previous branch of fix_parent_child_relationship is already merged into crest-master, which had different code. However, this function has different code and is present at L724.

"""
Get the parent records based on the bookmark and corresponding child stream records
"""
# Get the parent stream of the stream
parent_stream = PARENT_STREAM_MAP[child_stream]
# Get the replication key for the stream
parent_replication_key = STREAM_REPLICATION_KEY[parent_stream]

end_time = dt_to_epoch(utils.now())

window_size = float(Context.config.get('date_window_size', DEFAULT_DATE_WINDOW_SIZE))

if DEFAULT_DATE_WINDOW_SIZE != window_size:
LOGGER.info('Using non-default date window size of %.2f',window_size)
start_window = bookmark_value

# NB: We observed records coming through newest->oldest and so
# date-windowing was added and the tap only bookmarks after it has
# gotten through a date window
while start_window < end_time:
stop_window = dt_to_epoch(epoch_to_dt(start_window) + timedelta(days=window_size))
# cut off the last window at the end time
if stop_window > end_time:
stop_window = end_time

# Get the parent records for the child-streams to loop on it and fetch the child records.
for parent_obj in paginate(
STREAM_SDK_OBJECTS[parent_stream]['sdk_object'],
parent_replication_key,
start_window,
stop_window,
parent_stream,
STREAM_SDK_OBJECTS[parent_stream].get('request_args')):
write_substream_records(child_stream, parent_obj)

# Update sub-streams bookmarks as stop window
if stop_window > bookmark_value:
bookmark_value = stop_window
# Write bookmark for the stream
write_bookmark_for_stream(child_stream, parent_replication_key, bookmark_value)
singer.write_state(Context.state)

# update window for next iteration
start_window = stop_window

def should_sync_event(events_obj, object_type, id_to_created_map):
"""Checks to ensure the event's underlying object has an id and that the id_to_created_map
contains an entry for that id. Returns true the first time an id should be added to the map
@@ -900,7 +899,7 @@ def recursive_to_dict(some_obj):
# Else just return
return some_obj

def sync_event_updates(stream_name, bookmark_value):
def sync_event_updates(stream_name, is_sub_stream):
'''
Get updates via events endpoint

@@ -910,10 +909,29 @@ def sync_event_updates(stream_name, bookmark_value):

date_window_size = 60 * 60 * 24 # Seconds in a day

sub_stream_name = None
if stream_name in PARENT_STREAM_MAP.keys():
if is_sub_stream:
sub_stream_name = stream_name
stream_name = PARENT_STREAM_MAP[stream_name]
stream_name = PARENT_STREAM_MAP.get(sub_stream_name)

sub_stream_name = SUB_STREAMS.get(stream_name)

@namrata270998 This line overwrites L918. Why do we need this line explicitly?
If this function is called for a sub-stream, we assign a value to this variable at L918; otherwise, we keep the default of None.

Contributor Author:

We should not keep it None by default: if both the parent and the child are selected, we'll need the sub_stream_name for fetching the child records.

karanpanchal-crest (Jun 14, 2022):

OK. Can you remove L919, as it would not have any use? It will always get overridden.

Contributor Author:

Removed the line, as it will be overridden.
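
A sketch of my reading of the resolved logic in this thread (assuming the module's PARENT_STREAM_MAP and SUB_STREAMS maps); the final SUB_STREAMS.get() call is what makes an explicit None default earlier in the function redundant:

if is_sub_stream:
    # called with the child's name: map it back to its parent, which drives the events query
    sub_stream_name = stream_name
    stream_name = PARENT_STREAM_MAP.get(sub_stream_name)

# derive the child from the parent; this returns None when the parent has no child,
# so any earlier "sub_stream_name = None" default is simply overridden
sub_stream_name = SUB_STREAMS.get(stream_name)

Because the overwrite is harmless in the sub-stream case (the parent's SUB_STREAMS entry maps back to the same child), the explicit default adds nothing.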


parent_bookmark_value = singer.get_bookmark(Context.state,
stream_name + '_events',
'updates_created') or \
int(utils.strptime_to_utc(Context.config["start_date"]).timestamp())

if sub_stream_name:
sub_stream_bookmark_value = parent_bookmark_value = singer.get_bookmark(Context.state,
sub_stream_name + '_events',
'updates_created') or \
int(utils.strptime_to_utc(Context.config["start_date"]).timestamp())

if is_sub_stream:
bookmark_value = sub_stream_bookmark_value
elif sub_stream_name and Context.is_selected(sub_stream_name):
bookmark_value = min(parent_bookmark_value, sub_stream_bookmark_value)
else:
bookmark_value = parent_bookmark_value

max_created = bookmark_value
date_window_start = max_created
@@ -980,7 +998,7 @@ def sync_event_updates(stream_name, bookmark_value):

if events_obj.created >= bookmark_value:
if rec.get('id') is not None:
if sub_stream_name is None:
if not is_sub_stream:
singer.write_record(stream_name,
rec,
time_extracted=extraction_time)
@@ -990,9 +1008,9 @@ def sync_event_updates(stream_name, bookmark_value):
if events_obj.get('type', '').endswith('.deleted'):
continue

if sub_stream_name:
if sub_stream_name and Context.is_selected(sub_stream_name):
if event_resource_obj:
write_substream_records(sub_stream_name,
sync_sub_stream(sub_stream_name,
event_resource_obj,
updates=True)
if events_obj.created > max_created:
@@ -1002,13 +1020,13 @@ def sync_event_updates(stream_name, bookmark_value):
# cannot bookmark until the entire page is processed
date_window_start = date_window_end
date_window_end = date_window_end + date_window_size
if sub_stream_name is None:
if not is_sub_stream:
singer.write_bookmark(Context.state,
stream_name + '_events',
'updates_created',
max_created)
singer.write_state(Context.state)
else:
if sub_stream_name and Context.is_selected(sub_stream_name):
singer.write_bookmark(Context.state,
sub_stream_name + '_events',
'updates_created',
@@ -1036,22 +1054,11 @@ def sync():
stream_name = catalog_entry['tap_stream_id']
# Sync records for stream
if Context.is_selected(stream_name):
if not Context.is_sub_stream(stream_name): # Run the sync for parent-streams
sync_stream(stream_name)
if not Context.is_sub_stream(stream_name) or not is_parent_selected(stream_name): # Run the sync for parent-streams
sync_stream(stream_name, Context.is_sub_stream(stream_name))
# This prevents us from retrieving 'events.events'
if STREAM_TO_TYPE_FILTER.get(stream_name):
bookmark_value = get_bookmark_for_stream(stream_name + '_events', 'updates_created')
Contributor Author:

Previously, in the older merged branch, we were fetching parent records twice when both the parent and the child were selected. Updated and optimized the code, hence removed this condition.

sync_event_updates(stream_name, bookmark_value)

else: # Run the sync for child-streams independently
bookmark_value = get_bookmark_for_sub_stream(stream_name)
sync_sub_stream(stream_name, bookmark_value)
# Get the child-stream's events bookmark if present
events_bookmark = singer.get_bookmark(Context.state, stream_name + '_events', 'updates_created')
# Use the child-stream's event bookmark if present, else use the original child-stream's bookmark
if events_bookmark:
bookmark_value = events_bookmark
sync_event_updates(stream_name, bookmark_value) # Run the sync mode for fetching events for the child-streams independently
sync_event_updates(stream_name, Context.is_sub_stream(stream_name))

@utils.handle_top_exception(LOGGER)
def main():
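
Read together, the sync() dispatch in the hunk above works roughly as in the following simplified sketch (names taken from this module, not the verbatim implementation): a child stream only triggers its own sync_stream() call when its parent is not also selected, which is what avoids fetching the parent records twice.

for catalog_entry in Context.catalog['streams']:
    stream_name = catalog_entry['tap_stream_id']
    if not Context.is_selected(stream_name):
        continue

    is_sub_stream = Context.is_sub_stream(stream_name)

    # A parent stream always syncs itself; a child syncs independently only when
    # its parent is not selected (otherwise the parent loop emits its records).
    if not is_sub_stream or not is_parent_selected(stream_name):
        sync_stream(stream_name, is_sub_stream)

    # Event updates are synced per stream; 'events.events' is excluded.
    if STREAM_TO_TYPE_FILTER.get(stream_name):
        sync_event_updates(stream_name, is_sub_stream)

This replaces the earlier branch that called write_substream_records and sync_event_updates separately for child streams, which re-fetched the parent records.
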
48 changes: 12 additions & 36 deletions tests/unittests/test_child_bookmarks.py
@@ -1,8 +1,9 @@
import unittest
from unittest import mock
from tap_stripe import DEFAULT_DATE_WINDOW_SIZE, Context, sync, stripe, dt_to_epoch, utils
from tap_stripe import Context, sync, stripe, dt_to_epoch, utils

class TestParentChildBookmarking(unittest.TestCase):
@mock.patch('tap_stripe.is_parent_selected', return_value=False)
@mock.patch('tap_stripe.paginate')
@mock.patch('tap_stripe.Context.is_sub_stream', return_value=[True])
@mock.patch('tap_stripe.singer.write_schema')
@@ -11,7 +12,7 @@ class TestParentChildBookmarking(unittest.TestCase):
@mock.patch('tap_stripe.sync_event_updates')
@mock.patch('tap_stripe.Context.get_catalog_entry')
@mock.patch('tap_stripe.utils.now')
def test_child_bookmarking(self, mock_now, mock_get_catalog_entry, mock_event_updates, mock_to_map, mock_is_selected, mock_write_schema, mock_is_sub_stream, mock_paginate):
def test_child_bookmarking(self, mock_now, mock_get_catalog_entry, mock_event_updates, mock_to_map, mock_is_selected, mock_write_schema, mock_is_sub_stream, mock_paginate, mock_is_parent_selected):
'''
Verify that state is updated with parent's bookmark after syncing the child.
'''
Expand All @@ -23,13 +24,14 @@ def test_child_bookmarking(self, mock_now, mock_get_catalog_entry, mock_event_up
mock_get_catalog_entry.return_value = {'tap_stream_id': 'invoices', 'schema': {}, 'key_properties': [], 'metadata': [{"valid-replication-keys": ["created"]}]}
# metadata.to_map return value
mock_to_map.return_value = {(): {'table-key-properties': ['id'], 'selected': True, 'forced-replication-method': 'INCREMENTAL', 'valid-replication-keys': ['created']}}
invoice_line_items_ts = 1641137533
Context.state = {"bookmarks": {"invoices": {"date": 1645716195}, "invoice_line_items": {"date": invoice_line_items_ts}}}
invoice_line_items_ts = 1641137533 # 02-01-2022T03:32:13Z
Context.state = {"bookmarks": {"invoices": {"date": 1641137533}, "invoice_line_items": {"date": invoice_line_items_ts}}}
sync()
stop_window = dt_to_epoch(now_time)

@namrata270998 is there a reason for explicitly setting stop_window? I think we can expect that value of stop_window should be start_date + 30 days by default. This way we will check the window logic as well

Contributor Author:

Even if we keep the stop window at 30 days, the loop will still execute only once. Since we just want to check that the bookmark is used properly, we can keep any stop window; it will not affect anything in this test case.

Reviewer: I understand. However, it would be good if we check the routine scenarios in the test case rather than mocking it unnecessarily.

Contributor Author:

Updated the stop window to use the 30-day time window.
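
As a hedged sketch of that update (assuming DEFAULT_DATE_WINDOW_SIZE, dt_to_epoch, and epoch_to_dt are importable from tap_stripe, as DEFAULT_DATE_WINDOW_SIZE was before this change), the expected stop_window for the first paginate call can be derived from the tap's windowing logic instead of being hard-coded; the tap caps the last window at the current time:

from datetime import timedelta
from tap_stripe import DEFAULT_DATE_WINDOW_SIZE, dt_to_epoch, epoch_to_dt

# first window: child bookmark + default window (30 days), capped at the mocked "now"
expected_stop_window = min(
    dt_to_epoch(epoch_to_dt(invoice_line_items_ts) + timedelta(days=DEFAULT_DATE_WINDOW_SIZE)),
    dt_to_epoch(now_time),
)
mock_paginate.assert_called_with(
    stripe.Invoice, 'created', invoice_line_items_ts, expected_stop_window, 'invoices', None
)

This keeps the assertion tied to the windowing logic rather than to a particular mocked clock.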

# Verify that the paginate function is called with the child stream bookmark
mock_paginate.assert_called_with(stripe.Invoice, 'created', invoice_line_items_ts, stop_window, 'invoices', None)

@mock.patch('tap_stripe.is_parent_selected', return_value=False)
@mock.patch('tap_stripe.paginate')
@mock.patch('tap_stripe.Context.is_sub_stream', return_value=[True])
@mock.patch('tap_stripe.singer.write_schema')
@@ -38,10 +40,11 @@ def test_child_bookmarking(self, mock_now, mock_get_catalog_entry, mock_event_up
@mock.patch('tap_stripe.Context.get_catalog_entry')
@mock.patch('tap_stripe.utils.now')
@mock.patch('tap_stripe.sync_sub_stream')
@mock.patch('tap_stripe.sync_stream')
@mock.patch('tap_stripe.sync_event_updates')
def test_sync_event_updates_when_events_bookmark_present(self, mock_sync_event_updates, sync_sub_stream, mock_now, mock_get_catalog_entry, mock_to_map, mock_is_selected, mock_write_schema, mock_is_sub_stream, mock_paginate):
def test_sync_event_updates_when_events_bookmark_present(self, mock_sync_event_updates, mock_sync_stream, mock_sync_sub_stream, mock_now, mock_get_catalog_entry, mock_to_map, mock_is_selected, mock_write_schema, mock_is_sub_stream, mock_paginate, mock_is_parent_selected):
'''
Verify that state is updated with parent's bookmark after syncing the child.
Verify that event_updates is called for child stream.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we not check this case in the above function itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the above test case we checked for child bookmarking, and in the current we are checking for events bookmarks. Also we need to mock different things in both of them and the scenarios are different, hence kept them separate

'''
# mocked now time
now_time = utils.strptime_with_tz('2022-01-31 16:17:40.948019+00:00')
@@ -51,36 +54,9 @@ def test_sync_event_updates_when_events_bookmark_present(self, mock_sync_event_u
mock_get_catalog_entry.return_value = {'tap_stream_id': 'invoices', 'schema': {}, 'key_properties': [], 'metadata': [{"valid-replication-keys": ["created"]}]}
# metadata.to_map return value
mock_to_map.return_value = {(): {'table-key-properties': ['id'], 'selected': True, 'forced-replication-method': 'INCREMENTAL', 'valid-replication-keys': ['created']}}
invoice_line_items_ts = 1641137533
invoice_line_items_ts = 1641137533 # 02-01-2022T03:32:13Z
events_ts = 1645716195
Context.state = {"bookmarks": {"invoice_line_items_events": {"updates_created": events_ts}, "invoice_line_items": {"date": invoice_line_items_ts}}}
sync()
# Verify that the sync_event_updates function is called with the events bookmark
mock_sync_event_updates.assert_called_with('invoice_line_items', events_ts)

@mock.patch('tap_stripe.paginate')
@mock.patch('tap_stripe.Context.is_sub_stream', return_value=[True])
@mock.patch('tap_stripe.singer.write_schema')
@mock.patch('tap_stripe.Context.is_selected', return_value=[True])
@mock.patch('tap_stripe.metadata.to_map')
@mock.patch('tap_stripe.Context.get_catalog_entry')
@mock.patch('tap_stripe.utils.now')
@mock.patch('tap_stripe.sync_sub_stream')
@mock.patch('tap_stripe.sync_event_updates')
def test_sync_event_updates_when_events_bookmark_not_present(self, mock_sync_event_updates, sync_sub_stream, mock_now, mock_get_catalog_entry, mock_to_map, mock_is_selected, mock_write_schema, mock_is_sub_stream, mock_paginate):
'''
Verify that state is updated with parent's bookmark after syncing the child.
'''
# mocked now time
now_time = utils.strptime_with_tz('2022-01-31 16:17:40.948019+00:00')
mock_now.return_value = now_time
# catalog passed in the context
Context.catalog = {'streams': [{'tap_stream_id': 'invoice_line_items', 'schema': {}, 'key_properties': [], 'metadata': []}]}
mock_get_catalog_entry.return_value = {'tap_stream_id': 'invoices', 'schema': {}, 'key_properties': [], 'metadata': [{"valid-replication-keys": ["created"]}]}
# metadata.to_map return value
mock_to_map.return_value = {(): {'table-key-properties': ['id'], 'selected': True, 'forced-replication-method': 'INCREMENTAL', 'valid-replication-keys': ['created']}}
invoice_line_items_ts = 1641137533
Context.state = {"bookmarks": {"invoice_line_items": {"date": invoice_line_items_ts}}}
sync()
# Verify that the sync_event_updates function is called with the child stream bookmark
mock_sync_event_updates.assert_called_with('invoice_line_items', invoice_line_items_ts)
# Verify that the sync_event_updates function is called with the is_subStream parameter True
mock_sync_event_updates.assert_called_with('invoice_line_items', [True])