Skip to content

Commit

Permalink
feat: Support deselecting streams by default (#1672)
Browse files Browse the repository at this point in the history
* feat: Support deselecting streams by default

* Add tests
  • Loading branch information
edgarrmondragon authored May 9, 2023
1 parent 1ba86b3 commit c2bea08
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 5 deletions.
3 changes: 3 additions & 0 deletions singer_sdk/_singerlib/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def get_standard_metadata(
key_properties: list[str] | None = None,
valid_replication_keys: list[str] | None = None,
replication_method: str | None = None,
selected_by_default: bool | None = None,
) -> MetadataMapping:
"""Get default metadata for a stream.
Expand All @@ -175,6 +176,7 @@ def get_standard_metadata(
key_properties: Stream key properties.
valid_replication_keys: Stream valid replication keys.
replication_method: Stream replication method.
selected_by_default: Whether the stream is selected by default.
Returns:
Metadata mapping.
Expand All @@ -184,6 +186,7 @@ def get_standard_metadata(
table_key_properties=key_properties,
forced_replication_method=replication_method,
valid_replication_keys=valid_replication_keys,
selected_by_default=selected_by_default,
)

if schema:
Expand Down
13 changes: 9 additions & 4 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ class Stream(metaclass=abc.ABCMeta):
# Internal API cost aggregator
_sync_costs: dict[str, int] = {}

selected_by_default: bool = True
"""Whether this stream is selected by default in the catalog."""

def __init__(
self,
tap: Tap,
Expand Down Expand Up @@ -428,7 +431,7 @@ def primary_keys(self) -> list[str] | None:
return self._primary_keys

@primary_keys.setter
def primary_keys(self, new_value: list[str]) -> None:
def primary_keys(self, new_value: list[str] | None) -> None:
"""Set primary key(s) for the stream.
Args:
Expand Down Expand Up @@ -472,7 +475,7 @@ def replication_key(self) -> str | None:
return self._replication_key

@replication_key.setter
def replication_key(self, new_value: str) -> None:
def replication_key(self, new_value: str | None) -> None:
"""Set replication key for the stream.
Args:
Expand Down Expand Up @@ -532,11 +535,13 @@ def metadata(self) -> singer.MetadataMapping:
[self.replication_key] if self.replication_key else None
),
schema_name=None,
selected_by_default=self.selected_by_default,
)

# If there's no input catalog, select all streams
if self._tap_input_catalog is None:
self._metadata.root.selected = True
self._metadata.root.selected = (
self._tap_input_catalog is None and self.selected_by_default
)

return self._metadata

Expand Down
3 changes: 2 additions & 1 deletion tests/_singerlib/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def test_catalog_parsing():
)
def test_standard_metadata(
schema: dict,
key_properties: list[str] | None,
key_properties: list[str],
replication_method: str | None,
valid_replication_keys: list[str] | None,
schema_name: str | None,
Expand All @@ -256,6 +256,7 @@ def test_standard_metadata(
)

stream_metadata = metadata[()]
assert isinstance(stream_metadata, StreamMetadata)
assert stream_metadata.table_key_properties == key_properties
assert stream_metadata.forced_replication_method == replication_method
assert stream_metadata.valid_replication_keys == valid_replication_keys
Expand Down
78 changes: 78 additions & 0 deletions tests/core/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,3 +516,81 @@ def calculate_test_cost(
for record in caplog.records:
assert record.levelname == "INFO"
assert f"Total Sync costs for stream {stream.name}" in record.message


@pytest.mark.parametrize(
"input_catalog,selection",
[
pytest.param(
None,
{
"selected_stream": True,
"unselected_stream": False,
},
id="no_catalog",
),
pytest.param(
{
"streams": [],
},
{
"selected_stream": False,
"unselected_stream": False,
},
id="empty_catalog",
),
pytest.param(
{
"streams": [
{
"tap_stream_id": "selected_stream",
"metadata": [
{
"breadcrumb": [],
"metadata": {
"selected": True,
},
},
],
},
{
"tap_stream_id": "unselected_stream",
"metadata": [
{
"breadcrumb": [],
"metadata": {
"selected": True,
},
},
],
},
],
},
{
"selected_stream": True,
"unselected_stream": True,
},
id="catalog_with_selection",
),
],
)
def test_stream_class_selection(input_catalog, selection):
"""Test stream class selection."""

class SelectedStream(RESTStream):
name = "selected_stream"
url_base = "https://example.com"
schema = {"type": "object", "properties": {}}

class UnselectedStream(SelectedStream):
name = "unselected_stream"
selected_by_default = False

class MyTap(SimpleTestTap):
def discover_streams(self):
return [SelectedStream(self), UnselectedStream(self)]

# Check that the selected stream is selected
tap = MyTap(config=None, catalog=input_catalog)
for stream in selection:
assert tap.streams[stream].selected is selection[stream]

0 comments on commit c2bea08

Please sign in to comment.