-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Tdl 20674 Reduce API calls to stripe v2.0.0 #150
Changes from 14 commits
9656553
2f3cf1c
501aa09
aeccc93
eee6857
68c51c3
9a82e32
c2f07bf
91874ee
b169f88
e2a9d37
db5b0c1
972e379
fb1c1a5
65776a8
218b808
0bfb5a5
f630e4b
1534cfe
db5af57
32b3c7a
33b3ce7
54cc0fb
8780821
7b2a1d2
e3a407e
8b83fbb
6730d2b
002a553
c9fa844
bd99e09
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -125,6 +125,7 @@ | |||||||
LOGGER = singer.get_logger() | ||||||||
|
||||||||
DEFAULT_DATE_WINDOW_SIZE = 30 #days | ||||||||
DEFAULT_EVENT_DATE_WINDOW_SIZE = 7 #days | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added comment to explain |
||||||||
|
||||||||
# default request timeout | ||||||||
REQUEST_TIMEOUT = 300 # 5 minutes | ||||||||
|
@@ -176,6 +177,8 @@ class Context(): | |||||||
stream_map = {} | ||||||||
new_counts = {} | ||||||||
updated_counts = {} | ||||||||
window_size = DEFAULT_DATE_WINDOW_SIZE # By default collect data of 30 days in one API call to collect newly created records | ||||||||
event_window_size = DEFAULT_EVENT_DATE_WINDOW_SIZE # By default collect data of 7 days in one API call for event_updates | ||||||||
|
||||||||
@classmethod | ||||||||
def get_catalog_entry(cls, stream_name): | ||||||||
|
@@ -485,12 +488,11 @@ def get_bookmark_for_stream(stream_name, replication_key): | |||||||
int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) | ||||||||
return stream_bookmark | ||||||||
|
||||||||
def evaluate_start_time_based_on_lookback(stream_name, replication_key, lookback_window): | ||||||||
def evaluate_start_time_based_on_lookback(bookmark, lookback_window): | ||||||||
''' | ||||||||
For historical syncs take the start date as the starting point in a sync, even if it is more recent than | ||||||||
{today - lookback_window}. For incremental syncs, the tap should start syncing from {previous state - lookback_window} | ||||||||
''' | ||||||||
bookmark = singer.get_bookmark(Context.state, stream_name, replication_key) | ||||||||
start_date = int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) | ||||||||
if bookmark: | ||||||||
lookback_evaluated_time = bookmark - lookback_window | ||||||||
|
@@ -599,7 +601,7 @@ def sync_stream(stream_name, is_sub_stream=False): | |||||||
with Transformer(singer.UNIX_SECONDS_INTEGER_DATETIME_PARSING) as transformer: | ||||||||
end_time = dt_to_epoch(utils.now()) | ||||||||
|
||||||||
window_size = float(Context.config.get('date_window_size', DEFAULT_DATE_WINDOW_SIZE)) | ||||||||
window_size = Context.window_size | ||||||||
|
||||||||
if DEFAULT_DATE_WINDOW_SIZE != window_size: | ||||||||
LOGGER.info('Using non-default date window size of %.2f',window_size) | ||||||||
|
@@ -610,6 +612,12 @@ def sync_stream(stream_name, is_sub_stream=False): | |||||||
# when they are available via the API, so these streams will need | ||||||||
# a short lookback window. | ||||||||
if stream_name in IMMUTABLE_STREAMS: | ||||||||
if stream_name == "events": | ||||||||
# Start sync from 30 days before if bookmark/start_date is older than 30 days. | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated comment as per suggestion. |
||||||||
start_window = int(max(bookmark, (singer.utils.now() - timedelta(days=30)).timestamp())) | ||||||||
if start_window != bookmark: | ||||||||
LOGGER.warning("Provided current bookmark/start_date is older than the last 30 days. So, starting sync for the last 30 days as Stripe Event API returns data for the last 30 days only.") | ||||||||
dbshah1212 marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @prijendev please split this line so that it is at most 120 characters long There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Split the line into shorter lines as a best practice. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated comment as per suggestion. |
||||||||
|
||||||||
# pylint:disable=fixme | ||||||||
# TODO: This may be an issue for other streams' created_at | ||||||||
# entries, but to keep the surface small, doing this only for | ||||||||
|
@@ -622,7 +630,8 @@ def sync_stream(stream_name, is_sub_stream=False): | |||||||
lookback_window = IMMUTABLE_STREAM_LOOKBACK # default lookback | ||||||||
except ValueError: | ||||||||
raise ValueError('Please provide a valid integer value for the lookback_window parameter.') from None | ||||||||
start_window = evaluate_start_time_based_on_lookback(stream_name, replication_key, lookback_window) | ||||||||
if start_window != Context.config["start_date"]: | ||||||||
start_window = evaluate_start_time_based_on_lookback(start_window, lookback_window) | ||||||||
stream_bookmark = start_window | ||||||||
|
||||||||
# NB: We observed records coming through newest->oldest and so | ||||||||
|
@@ -911,8 +920,9 @@ def sync_event_updates(stream_name, is_sub_stream): | |||||||
or when called through only child stream i.e. when parent is not selected. | ||||||||
''' | ||||||||
LOGGER.info("Started syncing event based updates") | ||||||||
|
||||||||
date_window_size = 60 * 60 * 24 # Seconds in a day | ||||||||
event_window_size = Context.event_window_size | ||||||||
dbshah1212 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
events_date_window_size = int(60 * 60 * 24 * event_window_size) # event_window_size in seconds | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated variable name to |
||||||||
sync_start_time = dt_to_epoch(utils.now()) | ||||||||
|
||||||||
if is_sub_stream: | ||||||||
# We need to get the parent data first for syncing the child streams. Hence, | ||||||||
|
@@ -944,11 +954,13 @@ def sync_event_updates(stream_name, is_sub_stream): | |||||||
# Update the bookmark to parent bookmark value, if child is not selected | ||||||||
else: | ||||||||
bookmark_value = parent_bookmark_value | ||||||||
# Start sync from 30 days before if bookmark/start_date is older than 30 days. | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add one space above the comment There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added space above the code comment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated comment as per suggestion. |
||||||||
max_created = int(max(bookmark_value, (epoch_to_dt(sync_start_time) - timedelta(days=30)).timestamp())) | ||||||||
if max_created != bookmark_value: | ||||||||
LOGGER.warning("Provided current bookmark/start_date is older than the last 30 days. So, starting sync for the last 30 days as Stripe Event API returns data for the last 30 days only.") | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated comment as per suggestion. |
||||||||
|
||||||||
max_created = bookmark_value | ||||||||
date_window_start = max_created | ||||||||
date_window_end = max_created + date_window_size | ||||||||
|
||||||||
date_window_end = max_created + events_date_window_size | ||||||||
stop_paging = False | ||||||||
|
||||||||
# Create a map to hold relate event object ids to timestamps | ||||||||
|
@@ -1033,25 +1045,36 @@ def sync_event_updates(stream_name, is_sub_stream): | |||||||
# The events stream returns results in descending order, so we | ||||||||
# cannot bookmark until the entire page is processed | ||||||||
date_window_start = date_window_end | ||||||||
date_window_end = date_window_end + date_window_size | ||||||||
date_window_end = date_window_end + events_date_window_size | ||||||||
|
||||||||
# Write the parent bookmark value only when the parent is selected | ||||||||
if not is_sub_stream: | ||||||||
singer.write_bookmark(Context.state, | ||||||||
stream_name + '_events', | ||||||||
'updates_created', | ||||||||
max_created) | ||||||||
singer.write_state(Context.state) | ||||||||
# Write bookmark for parent or child stream if it is selected | ||||||||
write_bookmark_for_event_updates(is_sub_stream, stream_name, sub_stream_name, max_created) | ||||||||
|
||||||||
max_created = max(max_created, sync_start_time - events_date_window_size) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add code comment and reason behind this change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add comment for this logic There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added comment to explain the bookmark logic. |
||||||||
write_bookmark_for_event_updates(is_sub_stream, stream_name, sub_stream_name, max_created) | ||||||||
|
||||||||
# Write the child bookmark value only when the child is selected | ||||||||
if sub_stream_name and Context.is_selected(sub_stream_name): | ||||||||
singer.write_bookmark(Context.state, | ||||||||
sub_stream_name + '_events', | ||||||||
'updates_created', | ||||||||
max_created) | ||||||||
singer.write_state(Context.state) | ||||||||
singer.write_state(Context.state) | ||||||||
|
||||||||
def write_bookmark_for_event_updates(is_sub_stream, stream_name, sub_stream_name, max_created):
    """
    Store `max_created` as the `updates_created` bookmark of the event-updates
    streams and emit the state after every bookmark that is written.

    Args:
        is_sub_stream: When falsy, the bookmark of the parent stream (`stream_name`) is written.
        stream_name: Name of the parent stream.
        sub_stream_name: Name of the child stream; its bookmark is written only when
            the name is truthy and the stream is selected in the catalog.
        max_created: Bookmark value to store.
    """
    def _bookmark_and_emit(bookmarked_stream):
        # Event-update bookmarks live under the '<stream>_events' key of the state.
        singer.write_bookmark(Context.state,
                              bookmarked_stream + '_events',
                              'updates_created',
                              max_created)
        singer.write_state(Context.state)

    # Parent bookmark: only when the function was not invoked for a sub stream alone
    parent_is_synced = not is_sub_stream
    if parent_is_synced:
        _bookmark_and_emit(stream_name)

    # Child bookmark: only when a child exists and it is selected
    child_is_synced = sub_stream_name and Context.is_selected(sub_stream_name)
    if child_is_synced:
        _bookmark_and_emit(sub_stream_name)
|
||||||||
def sync(): | ||||||||
""" | ||||||||
The sync function called for the sync mode. | ||||||||
|
@@ -1078,6 +1101,26 @@ def sync(): | |||||||
if STREAM_TO_TYPE_FILTER.get(stream_name): | ||||||||
sync_event_updates(stream_name, Context.is_sub_stream(stream_name)) | ||||||||
|
||||||||
def get_date_window_size(param, default_value):
    """
    Get the date window size for the given config parameter.

    Args:
        param (str): Key to look up in `Context.config`
            (e.g. `date_window_size` or `event_date_window_size`).
        default_value: Value returned unchanged when the key is missing from the
            config or is set to None.

    Returns:
        `default_value` when the parameter is not configured, otherwise the
        configured value converted to a positive, finite float.

    Raises:
        Exception: If the configured value is a boolean, zero, negative, not finite,
            or not a plain decimal number (an int, a float, or a string made of
            digits with at most one decimal point such as "10" or "7.5").
    """
    window_size = Context.config.get(param)

    # If window_size is not passed in the config then use the caller-supplied default.
    if window_size is None:
        return default_value

    # bool is a subclass of int, so it has to be excluded explicitly.
    is_number = isinstance(window_size, (int, float)) and not isinstance(window_size, bool)
    # Strings are accepted only when they consist of digits with at most one decimal point.
    is_numeric_string = isinstance(window_size, str) and window_size.replace('.', '', 1).isdigit()

    if is_number or is_numeric_string:
        try:
            parsed_size = float(window_size)
        except (ValueError, OverflowError):
            # str.isdigit() accepts characters (e.g. superscript digits) that float() rejects and
            # very large ints overflow float(); both are reported as an invalid window size below.
            parsed_size = None

        # Rejects 0, negative values, NaN and infinity (e.g. an overly long digit string).
        if parsed_size is not None and 0 < parsed_size < float('inf'):
            return parsed_size

    # Raise Exception if window_size value is 0, "0", negative, non-finite or an invalid string.
    raise Exception("The entered window size is invalid, it should be a valid none-zero integer.")
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated exception message. |
||||||||
|
||||||||
@utils.handle_top_exception(LOGGER) | ||||||||
def main(): | ||||||||
# Parse command line arguments | ||||||||
|
@@ -1093,6 +1136,13 @@ def main(): | |||||||
print(json.dumps(catalog, indent=2)) | ||||||||
# Otherwise run in sync mode | ||||||||
else: | ||||||||
Context.window_size = get_date_window_size('date_window_size', DEFAULT_DATE_WINDOW_SIZE) | ||||||||
Context.event_window_size = get_date_window_size('event_date_window_size', DEFAULT_EVENT_DATE_WINDOW_SIZE) | ||||||||
# Reset event_window_size to 30 days if it is greater than 30 because Stripe Event API returns data of the last 30 days only. | ||||||||
if Context.event_window_size > 30: | ||||||||
dbshah1212 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
Context.event_window_size = 30 | ||||||||
LOGGER.warning("Using a default window size of 30 days as Stripe Event API returns data of the last 30 days only.") | ||||||||
|
||||||||
Context.tap_start = utils.now() | ||||||||
if args.catalog: | ||||||||
Context.catalog = args.catalog.to_dict() | ||||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -2,6 +2,7 @@ | |||||
Test tap gets all updates for streams with updates published to the events stream | ||||||
""" | ||||||
import json | ||||||
from datetime import datetime, timedelta | ||||||
from time import sleep | ||||||
from random import random | ||||||
|
||||||
|
@@ -12,6 +13,55 @@ | |||||
get_catalogs, update_object, update_payment_intent, create_object, delete_object | ||||||
|
||||||
|
||||||
class TestEventUpdatesSyncStart(BaseTapTest):
    """
    Verify that event-based updates are synced for the last 30 days only,
    even if the start date is set to more than 30 days ago.
    """
    @staticmethod
    def name():
        """Name of the test connection."""
        return "tt_stripe_event_sync_start"

    def test_run(self):
        """
        Verify that each event-based record is from the last 30 days.
        """
        # Local import so that this block does not rely on a change to the module imports.
        from datetime import timezone  # pylint: disable=import-outside-toplevel

        # Setting start_date to 32 days before today
        self.start_date = datetime.strftime(datetime.today() - timedelta(days=32), self.START_DATE_FORMAT)
        conn_id = connections.ensure_connection(self, original_properties=False)

        # As it takes more than an hour to sync all the event_updates streams,
        # only the following two streams are synced
        event_update_streams = {"subscriptions", "customers"}

        found_catalogs = self.run_and_verify_check_mode(conn_id)
        our_catalogs = [catalog for catalog in found_catalogs
                        if catalog.get('tap_stream_id') in event_update_streams]
        self.select_all_streams_and_fields(conn_id, our_catalogs, select_all_fields=True)

        # Getting the date 30 days before the current date-time. The tap derives its 30 day
        # cutoff from epoch timestamps, so UTC is used here instead of the machine's local
        # time zone, which could shift the threshold by a day.
        # NOTE(review): assumes the `updated` value of the records is rendered in UTC - confirm.
        events_start_date = datetime.strftime(datetime.now(timezone.utc) - timedelta(days=30),
                                              self.START_DATE_FORMAT)

        # Run a sync job using orchestrator
        self.run_and_verify_sync(conn_id)

        # Get the set of records from the sync
        synced_records = runner.get_records_from_target_output()

        for stream in event_update_streams:
            with self.subTest(stream=stream):

                # Fail with a clear message instead of an AttributeError on None
                # when the stream is missing from the target output
                self.assertIn(stream, synced_records)

                # Get event-based records based on the newly added field `updated_by_event_type`
                events_records_data = [message['data'] for message in synced_records.get(stream).get('messages')
                                       if message['action'] == 'upsert' and
                                       message.get('data').get('updated_by_event_type', None)]

                for record in events_records_data:
                    self.assertGreaterEqual(record.get('updated'), events_start_date)
|
||||||
|
||||||
class EventUpdatesTest(BaseTapTest): | ||||||
""" | ||||||
Test tap gets all updates for streams with updates published to the events stream | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
import unittest | ||
from parameterized import parameterized | ||
from tap_stripe import Context, get_date_window_size, DEFAULT_DATE_WINDOW_SIZE | ||
|
||
class TestGetWindowSize(unittest.TestCase):
    """
    Test the `get_date_window_size` function of the tap.
    """

    def setUp(self):
        """
        Every test overwrites the shared `Context.config`, so register a cleanup that
        puts the previous config back and keeps the tests independent of each other.
        """
        self.addCleanup(setattr, Context, "config", getattr(Context, "config", {}))

    @parameterized.expand([
        ["integer_value", 10, 10.0],
        ["float_value", 100.5, 100.5],
        ["string_integer", "10", 10.0],
        ["string_float", "100.5", 100.5],
    ])
    def test_window_size_values(self, name, date_window_size, expected_value):
        """
        Test that for a valid value of the window size,
        no exception is raised and the expected value is returned.
        """
        Context.config = {"date_window_size": date_window_size}

        # Verify window size value is expected
        self.assertEqual(get_date_window_size("date_window_size", DEFAULT_DATE_WINDOW_SIZE), expected_value)

    @parameterized.expand([
        ["integer_zero", 0],
        ["float_zero", 0.0],
        ["negative_value", -10],
        ["string_zero", "0"],
        ["string_float_zero", "0.0"],
        ["string_negative_value", "-100"],
        ["string_alphabate", "abc"],
    ])
    def test_invalid_value(self, name, date_window_size):
        """
        Test that an exception is raised for an invalid value.
        """
        Context.config = {"date_window_size": date_window_size}
        with self.assertRaises(Exception) as e:
            get_date_window_size("date_window_size", DEFAULT_DATE_WINDOW_SIZE)

        # Verify that the exception message is expected.
        self.assertEqual(str(e.exception),
                         "The entered window size is invalid, it should be a valid none-zero integer.")

    def test_none_value(self):
        """
        Test that the default value is returned when no window size is passed in the config.
        """
        Context.config = {}

        # Verify that the default window size value is returned.
        self.assertEqual(get_date_window_size("date_window_size", DEFAULT_DATE_WINDOW_SIZE),
                         DEFAULT_DATE_WINDOW_SIZE)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
import unittest | ||
from unittest import mock | ||
from datetime import datetime | ||
from tap_stripe import Context, sync_stream | ||
|
||
class MockClass:
    '''Mock of the Balance Transactions/Events object.'''
    lines = "lines"

    def __init__(self):
        '''The mock keeps no per-instance state, so there is nothing to set up.'''

    @classmethod
    def to_dict_recursive(cls):
        '''Mocked `to_dict_recursive` of the Balance Transactions/Events class; returns fixed data.'''
        mocked_payload = "Test Data"
        return mocked_payload
|
||
BOOKMARK_TIME = 1645046000 # epoch bookmark time | ||
BOOKMARK_FORMAT = "%Y-%m-%dT%H:%M:%SZ" | ||
|
||
@mock.patch("tap_stripe.LOGGER.warning")
@mock.patch("singer.write_record")
@mock.patch('singer.utils.now', return_value = datetime.strptime("2022-05-01T08:30:50Z", BOOKMARK_FORMAT))
@mock.patch("tap_stripe.reduce_foreign_keys", return_value = {"created": 16452804585})
@mock.patch("tap_stripe.convert_dict_to_stripe_object", return_value = {"created": "2022-02-17T00:00:00"})
@mock.patch("tap_stripe.paginate", return_value = [MockClass()])
@mock.patch("tap_stripe.Context.get_catalog_entry")
@mock.patch("tap_stripe.singer.metadata.to_map")
@mock.patch("tap_stripe.singer.metadata.get", return_value = ["created"])
@mock.patch("tap_stripe.epoch_to_dt")
@mock.patch("tap_stripe.dt_to_epoch", side_effect = [1645056000, 1645056000, 1647647700, 1645056000]) # epoch timestamps
@mock.patch("tap_stripe.sync_sub_stream")
@mock.patch("tap_stripe.singer.get_bookmark", side_effect = [BOOKMARK_TIME, BOOKMARK_TIME])
class TestLoggerWarningForEvents(unittest.TestCase):
    """
    Test the warning logged by `sync_stream` for the `events` stream when the bookmark is
    older than 30 days.

    The class-level patches are injected into the test method bottom-up, i.e. the
    `singer.get_bookmark` mock is the first argument and the `LOGGER.warning` mock the last.
    """

    def test_date_window_logger(self, mock_get_bookmark, mock_sync_sub_stream, mock_dt_to_epoch,
                                mock_epoch_to_dt, mock_get, mock_metadata_map, mock_get_catalog_entry,
                                mock_paginate, mock_convert_dict_to_stripe_object, mock_reduce_foreign_keys,
                                mock_utils_now, mock_write_record, mock_logger):
        """
        Test that the tap prints the expected warning message when a bookmark value older than
        30 days is passed in the state.
        """
        # The test changes attributes shared through the Context class; put them back
        # afterwards so that other tests are not affected.
        original_config = getattr(Context, "config", {})
        original_new_counts = dict(Context.new_counts)

        def restore_context():
            Context.config = original_config
            Context.new_counts.clear()
            Context.new_counts.update(original_new_counts)

        self.addCleanup(restore_context)

        config = {"client_secret": "test_secret", "account_id": "test_account",
                  "start_date": "2022-02-17T00:00:00", "lookback_window": "0"}
        Context.config = config
        Context.new_counts['events'] = 1
        sync_stream("events")

        # Verify the warning message for a bookmark that is older than the last 30 days.
        mock_logger.assert_called_with(
            "Provided current bookmark/start_date is older than the last 30 days. So, starting sync for"
            " the last 30 days as Stripe Event API returns data for the last 30 days only.")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added comment to explain
DEFAULT_DATE_WINDOW_SIZE