Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest): pick topics from config for sink connector #12535

Merged
merged 7 commits into from
Feb 13, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Iterable, List, Optional, Type
from typing import Dict, Iterable, List, Optional, Type

Check warning on line 2 in metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py#L2

Added line #L2 was not covered by tests

import jpype
import jpype.imports
Expand Down Expand Up @@ -121,7 +121,9 @@
connector_manifest.config, self.config.provided_configs
)
connector_manifest.url = connector_url
connector_manifest.topic_names = self._get_connector_topics(connector_name)
connector_manifest.topic_names = self._get_connector_topics(

Check warning on line 124 in metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py#L124

Added line #L124 was not covered by tests
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - ideally when calling this method, we'd use kwargs e.g. self._get_connector_topics(connector=..., type=...)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, this should become a lint checker
Unfortunately, this is not supported by ruff yet astral-sh/ruff#3269

connector_name, connector_manifest.config, connector_manifest.type
)
connector_class_value = connector_manifest.config.get(CONNECTOR_CLASS) or ""

class_type: Type[BaseConnector] = BaseConnector
Expand Down Expand Up @@ -203,7 +205,15 @@

return response.json()

def _get_connector_topics(self, connector_name: str) -> List[str]:
def _get_connector_topics(

Check warning on line 208 in metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py#L208

Added line #L208 was not covered by tests
self, connector_name: str, config: Dict, connector_type: str
) -> List[str]:
if connector_type == SINK and config.get("topics"):

Check warning on line 211 in metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py#L211

Added line #L211 was not covered by tests
# Sink connectors may configure `topics` as List of topics to consume, separated by commas
# https://kafka.apache.org/documentation/#sinkconnectorconfigs_topics
# https://docs.confluent.io/platform/current/installation/configuration/connect/sink-connect-configs.html#topics
return [topic.strip() for topic in config["topics"].split(",")]

Check warning on line 215 in metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py#L215

Added line #L215 was not covered by tests

try:
response = self.session.get(
f"{self.config.connect_uri}/connectors/{connector_name}/topics",
Expand Down
Loading