Skip to content

Commit

Permalink
Merge branch 'main' into nox_test_cookiecutter
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon authored Mar 31, 2023
2 parents 2f515c1 + a5a94d7 commit de6d6ee
Show file tree
Hide file tree
Showing 36 changed files with 299 additions and 132 deletions.
46 changes: 23 additions & 23 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ select = [
"A", # flake8-builtins
"COM", # flake8-commas
"C4", # flake8-comprehensions
"DTZ", # flake8-datetimez
"T10", # flake8-debugger
"ISC", # flake8-implicit-str-concat
"ICN", # flake8-import-conventions
Expand Down
56 changes: 54 additions & 2 deletions singer_sdk/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import abc
import typing as t

if t.TYPE_CHECKING:
Expand All @@ -24,8 +25,59 @@ class MapExpressionError(Exception):
"""Failed map expression evaluation."""


class MaxRecordsLimitException(Exception):
"""Exception to raise if the maximum number of allowable records is exceeded."""
class RequestedAbortException(Exception):
    """Base class for exceptions that request an abort or interrupt of a sync.

    When an exception of this type is raised, streams try to shut down
    gracefully and, when possible, emit one final resumable `STATE` message
    before exiting.
    """


class MaxRecordsLimitException(RequestedAbortException):
    """Raised to abort a sync once the allowed number of records is exceeded."""


class AbortedSyncExceptionBase(Exception, metaclass=abc.ABCMeta):
    """Base exception to raise when a stream sync is aborted.

    Developers should not raise this directly, and instead should use:
    1. `AbortedSyncFailedException` - Indicates the stream aborted abnormally and
       was not able to reach a stable and resumable state.
    2. `AbortedSyncPausedException` - Indicates the stream aborted abnormally and
       successfully reached a 'paused' and resumable state.

    Notes:
    - `FULL_TABLE` sync operations cannot be paused and will always trigger a
      fatal exception if aborted.
    - `INCREMENTAL` and `LOG_BASED` streams are able to be paused only if a
      number of preconditions are met, specifically, `state_partitioning_keys`
      cannot be overridden and the stream must be declared with `is_sorted=True`.
    """


class AbortedSyncFailedException(AbortedSyncExceptionBase):
    """Exception to raise when sync is aborted and unable to reach a stable state.

    This signifies that the aborted sync did not reach a resumable state:
    `FULL_TABLE` streams (if applicable) may have been left incomplete, and
    bookmarks from `INCREMENTAL` and `LOG_BASED` streams may not have been
    finalized into a resumable state artifact.
    """


class AbortedSyncPausedException(AbortedSyncExceptionBase):
    """Exception to raise when an aborted sync operation is paused successfully.

    This exception indicates the stream aborted abnormally and successfully
    reached a 'paused' status, and emitted a resumable state artifact before exiting.

    Streams synced with `FULL_TABLE` replication can never have partial success or
    'paused' status.

    If this exception is raised, this signifies that additional records were left
    on the source system and the sync operation aborted before reaching the end of the
    stream. This exception signifies that bookmarks from `INCREMENTAL`
    and `LOG_BASED` streams were successfully emitted and are resumable.
    """


class RecordsWithoutSchemaException(Exception):
Expand Down
16 changes: 14 additions & 2 deletions singer_sdk/helpers/_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,24 @@ def _greater_than_signpost(
return new_value > signpost


def is_state_non_resumable(stream_or_partition_state: dict) -> bool:
    """Check whether a stream or partition state artifact is non-resumable.

    State is considered non-resumable exactly when a "progress marker" tag is
    still present in the artifact.
    """
    return any(key == PROGRESS_MARKERS for key in stream_or_partition_state)


def finalize_state_progress_markers(stream_or_partition_state: dict) -> dict | None:
"""Promote or wipe progress markers once sync is complete."""
"""Promote or wipe progress markers once sync is complete.
This marks any non-resumable progress markers as finalized. If there are
valid bookmarks present, they will be promoted to be resumable.
"""
signpost_value = stream_or_partition_state.pop(SIGNPOST_MARKER, None)
stream_or_partition_state.pop(STARTING_MARKER, None)
if (
PROGRESS_MARKERS in stream_or_partition_state
is_state_non_resumable(stream_or_partition_state)
and "replication_key" in stream_or_partition_state[PROGRESS_MARKERS]
):
# Replication keys valid (only) after sync is complete
Expand Down
7 changes: 5 additions & 2 deletions singer_sdk/helpers/_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
_MAX_TIME = "23:59:59.999999"
JSONSCHEMA_ANNOTATION_SECRET = "secret" # noqa: S105
JSONSCHEMA_ANNOTATION_WRITEONLY = "writeOnly"
UTC = datetime.timezone.utc


class DatetimeErrorTreatmentEnum(Enum):
Expand Down Expand Up @@ -452,9 +453,11 @@ def _conform_primitive_property(elem: Any, property_schema: dict) -> Any:
if isinstance(elem, datetime.date):
return elem.isoformat() + "T00:00:00+00:00"
if isinstance(elem, datetime.timedelta):
epoch = datetime.datetime.utcfromtimestamp(0)
epoch = datetime.datetime.fromtimestamp(0, UTC)
timedelta_from_epoch = epoch + elem
return timedelta_from_epoch.isoformat() + "+00:00"
if timedelta_from_epoch.tzinfo is None:
timedelta_from_epoch = timedelta_from_epoch.replace(tzinfo=UTC)
return timedelta_from_epoch.isoformat()
if isinstance(elem, datetime.time):
return str(elem)
if isinstance(elem, bytes):
Expand Down
2 changes: 1 addition & 1 deletion singer_sdk/sinks/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def _get_context(self, record: dict) -> dict: # noqa: ARG002
if self._pending_batch is None:
new_context = {
"batch_id": str(uuid.uuid4()),
"batch_start_time": datetime.datetime.now(),
"batch_start_time": datetime.datetime.now(tz=datetime.timezone.utc),
}
self.start_batch(new_context)
self._pending_batch = new_context
Expand Down
7 changes: 5 additions & 2 deletions singer_sdk/sinks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,12 @@ def _add_sdc_metadata_to_record(
context: Stream partition or context dictionary.
"""
record["_sdc_extracted_at"] = message.get("time_extracted")
record["_sdc_received_at"] = datetime.datetime.now().isoformat()
record["_sdc_received_at"] = datetime.datetime.now(
tz=datetime.timezone.utc,
).isoformat()
record["_sdc_batched_at"] = (
context.get("batch_start_time", None) or datetime.datetime.now()
context.get("batch_start_time", None)
or datetime.datetime.now(tz=datetime.timezone.utc)
).isoformat()
record["_sdc_deleted_at"] = record.get("_sdc_deleted_at")
record["_sdc_sequence"] = int(round(time.time() * 1000))
Expand Down
Loading

0 comments on commit de6d6ee

Please sign in to comment.