diff --git a/tap_rest_api/helper.py b/tap_rest_api/helper.py
index cc867ab..5d58cac 100644
--- a/tap_rest_api/helper.py
+++ b/tap_rest_api/helper.py
@@ -1,4 +1,4 @@
-import attr, backoff, dateutil, datetime, os, requests
+import attr, backoff, dateutil, datetime, hashlib, os, requests
 import simplejson as json
 from urllib.parse import quote as urlquote
 from requests.auth import HTTPBasicAuth, HTTPDigestAuth
@@ -179,6 +179,18 @@ def get_end(config):
     return end_from_config
 
 
+def get_digest_from_record(record):
+    """Return the hex MD5 digest of ``record`` serialized as JSON.
+
+    Keys are sorted before hashing, so two dicts with equal content
+    produce the same digest regardless of key insertion order. The digest
+    is only used to spot duplicated rows, not for anything
+    security-sensitive.
+    """
+    serialized = json.dumps(record, sort_keys=True)
+    return hashlib.md5(serialized.encode("utf-8")).hexdigest()
+
+
 def get_last_update(config, record, current):
     last_update = current
     if config.get("timestamp_key"):
diff --git a/tap_rest_api/sync.py b/tap_rest_api/sync.py
index 2c331e9..ce11a40 100644
--- a/tap_rest_api/sync.py
+++ b/tap_rest_api/sync.py
@@ -4,12 +4,23 @@
 import singer
 import singer.metrics as metrics
 
-from .helper import (generate_request, get_bookmark_type, get_end, get_endpoint,
-                     get_init_endpoint_params, get_last_update, get_record,
-                     get_record_list, get_selected_streams, get_start,
-                     get_streams_to_sync, human_readable,
-                     get_http_headers,
-                     EXTRACT_TIMESTAMP)
+from .helper import (
+    generate_request,
+    get_bookmark_type,
+    get_end,
+    get_endpoint,
+    get_init_endpoint_params,
+    get_last_update,
+    get_record,
+    get_record_list,
+    get_selected_streams,
+    get_start,
+    get_streams_to_sync,
+    human_readable,
+    get_http_headers,
+    get_digest_from_record,
+    EXTRACT_TIMESTAMP,
+)
 from .schema import filter_record, load_schema, validate
 
 
@@ -112,8 +123,15 @@ def sync_rows(config, state, tap_stream_id, key_properties=[], auth_method=None,
 
                 # It's important to compare the record before adding
                 # EXTRACT_TIMESTAMP
-                if record == prev_written_record:
-                    LOGGER.debug("Skipping the duplicated row %s" % record)
+                digest = get_digest_from_record(record)
+                digest_dict = {"digest": digest}
+                # Backward compatibility: also accept a full (pre-digest) record
+                if (prev_written_record == record or
+                        prev_written_record == digest_dict):
+                    LOGGER.info(
+                        "Skipping the duplicated row with digest %s",
+                        digest,
+                    )
                     continue
 
                 if EXTRACT_TIMESTAMP in schema["properties"].keys():
@@ -137,7 +155,8 @@ def sync_rows(config, state, tap_stream_id, key_properties=[], auth_method=None,
                     # EXTRACT_TIMESTAMP will be different. So popping it out
                     # before storing.
                     record.pop(EXTRACT_TIMESTAMP)
-                    prev_written_record = record
+                    digest = get_digest_from_record(record)
+                    prev_written_record = {"digest": digest}
 
             # Exit conditions
             if len(rows) < config["items_per_page"]: