add whitelisting #22
Merged · 5 commits · Apr 15, 2019
60 changes: 60 additions & 0 deletions tap_stripe/__init__.py
@@ -284,6 +284,57 @@ def discover():

return {'streams': streams}

def value_at_breadcrumb(breadcrumb, rec):
    if len(breadcrumb) == 1:
        return rec.get(breadcrumb[0])
    if rec.get(breadcrumb[0]):
        return value_at_breadcrumb(breadcrumb[1:], rec[breadcrumb[0]])
    return None


def insert_at_breadcrumb(breadcrumb, value, rec):
    if len(breadcrumb) == 1:
        rec[breadcrumb[0]] = value
    else:
        if rec.get(breadcrumb[0]):
            insert_at_breadcrumb(breadcrumb[1:], value, rec[breadcrumb[0]])
        else:
            rec[breadcrumb[0]] = {}
            insert_at_breadcrumb(breadcrumb[1:], value, rec[breadcrumb[0]])
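
# Hypothetical usage of the two helpers above (the values here are made up for
# illustration and are not part of the tap), showing how a breadcrumb addresses
# a nested field:
#
#   rec = {'data': {'object': {'id': 'ch_123'}}}
#   value_at_breadcrumb(['data', 'object', 'id'], rec)    # => 'ch_123'
#   value_at_breadcrumb(['data', 'missing'], rec)         # => None
#
#   out = {}
#   insert_at_breadcrumb(['data', 'object', 'id'], 'ch_123', out)
#   # out == {'data': {'object': {'id': 'ch_123'}}}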


def apply_whitelist(rec, stream_field_whitelist):
    """The whitelist is a map from stream_name to a list of breadcrumbs that
    indicates which nested fields should be persisted. There shouldn't be
    any top-level fields in the whitelist since users can already remove
    these fields via field selection. The shape of the whitelist is:
    {
        <stream_name>: [
            [<field_1_breadcrumb>],
            [<field_2_breadcrumb>],
            ...
            [<field_n_breadcrumb>]
        ]
    }
    For now, the top-level field should always be 'data' until we hear of
    a need to extend this to other deeply nested objects.
    """
    filtered_rec = {}

    # Keep all the top level fields
    for k, v in rec.items():  #pylint: disable=invalid-name
        if not isinstance(v, (dict, list)):
            filtered_rec[k] = v

    for breadcrumb in stream_field_whitelist:
        assert len(breadcrumb) > 1
        assert breadcrumb[0] == 'data'
        value_to_add = value_at_breadcrumb(breadcrumb, rec)
        if value_to_add:
            insert_at_breadcrumb(breadcrumb, value_to_add, filtered_rec)
    return filtered_rec
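
# A minimal, hypothetical illustration of apply_whitelist (the record and
# whitelist below are invented for this sketch and are not part of the tap's
# config or tests):
#
#   whitelist = [['data', 'object', 'id'], ['data', 'amount']]
#   rec = {'id': 'evt_123',
#          'type': 'charge.succeeded',
#          'data': {'object': {'id': 'ch_123', 'customer': 'cus_456'},
#                   'amount': 100,
#                   'currency': 'usd'}}
#   apply_whitelist(rec, whitelist)
#   # => {'id': 'evt_123', 'type': 'charge.succeeded',
#   #     'data': {'object': {'id': 'ch_123'}, 'amount': 100}}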


def reduce_foreign_keys(rec, stream_name):
    if stream_name == 'customers':
        rec['subscriptions'] = [s['id'] for s in rec.get('subscriptions', [])] or None
@@ -301,13 +352,16 @@ def reduce_foreign_keys(rec, stream_name):
rec['lines'][k] = [li.to_dict_recursive() for li in val]
return rec

# pylint: disable=too-many-locals
def sync_stream(stream_name):
    """
    Sync each stream, looking for newly created records. Updates are captured by the events stream.
    """
    LOGGER.info("Started syncing stream %s", stream_name)

    stream_metadata = metadata.to_map(Context.get_catalog_entry(stream_name)['metadata'])
    stream_field_whitelist = json.loads(Context.config.get('whitelist_map', '{}')).get(stream_name)
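    # Note: 'whitelist_map' is read here as a JSON-encoded string in the tap
    # config, mapping stream names to breadcrumb lists. A hypothetical example
    # value (not shipped with the tap):
    #   '{"events": [["data", "object", "id"], ["data", "object", "status"]]}'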

    extraction_time = singer.utils.now()
    replication_key = metadata.get(stream_metadata, (), 'valid-replication-keys')[0]
    # Invoice Items bookmarks on `date`, but queries on `created`
@@ -361,6 +415,12 @@ def sync_stream(stream_name):
Context.get_catalog_entry(stream_name)['schema'],
stream_metadata)

# At this point, the record has been transformed and so
# any de-selected fields have been pruned. Now, prune off
# any fields that aren't present in the whitelist.
if stream_field_whitelist:
rec = apply_whitelist(rec, stream_field_whitelist)

singer.write_record(stream_name,
rec,
time_extracted=extraction_time)