Skip to content

Commit

Permalink
fix(kafka-connect): add platform for default case in jdbc connector, …
Browse files Browse the repository at this point in the history
…update tests for platform instance map
  • Loading branch information
mayurinehate committed Mar 31, 2022
1 parent a10a4ac commit 981a582
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 93 deletions.
52 changes: 25 additions & 27 deletions metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,21 @@ def unquote(string: str, leading_quote: str = '"', trailing_quote: str = None) -
return string


def get_dataset_name(
database_name: Optional[str],
instance_name: Optional[str],
source_table: str,
) -> str:
if database_name and instance_name:
dataset_name = instance_name + "." + database_name + "." + source_table
elif database_name:
dataset_name = database_name + "." + source_table
else:
dataset_name = source_table

return dataset_name


def get_instance_name(
config: KafkaConnectSourceConfig, kafka_connector_name: str, source_platform: str
) -> Optional[str]:
Expand Down Expand Up @@ -261,6 +276,7 @@ def default_get_lineages(
source_platform: str,
topic_names: Optional[Iterable[str]] = None,
include_source_dataset: bool = True,
instance_name: Optional[str] = None,
) -> List[KafkaConnectLineage]:
lineages: List[KafkaConnectLineage] = []
if not topic_names:
Expand All @@ -285,8 +301,8 @@ def default_get_lineages(
self.connector_manifest.name,
f"could not find schema for table {source_table}",
)
dataset_name: str = (
f"{database_name}.{source_table}" if database_name else source_table
dataset_name: str = get_dataset_name(
database_name, instance_name, source_table
)
lineage = KafkaConnectLineage(
source_dataset=dataset_name if include_source_dataset else None,
Expand Down Expand Up @@ -343,21 +359,6 @@ def get_table_names(self) -> List[Tuple]:
]
return tables

def get_dataset_name(
self,
database_name: Optional[str],
instance_name: Optional[str],
source_table: str,
) -> str:
if database_name and instance_name:
dataset_name = instance_name + "." + database_name + "." + source_table
elif database_name:
dataset_name = database_name + "." + source_table
else:
dataset_name = source_table

return dataset_name

def _extract_lineages(self):
lineages: List[KafkaConnectLineage] = list()
parser = self.get_parser(self.connector_manifest)
Expand Down Expand Up @@ -392,7 +393,7 @@ def _extract_lineages(self):
# Lineage source_table can be extracted by parsing query
for topic in self.connector_manifest.topic_names:
# default method - as per earlier implementation
dataset_name: str = self.get_dataset_name(
dataset_name: str = get_dataset_name(
database_name, instance_name, topic
)

Expand Down Expand Up @@ -432,6 +433,7 @@ def _extract_lineages(self):
database_name=database_name,
source_platform=source_platform,
topic_prefix=topic_prefix,
instance_name=instance_name,
)
return

Expand Down Expand Up @@ -461,7 +463,7 @@ def _extract_lineages(self):
if has_three_level_hierarchy(source_platform) and len(table) > 1:
source_table = f"{table[-2]}.{table[-1]}"

dataset_name = self.get_dataset_name(
dataset_name = get_dataset_name(
database_name, instance_name, source_table
)

Expand Down Expand Up @@ -509,6 +511,7 @@ def _extract_lineages(self):
source_platform=source_platform,
topic_prefix=topic_prefix,
include_source_dataset=include_source_dataset,
instance_name=instance_name,
)
self.connector_manifest.lineages = lineages
return
Expand Down Expand Up @@ -615,14 +618,9 @@ def _extract_lineages(self):
found = re.search(re.compile(topic_naming_pattern), topic)

if found:
if database_name and instance_name:
table_name = (
instance_name + "." + database_name + "." + found.group(2)
)
elif database_name:
table_name = database_name + "." + found.group(2)
else:
table_name = found.group(2)
table_name = get_dataset_name(
database_name, instance_name, found.group(2)
)

lineage = KafkaConnectLineage(
source_dataset=table_name,
Expand Down
Loading

0 comments on commit 981a582

Please sign in to comment.