-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TDL-14058: Update SDK and api version #105
Changes from all commits
bfb9aaf
e943539
4f8540e
9f7f9bb
5746fc9
0f84a64
9f56ba2
f23a0c3
ab4af98
566afcc
dc137c7
23b02f0
016c0c4
cd40d52
a284115
e797ca9
d676147
f005a9e
84b4095
d111788
befe058
98d9d68
6895c02
6d4dfcc
260109b
8789910
5b4d8f6
ad1e0ee
6f6cd9f
098032b
7419e7a
a0a3924
1223197
f6b96fc
44abdf8
95a37d9
e2a6fbc
f26ab5c
23c49c8
2ce41eb
f013574
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -8,6 +8,7 @@ | |||||||
import stripe | ||||||||
import stripe.error | ||||||||
from stripe.stripe_object import StripeObject | ||||||||
from stripe.util import convert_to_stripe_object | ||||||||
import singer | ||||||||
from singer import utils, Transformer, metrics | ||||||||
from singer import metadata | ||||||||
|
@@ -46,7 +47,7 @@ | |||||||
'events': 'created', | ||||||||
'customers': 'created', | ||||||||
'plans': 'created', | ||||||||
'invoices': 'date', | ||||||||
'invoices': 'created', | ||||||||
'invoice_items': 'date', | ||||||||
'transfers': 'created', | ||||||||
'coupons': 'created', | ||||||||
|
@@ -80,6 +81,20 @@ | |||||||
# payouts - these are called transfers with an event type of payout.* | ||||||||
} | ||||||||
|
||||||||
# Some fields are not available by default with latest API version so | ||||||||
# retrieve it by passing expand paramater in SDK object | ||||||||
STREAM_TO_EXPAND_FIELDS = { | ||||||||
# `tax_ids` field is not included in API response by default. To include it in the response, pass it in expand paramater. | ||||||||
# Reference: https://stripe.com/docs/api/customers/object#customer_object-tax_ids | ||||||||
|
||||||||
'customers': ['data.sources', 'data.subscriptions', 'data.tax_ids'], | ||||||||
'plans': ['data.tiers'], | ||||||||
'invoice_items': ['data.plan.tiers'], | ||||||||
'invoice_line_items': ['data.plan.tiers'], | ||||||||
'subscriptions': ['data.plan.tiers'], | ||||||||
'subscription_items': ['data.plan.tiers'] | ||||||||
} | ||||||||
|
||||||||
SUB_STREAMS = { | ||||||||
'subscriptions': 'subscription_items', | ||||||||
'invoices': 'invoice_line_items', | ||||||||
|
@@ -162,7 +177,7 @@ def configure_stripe_client(): | |||||||
# https://github.com/stripe/stripe-python/tree/a9a8d754b73ad47bdece6ac4b4850822fa19db4e#usage | ||||||||
stripe.api_key = Context.config.get('client_secret') | ||||||||
# Override the Stripe API Version for consistent access | ||||||||
stripe.api_version = '2018-09-24' | ||||||||
stripe.api_version = '2020-08-27' | ||||||||
# Allow ourselves to retry retriable network errors 5 times | ||||||||
# https://github.com/stripe/stripe-python/tree/a9a8d754b73ad47bdece6ac4b4850822fa19db4e#configuring-automatic-retries | ||||||||
stripe.max_network_retries = 15 | ||||||||
|
@@ -177,7 +192,7 @@ def configure_stripe_client(): | |||||||
account = stripe.Account.retrieve(Context.config.get('account_id')) | ||||||||
msg = "Successfully connected to Stripe Account with display name" \ | ||||||||
+ " `%s`" | ||||||||
LOGGER.info(msg, account.display_name) | ||||||||
LOGGER.info(msg, account.settings.dashboard.display_name) | ||||||||
|
||||||||
def unwrap_data_objects(rec): | ||||||||
""" | ||||||||
|
@@ -371,10 +386,13 @@ def reduce_foreign_keys(rec, stream_name): | |||||||
return rec | ||||||||
|
||||||||
|
||||||||
def paginate(sdk_obj, filter_key, start_date, end_date, limit=100): | ||||||||
def paginate(sdk_obj, filter_key, start_date, end_date, stream_name, limit=100): | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Explain why you have to include the stream_name as a new parameter? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The tap-stripe/tap_stripe/__init__.py Lines 389 to 391 in 1223197
|
||||||||
yield from sdk_obj.list( | ||||||||
limit=limit, | ||||||||
stripe_account=Context.config.get('account_id'), | ||||||||
# Some fields are not available by default with latest API version so | ||||||||
# retrieve it by passing expand paramater in SDK object | ||||||||
expand=STREAM_TO_EXPAND_FIELDS.get(stream_name, []), | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Explain this code change There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added code comment for the change. |
||||||||
# None passed to starting_after appears to retrieve | ||||||||
# all of them so this should always be safe. | ||||||||
**{filter_key + "[gte]": start_date, | ||||||||
|
@@ -390,7 +408,60 @@ def dt_to_epoch(dt): | |||||||
def epoch_to_dt(epoch_ts): | ||||||||
return datetime.fromtimestamp(epoch_ts) | ||||||||
|
||||||||
def get_bookmark_for_stream(stream_name, replication_key): | ||||||||
""" | ||||||||
Returns bookmark for the streams from the state if found otherwise start_date | ||||||||
""" | ||||||||
# Invoices's replication key changed from `date` to `created` in latest API version. | ||||||||
# Invoice line Items write bookmark with Invoice's replication key but it changed to `created` | ||||||||
# so kept `date` in bookmarking for Invoices and Invoice line Items as it has to respect bookmark of active connection too. | ||||||||
if stream_name in ['invoices', 'invoice_line_items']: | ||||||||
stream_bookmark = singer.get_bookmark(Context.state, stream_name, 'date') or \ | ||||||||
int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) | ||||||||
else: | ||||||||
stream_bookmark = singer.get_bookmark(Context.state, stream_name, replication_key) or \ | ||||||||
int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) | ||||||||
return stream_bookmark | ||||||||
|
||||||||
def write_bookmark_for_stream(stream_name, replication_key, stream_bookmark): | ||||||||
""" | ||||||||
Write bookmark for the streams using replication key and bookmark value | ||||||||
""" | ||||||||
# Invoices's replication key changed from `date` to `created` in latest API version. | ||||||||
# Invoice line Items write bookmark with Invoice's replication key but it changed to `created` | ||||||||
# so kept `date` in bookmarking for Invoices and Invoice line Items as it has to respect bookmark of active connection too. | ||||||||
if stream_name in ['invoices', 'invoice_line_items']: | ||||||||
singer.write_bookmark(Context.state, | ||||||||
stream_name, | ||||||||
'date', | ||||||||
stream_bookmark) | ||||||||
else: | ||||||||
singer.write_bookmark(Context.state, | ||||||||
stream_name, | ||||||||
replication_key, | ||||||||
stream_bookmark) | ||||||||
|
||||||||
def convert_dict_to_stripe_object(record): | ||||||||
""" | ||||||||
Convert field datatype of dict object to `stripe.stripe_object.StripeObject`. | ||||||||
Example: | ||||||||
record = {'id': 'dummy_id', 'tiers': [{"flat_amount": 4578"unit_amount": 7241350}]} | ||||||||
This function convert datatype of each record of 'tiers' field to `stripe.stripe_object.StripeObject`. | ||||||||
""" | ||||||||
# Loop through each fields of `record` object | ||||||||
for key, val in record.items(): | ||||||||
# Check type of field | ||||||||
if isinstance(val, list): | ||||||||
# Loop through each records of list | ||||||||
for index, field_val in enumerate(val): | ||||||||
if isinstance(field_val, dict): | ||||||||
# Convert datatype of dict to `stripe.stripe_object.StripeObject` | ||||||||
record[key][index] = convert_to_stripe_object(record[key][index]) | ||||||||
|
||||||||
return record | ||||||||
|
||||||||
# pylint: disable=too-many-locals | ||||||||
# pylint: disable=too-many-statements | ||||||||
def sync_stream(stream_name): | ||||||||
""" | ||||||||
Sync each stream, looking for newly created records. Updates are captured by events stream. | ||||||||
|
@@ -404,8 +475,10 @@ def sync_stream(stream_name): | |||||||
replication_key = metadata.get(stream_metadata, (), 'valid-replication-keys')[0] | ||||||||
# Invoice Items bookmarks on `date`, but queries on `created` | ||||||||
filter_key = 'created' if stream_name == 'invoice_items' else replication_key | ||||||||
stream_bookmark = singer.get_bookmark(Context.state, stream_name, replication_key) or \ | ||||||||
int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) | ||||||||
|
||||||||
# Get bookmark for the stream | ||||||||
stream_bookmark = get_bookmark_for_stream(stream_name, replication_key) | ||||||||
|
||||||||
bookmark = stream_bookmark | ||||||||
|
||||||||
# if this stream has a sub_stream, compare the bookmark | ||||||||
|
@@ -414,8 +487,9 @@ def sync_stream(stream_name): | |||||||
# If there is a sub-stream and its selected, get its bookmark (or the start date if no bookmark) | ||||||||
should_sync_sub_stream = sub_stream_name and Context.is_selected(sub_stream_name) | ||||||||
if should_sync_sub_stream: | ||||||||
sub_stream_bookmark = singer.get_bookmark(Context.state, sub_stream_name, replication_key) \ | ||||||||
or int(utils.strptime_to_utc(Context.config["start_date"]).timestamp()) | ||||||||
|
||||||||
# Get bookmark for the sub_stream | ||||||||
sub_stream_bookmark = get_bookmark_for_stream(sub_stream_name, replication_key) | ||||||||
|
||||||||
# if there is a sub stream, set bookmark to sub stream's bookmark | ||||||||
# since we know it must be earlier than the stream's bookmark | ||||||||
|
@@ -454,10 +528,12 @@ def sync_stream(stream_name): | |||||||
stop_window = end_time | ||||||||
|
||||||||
for stream_obj in paginate(STREAM_SDK_OBJECTS[stream_name]['sdk_object'], | ||||||||
filter_key, start_window, stop_window): | ||||||||
filter_key, start_window, stop_window, stream_name): | ||||||||
|
||||||||
# get the replication key value from the object | ||||||||
rec = unwrap_data_objects(stream_obj.to_dict_recursive()) | ||||||||
# convert field datatype of dict object to `stripe.stripe_object.StripeObject` | ||||||||
rec = convert_dict_to_stripe_object(rec) | ||||||||
rec = reduce_foreign_keys(rec, stream_name) | ||||||||
stream_obj_created = rec[replication_key] | ||||||||
rec['updated'] = stream_obj_created | ||||||||
|
@@ -488,18 +564,15 @@ def sync_stream(stream_name): | |||||||
# Update stream/sub-streams bookmarks as stop window | ||||||||
if stop_window > stream_bookmark: | ||||||||
stream_bookmark = stop_window | ||||||||
singer.write_bookmark(Context.state, | ||||||||
stream_name, | ||||||||
replication_key, | ||||||||
stream_bookmark) | ||||||||
# Write bookmark for the stream | ||||||||
write_bookmark_for_stream(stream_name, replication_key, stream_bookmark) | ||||||||
|
||||||||
# the sub stream bookmarks on its parent | ||||||||
if should_sync_sub_stream and stop_window > sub_stream_bookmark: | ||||||||
sub_stream_bookmark = stop_window | ||||||||
singer.write_bookmark(Context.state, | ||||||||
sub_stream_name, | ||||||||
replication_key, | ||||||||
sub_stream_bookmark) | ||||||||
|
||||||||
# Write bookmark for the sub_stream | ||||||||
write_bookmark_for_stream(sub_stream_name, replication_key, sub_stream_bookmark) | ||||||||
|
||||||||
singer.write_state(Context.state) | ||||||||
|
||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed the build_daily workflow definition to run the tests once a day. (Reference)