Skip to content

Commit

Permalink
Use digest for dup check instead of raw record #24 (#25)
Browse files Browse the repository at this point in the history
* Use digest for dup check instead of raw record #24

* make timestamp in bookmark integer

* handle integerized timestamp from bookmark

* handle None bookmark case for timestamp
  • Loading branch information
daigotanaka authored Mar 11, 2022
1 parent 4228cc9 commit 78bad7d
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 15 deletions.
2 changes: 2 additions & 0 deletions tap_rest_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ def main():
streams = CONFIG["streams"].split(",")
elif CONFIG.get("schema"):
streams = [CONFIG["schema"]]
else:
raise Exception("Config needs to specify streams or schema variable.")

for stream in streams:
stream = stream.strip()
Expand Down
26 changes: 21 additions & 5 deletions tap_rest_api/helper.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import attr, backoff, dateutil, datetime, os, requests
import attr, backoff, dateutil, datetime, hashlib, os, requests
import simplejson as json
from urllib.parse import quote as urlquote
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
Expand Down Expand Up @@ -143,6 +143,8 @@ def get_start(config, state, tap_stream_id, bookmark_key):
if current_bookmark is None:
current_bookmark = dateutil.parser.parse(
config["start_datetime"]).timestamp()
else:
current_bookmark = get_float_timestamp(current_bookmark)
elif config.get("datetime_key"):
if not config.get("start_datetime"):
raise KeyError(
Expand Down Expand Up @@ -179,14 +181,28 @@ def get_end(config):
return end_from_config


def get_digest_from_record(record):
    """Return the MD5 hex digest of the record's canonical JSON form.

    The record is serialized with sorted keys so two logically equal
    dicts always produce the same digest; used for duplicate-row checks
    instead of keeping the full raw record around.
    """
    canonical = json.dumps(record, sort_keys=True)
    return hashlib.md5(canonical.encode("utf-8")).hexdigest()


def get_float_timestamp(ts):
    """Normalize a timestamp to float seconds since the Unix epoch.

    Bookmarks may store timestamps integerized at millisecond (13-digit)
    or microsecond (16-digit) resolution. Any digits beyond the standard
    10-digit second resolution are treated as sub-second precision and
    divided back out.

    Args:
        ts: Timestamp as int, float, or numeric string.

    Returns:
        float: Timestamp in seconds, sub-second precision preserved.
    """
    # Digits beyond 10 encode sub-second precision. Clamp at 0 so that
    # shorter (pre-2001, i.e. < 10 digit) timestamps are returned as-is
    # instead of being erroneously multiplied up by a negative exponent.
    ex_digits = max(len(str(int(ts))) - 10, 0)
    return float(ts) / pow(10, ex_digits)


def get_last_update(config, record, current):
last_update = current
if config.get("timestamp_key"):
value = _get_jsonpath(record, config["timestamp_key"])[0]
if value and value > current:
# Handle the data with sub-seconds converted to int
ex_digits = len(str(int(value))) - 10
last_update = float(value) / (pow(10, ex_digits))
if value:
value = get_float_timestamp(value)
if value > current:
last_update = value
else:
KeyError("timestamp_key not found in the record")
elif config.get("datetime_key"):
Expand Down
44 changes: 34 additions & 10 deletions tap_rest_api/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,24 @@
import singer
import singer.metrics as metrics

from .helper import (generate_request, get_bookmark_type, get_end, get_endpoint,
get_init_endpoint_params, get_last_update, get_record,
get_record_list, get_selected_streams, get_start,
get_streams_to_sync, human_readable,
get_http_headers,
EXTRACT_TIMESTAMP)
from .helper import (
generate_request,
get_bookmark_type,
get_end,
get_endpoint,
get_init_endpoint_params,
get_last_update,
get_float_timestamp,
get_record,
get_record_list,
get_selected_streams,
get_start,
get_streams_to_sync,
human_readable,
get_http_headers,
get_digest_from_record,
EXTRACT_TIMESTAMP,
)
from .schema import filter_record, load_schema, validate


Expand Down Expand Up @@ -113,8 +125,15 @@ def sync_rows(config, state, tap_stream_id, key_properties=[], auth_method=None,

# It's important to compare the record before adding
# EXTRACT_TIMESTAMP
if record == prev_written_record:
LOGGER.debug("Skipping the duplicated row %s" % record)
digest = get_digest_from_record(record)
digest_dict = {"digest": digest}
# backward compatibility
if (prev_written_record == record or
prev_written_record == digest_dict):
LOGGER.info(
"Skipping the duplicated row with "
f"digest {digest}"
)
continue

if EXTRACT_TIMESTAMP in schema["properties"].keys():
Expand All @@ -138,7 +157,8 @@ def sync_rows(config, state, tap_stream_id, key_properties=[], auth_method=None,
# EXTRACT_TIMESTAMP will be different. So popping it out
# before storing.
record.pop(EXTRACT_TIMESTAMP)
prev_written_record = record
digest = get_digest_from_record(record)
prev_written_record = {"digest": digest}

# Exit conditions
if len(rows) < config["items_per_page"]:
Expand All @@ -157,6 +177,10 @@ def sync_rows(config, state, tap_stream_id, key_properties=[], auth_method=None,
page_number +=1
offset_number += len(rows)

# If timestamp_key is not integerized, do so at millisecond level
if config.get("timestamp_key") and len(str(int(last_update))) == 10:
last_update = int(last_update * 1000)

state = singer.write_bookmark(state, tap_stream_id, "last_update",
last_update)
if prev_written_record:
Expand Down Expand Up @@ -215,7 +239,7 @@ def sync(config, streams, state, catalog, assume_sorted=True, max_page=None,
last_update = state["bookmarks"][stream.tap_stream_id]["last_update"]
if bookmark_type == "timestamp":
last_update = str(last_update) + " (" + str(
datetime.datetime.fromtimestamp(last_update)) + ")"
datetime.datetime.fromtimestamp(get_float_timestamp(last_update))) + ")"
LOGGER.info("%s End sync" % stream.tap_stream_id)
LOGGER.info("%s Last record's %s: %s" %
(stream.tap_stream_id, bookmark_type, last_update))
Expand Down

0 comments on commit 78bad7d

Please sign in to comment.