-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TDL-14058: Update SDK and api version #105
Changes from 24 commits
bfb9aaf
e943539
4f8540e
9f7f9bb
5746fc9
0f84a64
9f56ba2
f23a0c3
ab4af98
566afcc
dc137c7
23b02f0
016c0c4
cd40d52
a284115
e797ca9
d676147
f005a9e
84b4095
d111788
befe058
98d9d68
6895c02
6d4dfcc
260109b
8789910
5b4d8f6
ad1e0ee
6f6cd9f
098032b
7419e7a
a0a3924
1223197
f6b96fc
44abdf8
95a37d9
e2a6fbc
f26ab5c
23c49c8
2ce41eb
f013574
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -46,7 +46,7 @@ | |||||||
'events': 'created', | ||||||||
'customers': 'created', | ||||||||
'plans': 'created', | ||||||||
'invoices': 'date', | ||||||||
'invoices': 'created', | ||||||||
'invoice_items': 'date', | ||||||||
'transfers': 'created', | ||||||||
'coupons': 'created', | ||||||||
|
@@ -80,6 +80,17 @@ | |||||||
# payouts - these are called transfers with an event type of payout.* | ||||||||
} | ||||||||
|
||||||||
# Some fields are not available by default with latest API version so | ||||||||
# retrive it by passing expand paramater in SDK object | ||||||||
STREAM_TO_EXPAND_FIELDS = { | ||||||||
'customers': ['data.sources', 'data.subscriptions'], | ||||||||
'plans': ['data.tiers'], | ||||||||
'invoice_items': ['data.plan.tiers'], | ||||||||
'invoice_line_items': ['data.plan.tiers'], | ||||||||
'subscriptions': ['data.plan.tiers'], | ||||||||
'subscription_items': ['data.plan.tiers'] | ||||||||
} | ||||||||
|
||||||||
SUB_STREAMS = { | ||||||||
'subscriptions': 'subscription_items', | ||||||||
'invoices': 'invoice_line_items', | ||||||||
|
@@ -162,7 +173,7 @@ def configure_stripe_client(): | |||||||
# https://github.com/stripe/stripe-python/tree/a9a8d754b73ad47bdece6ac4b4850822fa19db4e#usage | ||||||||
stripe.api_key = Context.config.get('client_secret') | ||||||||
# Override the Stripe API Version for consistent access | ||||||||
stripe.api_version = '2018-09-24' | ||||||||
stripe.api_version = '2020-08-27' | ||||||||
# Allow ourselves to retry retriable network errors 5 times | ||||||||
# https://github.com/stripe/stripe-python/tree/a9a8d754b73ad47bdece6ac4b4850822fa19db4e#configuring-automatic-retries | ||||||||
stripe.max_network_retries = 15 | ||||||||
|
@@ -177,7 +188,7 @@ def configure_stripe_client(): | |||||||
account = stripe.Account.retrieve(Context.config.get('account_id')) | ||||||||
msg = "Successfully connected to Stripe Account with display name" \ | ||||||||
+ " `%s`" | ||||||||
LOGGER.info(msg, account.display_name) | ||||||||
LOGGER.info(msg, account.settings.dashboard.display_name) | ||||||||
|
||||||||
def unwrap_data_objects(rec): | ||||||||
""" | ||||||||
|
@@ -371,10 +382,11 @@ def reduce_foreign_keys(rec, stream_name): | |||||||
return rec | ||||||||
|
||||||||
|
||||||||
def paginate(sdk_obj, filter_key, start_date, end_date, limit=100): | ||||||||
def paginate(sdk_obj, filter_key, start_date, end_date, stream_name, limit=100): | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Explain why you have to include the stream_name as a new parameter? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The tap-stripe/tap_stripe/__init__.py Lines 389 to 391 in 1223197
|
||||||||
yield from sdk_obj.list( | ||||||||
limit=limit, | ||||||||
stripe_account=Context.config.get('account_id'), | ||||||||
expand=STREAM_TO_EXPAND_FIELDS.get(stream_name, []), | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Explain this code change There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added code comment for the change. |
||||||||
# None passed to starting_after appears to retrieve | ||||||||
# all of them so this should always be safe. | ||||||||
**{filter_key + "[gte]": start_date, | ||||||||
|
@@ -391,6 +403,7 @@ def epoch_to_dt(epoch_ts): | |||||||
return datetime.fromtimestamp(epoch_ts) | ||||||||
|
||||||||
# pylint: disable=too-many-locals | ||||||||
# pylint: disable=too-many-statements | ||||||||
def sync_stream(stream_name): | ||||||||
""" | ||||||||
Sync each stream, looking for newly created records. Updates are captured by events stream. | ||||||||
|
@@ -404,8 +417,16 @@ def sync_stream(stream_name): | |||||||
replication_key = metadata.get(stream_metadata, (), 'valid-replication-keys')[0] | ||||||||
# Invoice Items bookmarks on `date`, but queries on `created` | ||||||||
filter_key = 'created' if stream_name == 'invoice_items' else replication_key | ||||||||
stream_bookmark = singer.get_bookmark(Context.state, stream_name, replication_key) or \ | ||||||||
int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) | ||||||||
|
||||||||
# Invoice was bookmarking on `date` but in latest API version, that field is deprecated and replication key changed to `created` | ||||||||
# kept `date` in bookmarking as it as to respect bookmark of active connection too | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed typo. |
||||||||
if stream_name == 'invoices': | ||||||||
stream_bookmark = singer.get_bookmark(Context.state, stream_name, 'date') or \ | ||||||||
int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) | ||||||||
else: | ||||||||
stream_bookmark = singer.get_bookmark(Context.state, stream_name, replication_key) or \ | ||||||||
int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) | ||||||||
|
||||||||
bookmark = stream_bookmark | ||||||||
|
||||||||
# if this stream has a sub_stream, compare the bookmark | ||||||||
|
@@ -414,8 +435,16 @@ def sync_stream(stream_name): | |||||||
# If there is a sub-stream and its selected, get its bookmark (or the start date if no bookmark) | ||||||||
should_sync_sub_stream = sub_stream_name and Context.is_selected(sub_stream_name) | ||||||||
if should_sync_sub_stream: | ||||||||
sub_stream_bookmark = singer.get_bookmark(Context.state, sub_stream_name, replication_key) \ | ||||||||
or int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) | ||||||||
|
||||||||
# Invoices's replication key changed from `date` to `created` in latest API version. | ||||||||
# Invoice line Items write bookmark with Invoice's replication key but it changed to `created` | ||||||||
# so kept `date` in bookmarking as it as to respect bookmark of active connection too. | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed typo. |
||||||||
if sub_stream_name == "invoice_line_items": | ||||||||
sub_stream_bookmark = singer.get_bookmark(Context.state, sub_stream_name, 'date') \ | ||||||||
or int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) | ||||||||
else: | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the comment it is mentioned as Created, but in the code it is start_date. Looks the similar code is replicated. Can we write a function which accept these as parameters and set the appropriate bookmark? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I added |
||||||||
sub_stream_bookmark = singer.get_bookmark(Context.state, sub_stream_name, replication_key) \ | ||||||||
or int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) | ||||||||
|
||||||||
# if there is a sub stream, set bookmark to sub stream's bookmark | ||||||||
# since we know it must be earlier than the stream's bookmark | ||||||||
|
@@ -454,7 +483,7 @@ def sync_stream(stream_name): | |||||||
stop_window = end_time | ||||||||
|
||||||||
for stream_obj in paginate(STREAM_SDK_OBJECTS[stream_name]['sdk_object'], | ||||||||
filter_key, start_window, stop_window): | ||||||||
filter_key, start_window, stop_window, stream_name): | ||||||||
|
||||||||
# get the replication key value from the object | ||||||||
rec = unwrap_data_objects(stream_obj.to_dict_recursive()) | ||||||||
|
@@ -488,18 +517,37 @@ def sync_stream(stream_name): | |||||||
# Update stream/sub-streams bookmarks as stop window | ||||||||
if stop_window > stream_bookmark: | ||||||||
stream_bookmark = stop_window | ||||||||
singer.write_bookmark(Context.state, | ||||||||
stream_name, | ||||||||
replication_key, | ||||||||
stream_bookmark) | ||||||||
# Invoice was bookmarking on `date` but in latest API version, | ||||||||
# that field is deprecated and replication key changed to `created` | ||||||||
# kept `date` in bookmarking as it as to respect bookmark of active connection too. | ||||||||
if stream_name == "invoices": | ||||||||
singer.write_bookmark(Context.state, | ||||||||
stream_name, | ||||||||
'date', | ||||||||
stream_bookmark) | ||||||||
else: | ||||||||
singer.write_bookmark(Context.state, | ||||||||
stream_name, | ||||||||
replication_key, | ||||||||
stream_bookmark) | ||||||||
|
||||||||
# the sub stream bookmarks on its parent | ||||||||
if should_sync_sub_stream and stop_window > sub_stream_bookmark: | ||||||||
sub_stream_bookmark = stop_window | ||||||||
singer.write_bookmark(Context.state, | ||||||||
sub_stream_name, | ||||||||
replication_key, | ||||||||
sub_stream_bookmark) | ||||||||
|
||||||||
# Invoices's replication key changed from `date` to `created` in latest API version. | ||||||||
# Invoice line Items write bookmark with Invoice's replication key but it changed to `created` | ||||||||
# so kept `date` in bookmarking as it as to respect bookmark of active connection too. | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed typo. |
||||||||
if sub_stream_name == "invoice_line_items": | ||||||||
singer.write_bookmark(Context.state, | ||||||||
sub_stream_name, | ||||||||
'date', | ||||||||
sub_stream_bookmark) | ||||||||
else: | ||||||||
singer.write_bookmark(Context.state, | ||||||||
sub_stream_name, | ||||||||
replication_key, | ||||||||
sub_stream_bookmark) | ||||||||
|
||||||||
singer.write_state(Context.state) | ||||||||
|
||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ | |
import decimal | ||
from datetime import datetime as dt | ||
from datetime import timezone as tz | ||
from dateutil import parser | ||
|
||
from tap_tester import connections, menagerie, runner | ||
|
||
|
@@ -79,12 +80,7 @@ def expected_metadata(self): | |
'events': default, | ||
'customers': default, | ||
'plans': default, | ||
'invoices': { | ||
self.AUTOMATIC_FIELDS: {"updated"}, | ||
self.REPLICATION_KEYS: {"date"}, | ||
self.PRIMARY_KEYS: {"id"}, | ||
self.REPLICATION_METHOD: self.INCREMENTAL, | ||
}, | ||
'invoices': default, | ||
'invoice_items': { | ||
self.AUTOMATIC_FIELDS: {"updated"}, | ||
self.REPLICATION_KEYS: {"date"}, | ||
|
@@ -314,15 +310,31 @@ def split_records_into_created_and_updated(self, records): | |
'schema': batch['schema'], | ||
'key_names' : batch.get('key_names'), | ||
'table_version': batch.get('table_version')} | ||
created[stream]['messages'] += [m for m in batch['messages'] | ||
if m['data'].get("updated") == m['data'].get(bookmark_key)] | ||
# Bookmark key changed for `invoices` from `date` to `created` due to latest API change | ||
# but for `invoices` stream, the `created` field have integer type(epoch format) from starting so | ||
# converting `updated` to epoch for comparison. | ||
if stream == "invoices": | ||
created[stream]['messages'] += [m for m in batch['messages'] | ||
if self.dt_to_ts(m['data'].get("updated")) == m['data'].get(bookmark_key)] | ||
else: | ||
created[stream]['messages'] += [m for m in batch['messages'] | ||
if m['data'].get("updated") == m['data'].get(bookmark_key)] | ||
|
||
Comment on lines
+313
to
+322
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this is a best practice for bookmarking. Consistency in the data types for bookmarks is what we want. This may be worth discussing with a dev, if it's unclear. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The bookmark is written in epoch format for all streams like below in STATE. We did not change the data-type to avoid column split because the created field was already there with an integer type. and handled that in a test case. |
||
if stream not in updated: | ||
updated[stream] = {'messages': [], | ||
'schema': batch['schema'], | ||
'key_names' : batch.get('key_names'), | ||
'table_version': batch.get('table_version')} | ||
updated[stream]['messages'] += [m for m in batch['messages'] | ||
if m['data'].get("updated") != m['data'].get(bookmark_key)] | ||
|
||
# Bookmark key changed for `invoices` from `date` to `created` due to latest API change | ||
# but for `invoices` stream, the `created` field have integer type(epoch format) from starting so | ||
# converting `updated` to epoch for comparison. | ||
if stream == "invoices": | ||
updated[stream]['messages'] += [m for m in batch['messages'] | ||
if self.dt_to_ts(m['data'].get("updated")) != m['data'].get(bookmark_key)] | ||
else: | ||
updated[stream]['messages'] += [m for m in batch['messages'] | ||
if m['data'].get("updated") != m['data'].get(bookmark_key)] | ||
Comment on lines
+329
to
+337
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm little bit confused here. In the previous section we are converting updated to integer type to compare with created which made sense. Why are we here doing the comparison with Updated and Updated? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are doing similar things in the previous section and above section. The only difference is in the previous section we are checking with the |
||
return created, updated | ||
|
||
def select_all_streams_and_fields(self, conn_id, catalogs, select_all_fields: bool = True, exclude_streams=None): | ||
|
@@ -523,3 +535,7 @@ def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | ||
self.start_date = self.get_properties().get('start_date') | ||
self.maxDiff=None | ||
|
||
|
||
def dt_to_ts(self, dtime): | ||
return parser.parse(dtime).timestamp() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be 'retrieve'
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed typo.