# Sentinel used to tell "nothing stored at this path" apart from a stored
# falsy value such as 0, False, '' or None.
_MISSING = object()


def value_at_breadcrumb(breadcrumb, rec, default=None):
    """Return the value reached by following `breadcrumb` through nested dicts.

    Args:
        breadcrumb: sequence of keys, outermost first.
        rec: the (possibly nested) dict to read from.
        default: value returned when the path cannot be followed. Defaults
            to None, which matches the historical behaviour.

    Returns:
        The value stored at the path, or `default` when the breadcrumb is
        empty, a key is absent, or an intermediate value is not a dict
        (for example a list or a string).
    """
    if not breadcrumb:
        return default

    current = rec
    for key in breadcrumb:
        # Only dicts can be descended into; anything else ends the walk
        # instead of raising AttributeError on `.get`.
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current


def insert_at_breadcrumb(breadcrumb, value, rec):
    """Store `value` in `rec` at the nested location named by `breadcrumb`.

    Intermediate dicts are created as needed. An intermediate entry that
    exists but is not a dict is replaced by a new dict so that the walk can
    continue. `rec` is modified in place.

    Args:
        breadcrumb: non-empty sequence of keys, outermost first.
        value: the value to store at the final key.
        rec: the dict to modify.

    Raises:
        ValueError: if `breadcrumb` is empty.
    """
    if not breadcrumb:
        raise ValueError('breadcrumb must contain at least one key')

    *parents, leaf = breadcrumb
    node = rec
    for key in parents:
        child = node.get(key)
        if not isinstance(child, dict):
            child = node[key] = {}
        node = child
    node[leaf] = value


def apply_whitelist(rec, stream_field_whitelist):
    """Return a copy of `rec` pruned down to the whitelisted nested fields.

    The whitelist is a map from stream_name to a list of breadcrumbs that
    indicates which nested fields should be persisted. There shouldn't be
    any top-level fields in the whitelist since users can already remove
    these fields via field selection. The shape of the whitelist is:

        {
            "stream_name": [
                ["data", "nested_field", "..."],
                ["data", "other_nested_field", "..."],
                ...
            ]
        }

    For now, the top level field should always be 'data' until we hear of
    a need to extend this to other deeply nested objects.

    sync_stream reads this map from the 'whitelist_map' config entry (a JSON
    string) and passes in the list for the stream being synced, after the
    record has been transformed.

    Args:
        rec: the record dict to filter. It is not modified, but nested
            values are shared with the returned dict rather than copied.
        stream_field_whitelist: list of breadcrumbs for this stream. Each
            breadcrumb must have more than one key and start with 'data'.

    Returns:
        A new dict holding every top-level value of `rec` that is neither a
        dict nor a list, plus each whitelisted path that is present in
        `rec`. Present values are kept even when falsy (0, False, '', None).

    Raises:
        ValueError: if a breadcrumb has fewer than two keys or does not
            start with 'data'.
    """
    # Keep all the top level fields that are not containers.
    filtered_rec = {
        key: value
        for key, value in rec.items()
        if not isinstance(value, (dict, list))
    }

    for breadcrumb in stream_field_whitelist:
        # Explicit raises rather than `assert`: the whitelist comes from
        # configuration and asserts are stripped when running with -O.
        if len(breadcrumb) < 2:
            raise ValueError(
                'whitelist breadcrumb must name a nested field: %r' % (breadcrumb,))
        if breadcrumb[0] != 'data':
            raise ValueError(
                "whitelist breadcrumb must start with 'data': %r" % (breadcrumb,))

        value_to_add = value_at_breadcrumb(breadcrumb, rec, _MISSING)
        # Compare against the sentinel so legitimate falsy values such as
        # an amount of 0 or a flag of False are not dropped.
        if value_to_add is not _MISSING:
            insert_at_breadcrumb(breadcrumb, value_to_add, filtered_rec)
    return filtered_rec