Skip to content
This repository has been archived by the owner on Jan 27, 2025. It is now read-only.

Commit

Permalink
fix(ingest): kafka ingest task hangs with an incorrect bootstrap server (d…
Browse files Browse the repository at this point in the history
  • Loading branch information
wangsaisai authored and cccs-Dustin committed Feb 1, 2023
1 parent 40da666 commit a9b45e0
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 1 deletion.
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/configuration/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class _KafkaConnectionConfig(ConfigModel):
description="Extra schema registry config serialized as JSON. These options will be passed into Kafka's SchemaRegistryClient. https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html?#schemaregistryclient",
)

client_timeout_seconds: int = Field(default=60, description="The request timeout used when interacting with the Kafka APIs.")

@validator("bootstrap")
def bootstrap_host_colon_port_comma(cls, val: str) -> str:
for entry in val.split(","):
Expand Down
4 changes: 3 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ def create(cls, config_dict: Dict, ctx: PipelineContext) -> "KafkaSource":
return cls(config, ctx)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
topics = self.consumer.list_topics().topics
topics = self.consumer.list_topics(
timeout=self.source_config.connection.client_timeout_seconds
).topics
extra_topic_details = self.fetch_extra_topic_details(topics.keys())

for t, t_detail in topics.items():
Expand Down

0 comments on commit a9b45e0

Please sign in to comment.