Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(taps): Use recent start_date as starting_replication_value #759

Merged
merged 19 commits into from
Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,29 @@ def _write_replication_key_signpost(
state = self.get_context_state(context)
write_replication_key_signpost(state, value)

def compare_start_date(self, value: str, start_date_value: str) -> str:
"""Compare a bookmark value to a start date and return the most recent value.

If the replication key is a datetime-formatted string, this method will parse
the value and compare it to the start date. Otherwise, the bookmark value is
returned.

If the tap uses a non-datetime replication key (e.g. an UNIX timestamp), the
developer is encouraged to override this method to provide custom logic for
comparing the bookmark value to the start date.

Args:
value: The replication key value.
start_date_value: The start date value from the config.

Returns:
The most recent value between the bookmark and start date.
"""
if self.is_timestamp_replication_key:
return max(value, start_date_value, key=pendulum.parse)
else:
return value

def _write_starting_replication_value(self, context: Optional[dict]) -> None:
"""Write the starting replication value, if available.

Expand All @@ -320,8 +343,13 @@ def _write_starting_replication_value(self, context: Optional[dict]) -> None:
):
value = replication_key_value

elif "start_date" in self.config:
value = self.config["start_date"]
# Use start_date if it is more recent than the replication_key state
start_date_value: Optional[str] = self.config.get("start_date")
if start_date_value:
if not value:
value = start_date_value
else:
value = self.compare_start_date(value, start_date_value)

write_starting_replication_value(state, value)

Expand Down
159 changes: 127 additions & 32 deletions tests/core/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import requests

from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers._singer import Catalog, CatalogEntry, MetadataMapping
from singer_sdk.helpers.jsonpath import _compile_jsonpath
from singer_sdk.streams.core import (
REPLICATION_FULL_TABLE,
Expand All @@ -25,6 +26,8 @@
StringType,
)

CONFIG_START_DATE = "2021-01-01"


class SimpleTestStream(Stream):
"""Test stream class."""
Expand All @@ -48,6 +51,26 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
yield {"id": 3, "value": "India"}


class UnixTimestampIncrementalStream(SimpleTestStream):
name = "unix_ts"
schema = PropertiesList(
Property("id", IntegerType, required=True),
Property("value", StringType, required=True),
Property("updatedAt", IntegerType, required=True),
).to_dict()
replication_key = "updatedAt"
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved


class UnixTimestampIncrementalStream2(UnixTimestampIncrementalStream):
name = "unix_ts_override"

def compare_start_date(self, value: str, start_date_value: str) -> str:
"""Compare a value to a start date value."""

start_timestamp = pendulum.parse(start_date_value).format("X")
return max(value, start_timestamp, key=float)


class RestTestStream(RESTStream):
"""Test RESTful stream class."""

Expand Down Expand Up @@ -82,28 +105,19 @@ class SimpleTestTap(Tap):

def discover_streams(self) -> List[Stream]:
"""List all streams."""
return [SimpleTestStream(self)]
return [
SimpleTestStream(self),
UnixTimestampIncrementalStream(self),
UnixTimestampIncrementalStream2(self),
]


@pytest.fixture
def tap() -> SimpleTestTap:
"""Tap instance."""
catalog_dict = {
"streams": [
{
"key_properties": ["id"],
"tap_stream_id": SimpleTestStream.name,
"stream": SimpleTestStream.name,
"schema": SimpleTestStream.schema,
"replication_method": REPLICATION_FULL_TABLE,
"replication_key": None,
}
]
}
return SimpleTestTap(
config={"start_date": "2021-01-01"},
config={"start_date": CONFIG_START_DATE},
parse_env_config=False,
catalog=catalog_dict,
)


Expand All @@ -113,47 +127,128 @@ def stream(tap: SimpleTestTap) -> SimpleTestStream:
return cast(SimpleTestStream, tap.load_streams()[0])


@pytest.fixture
def unix_timestamp_stream(tap: SimpleTestTap) -> UnixTimestampIncrementalStream:
"""Create a new stream instance."""
return cast(UnixTimestampIncrementalStream, tap.load_streams()[1])


def test_stream_apply_catalog(tap: SimpleTestTap, stream: SimpleTestStream):
"""Applying a catalog to a stream should overwrite fields."""
assert stream.primary_keys == []
assert stream.replication_key == "updatedAt"
assert stream.replication_method == REPLICATION_INCREMENTAL
assert stream.forced_replication_method is None

assert tap.input_catalog is not None
stream.apply_catalog(catalog=tap.input_catalog)
stream.apply_catalog(
catalog=Catalog.from_dict(
{
"streams": [
{
"tap_stream_id": stream.name,
"metadata": MetadataMapping(),
"key_properties": ["id"],
"stream": stream.name,
"schema": stream.schema,
"replication_method": REPLICATION_FULL_TABLE,
"replication_key": None,
}
]
}
)
)

assert stream.primary_keys == ["id"]
assert stream.replication_key is None
assert stream.replication_method == REPLICATION_FULL_TABLE
assert stream.forced_replication_method == REPLICATION_FULL_TABLE


def test_stream_starting_timestamp(tap: SimpleTestTap, stream: SimpleTestStream):
"""Validate state and start_time setting handling."""
timestamp_value = "2021-02-01"
@pytest.mark.parametrize(
"stream_name,bookmark_value,expected_starting_value",
[
pytest.param(
"test",
None,
pendulum.parse(CONFIG_START_DATE),
id="datetime-repl-key-no-state",
),
pytest.param(
"test",
"2021-02-01",
pendulum.datetime(2021, 2, 1),
id="datetime-repl-key-recent-bookmark",
),
pytest.param(
"test",
"2020-01-01",
pendulum.parse(CONFIG_START_DATE),
id="datetime-repl-key-old-bookmark",
),
pytest.param(
"unix_ts",
None,
CONFIG_START_DATE,
id="naive-unix-ts-repl-key-no-state",
),
pytest.param(
"unix_ts",
"1612137600",
"1612137600",
id="naive-unix-ts-repl-key-recent-bookmark",
),
pytest.param(
"unix_ts",
"1577858400",
"1577858400",
id="naive-unix-ts-repl-key-old-bookmark",
),
pytest.param(
"unix_ts_override",
None,
CONFIG_START_DATE,
id="unix-ts-repl-key-no-state",
),
pytest.param(
"unix_ts_override",
"1612137600",
"1612137600",
id="unix-ts-repl-key-recent-bookmark",
),
pytest.param(
"unix_ts_override",
"1577858400",
pendulum.parse(CONFIG_START_DATE).format("X"),
id="unix-ts-repl-key-old-bookmark",
),
],
)
def test_stream_starting_timestamp(
tap: SimpleTestTap,
stream_name: str,
bookmark_value: str,
expected_starting_value: Any,
):
"""Test the starting timestamp for a stream."""
stream = tap.streams[stream_name]

if stream.is_timestamp_replication_key:
get_starting_value = stream.get_starting_timestamp
else:
get_starting_value = stream.get_starting_replication_key_value

stream._write_starting_replication_value(None)
assert stream.get_starting_timestamp(None) == pendulum.parse(
cast(str, stream.config.get("start_date"))
)
tap.load_state(
{
"bookmarks": {
stream.name: {
stream_name: {
"replication_key": stream.replication_key,
"replication_key_value": timestamp_value,
"replication_key_value": bookmark_value,
}
}
}
)
stream._write_starting_replication_value(None)
assert stream.replication_key == "updatedAt"
assert stream.replication_method == REPLICATION_INCREMENTAL
assert stream.is_timestamp_replication_key
assert stream.get_starting_timestamp(None) == pendulum.parse(
timestamp_value
), f"Incorrect starting timestamp. Tap state was {dict(tap.state)}"
assert get_starting_value(None) == expected_starting_value


@pytest.mark.parametrize(
Expand Down