Skip to content

Commit

Permalink
Use digest for dup check instead of raw record #24
Browse files Browse the repository at this point in the history
  • Loading branch information
daigotanaka committed Aug 2, 2021
1 parent a5fee6f commit 4c55584
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 10 deletions.
9 changes: 8 additions & 1 deletion tap_rest_api/helper.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import attr, backoff, dateutil, datetime, os, requests
import attr, backoff, dateutil, datetime, hashlib, os, requests
import simplejson as json
from urllib.parse import quote as urlquote
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
Expand Down Expand Up @@ -179,6 +179,13 @@ def get_end(config):
return end_from_config


def get_digest_from_record(record):
digest = hashlib.md5(
json.dumps(record, sort_keys=True).encode("utf-8")
).hexdigest()
return digest


def get_last_update(config, record, current):
last_update = current
if config.get("timestamp_key"):
Expand Down
37 changes: 28 additions & 9 deletions tap_rest_api/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,23 @@
import singer
import singer.metrics as metrics

from .helper import (generate_request, get_bookmark_type, get_end, get_endpoint,
get_init_endpoint_params, get_last_update, get_record,
get_record_list, get_selected_streams, get_start,
get_streams_to_sync, human_readable,
get_http_headers,
EXTRACT_TIMESTAMP)
from .helper import (
generate_request,
get_bookmark_type,
get_end,
get_endpoint,
get_init_endpoint_params,
get_last_update,
get_record,
get_record_list,
get_selected_streams,
get_start,
get_streams_to_sync,
human_readable,
get_http_headers,
get_digest_from_record,
EXTRACT_TIMESTAMP,
)
from .schema import filter_record, load_schema, validate


Expand Down Expand Up @@ -112,8 +123,15 @@ def sync_rows(config, state, tap_stream_id, key_properties=[], auth_method=None,

# It's important to compare the record before adding
# EXTRACT_TIMESTAMP
if record == prev_written_record:
LOGGER.debug("Skipping the duplicated row %s" % record)
digest = get_digest_from_record(record)
digest_dict = {"digest": digest}
# backward compatibility
if (prev_written_record == record or
prev_written_record == digest_dict):
LOGGER.info(
"Skipping the duplicated row with "
f"digest {digest}"
)
continue

if EXTRACT_TIMESTAMP in schema["properties"].keys():
Expand All @@ -137,7 +155,8 @@ def sync_rows(config, state, tap_stream_id, key_properties=[], auth_method=None,
# EXTRACT_TIMESTAMP will be different. So popping it out
# before storing.
record.pop(EXTRACT_TIMESTAMP)
prev_written_record = record
digest = get_digest_from_record(record)
prev_written_record = {"digest": digest}

# Exit conditions
if len(rows) < config["items_per_page"]:
Expand Down

0 comments on commit 4c55584

Please sign in to comment.