-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(ingest): pick topics from config for sink connector #12535
fix(ingest): pick topics from config for sink connector #12535
Conversation
Codecov ReportAttention: Patch coverage is
... and 35 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add some tests for this?
metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py
Outdated
Show resolved
Hide resolved
Updated test here Without changes, other than config - stale topic shows up and after changes, it does not. |
@@ -121,7 +121,9 @@ def get_connectors_manifest(self) -> Iterable[ConnectorManifest]: | |||
connector_manifest.config, self.config.provided_configs | |||
) | |||
connector_manifest.url = connector_url | |||
connector_manifest.topic_names = self._get_connector_topics(connector_name) | |||
connector_manifest.topic_names = self._get_connector_topics( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit - ideally when calling this method, we'd use kwargs e.g. self._get_connector_topics(connector=..., type=...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally, this should become a lint checker
Unfortunately, this is not supported by ruff
yet astral-sh/ruff#3269
class SinkTopicFilter: | ||
"""Helper class to filter Kafka Connect topics based on configuration.""" | ||
|
||
def filter_stale_topics( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add a comment explaining why we need this
something like kafka-connect's API returns an over-populated list of topics. if a topic was ever used by a connector, it will be returned, even if it is no longer used. to remove these stale topics from the list, we double-check the list returned by the API against the sink connector's config
@@ -175,7 +175,7 @@ class BigQuerySinkConnector(BaseConnector): | |||
class BQParser: | |||
project: str | |||
target_platform: str | |||
sanitizeTopics: str | |||
sanitizeTopics: bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
was the type annotation here previously wrong?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes
metadata-ingestion/src/datahub/ingestion/source/kafka_connect/sink_connectors.py
Show resolved
Hide resolved
@@ -682,3 +687,43 @@ def test_kafka_connect_bigquery_sink_ingest( | |||
golden_path=test_resources_dir / "kafka_connect_bigquery_sink_mces_golden.json", | |||
ignore_paths=[], | |||
) | |||
|
|||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
while it may be obvious, the case sink_config
missing both topics
and topics.regex
keys may be added for completeness
e2a98d2
into
datahub-project:master
Checklist