Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: resolve failure in _increment_stream_state() for cases when replication_method is LOG_BASED #1126

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 24 additions & 22 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,11 @@ def _increment_stream_state(
) -> None:
"""Update state of stream or partition with data from the provided record.

Raises InvalidStreamSortException is self.is_sorted = True and unsorted data is
detected.
Raises `InvalidStreamSortException` is `self.is_sorted = True` and unsorted data
is detected.

Note: The default implementation does not advance any bookmarks unless
`self.replication_method == 'INCREMENTAL'.

Args:
latest_record: TODO
Expand All @@ -705,28 +708,27 @@ def _increment_stream_state(
Raises:
ValueError: TODO
"""
# This also creates a state entry if one does not yet exist:
state_dict = self.get_context_state(context)
if latest_record:
if self.replication_method in [
REPLICATION_INCREMENTAL,
REPLICATION_LOG_BASED,
]:
if not self.replication_key:
raise ValueError(
f"Could not detect replication key for '{self.name}' stream"
f"(replication method={self.replication_method})"
)
treat_as_sorted = self.is_sorted
if not treat_as_sorted and self.state_partitioning_keys is not None:
# Streams with custom state partitioning are not resumable.
treat_as_sorted = False
increment_state(
state_dict,
replication_key=self.replication_key,
latest_record=latest_record,
is_sorted=treat_as_sorted,
check_sorted=self.check_sorted,

# Advance state bookmark values if applicable
if latest_record and self.replication_method == REPLICATION_INCREMENTAL:
if not self.replication_key:
raise ValueError(
f"Could not detect replication key for '{self.name}' stream"
f"(replication method={self.replication_method})"
)
treat_as_sorted = self.is_sorted
if not treat_as_sorted and self.state_partitioning_keys is not None:
# Streams with custom state partitioning are not resumable.
treat_as_sorted = False
increment_state(
state_dict,
replication_key=self.replication_key,
latest_record=latest_record,
is_sorted=treat_as_sorted,
check_sorted=self.check_sorted,
)

# Private message authoring methods:

Expand Down