Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(ingestion/fivetran): Connector performance optimization #10556

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigMixin
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.api.report import Report
from datahub.ingestion.source.bigquery_v2.bigquery_config import (
BigQueryConnectionConfig,
)
Expand All @@ -20,6 +21,7 @@
StatefulIngestionConfigBase,
)
from datahub.ingestion.source_config.sql.snowflake import BaseSnowflakeConfig
from datahub.utilities.perf_timer import PerfTimer

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -110,10 +112,26 @@ def validate_destination_platfrom_and_config(cls, values: Dict) -> Dict:
return values


@dataclass
class MetadataExtractionPerfReport(Report):
    """Per-phase wall-clock timing for Fivetran metadata extraction.

    Each ``PerfTimer`` is entered as a context manager around one extraction
    phase (see ``get_allowed_connectors_list``) so the elapsed seconds per
    phase are surfaced in the source report.
    """

    # Time spent querying the connector list and basic connector metadata.
    connectors_metadata_extraction_sec: PerfTimer = dataclass_field(
        default_factory=PerfTimer
    )
    # Time spent building table/column lineage for all connectors.
    connectors_lineage_extraction_sec: PerfTimer = dataclass_field(
        default_factory=PerfTimer
    )
    # Time spent extracting sync-history (job) records for all connectors.
    connectors_jobs_extraction_sec: PerfTimer = dataclass_field(
        default_factory=PerfTimer
    )


@dataclass
class FivetranSourceReport(StaleEntityRemovalSourceReport):
connectors_scanned: int = 0
filtered_connectors: List[str] = dataclass_field(default_factory=list)
metadata_extraction_perf: MetadataExtractionPerfReport = dataclass_field(
default_factory=MetadataExtractionPerfReport
)

def report_connectors_scanned(self, count: int = 1) -> None:
self.connectors_scanned += count
Expand Down Expand Up @@ -163,3 +181,7 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin
default={},
description="A mapping of destination dataset to platform instance. Use destination id as key.",
)
extract_sync_history_for_interval: int = pydantic.Field(
shubhamjagtap639 marked this conversation as resolved.
Show resolved Hide resolved
7,
description="Interval in days to extract connector's sync history from current date.",
shubhamjagtap639 marked this conversation as resolved.
Show resolved Hide resolved
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import List, Optional
from typing import List


@dataclass
Expand All @@ -23,7 +23,7 @@ class Connector:
paused: bool
sync_frequency: int
destination_id: str
user_email: Optional[str]
user_id: str
table_lineage: List[TableLineage]
jobs: List["Job"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class FivetranSource(StatefulIngestionSourceBase):
platform: str = "fivetran"

def __init__(self, config: FivetranSourceConfig, ctx: PipelineContext):
super().__init__(config, ctx)
super(FivetranSource, self).__init__(config, ctx)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes happen automatically. Let me revert it

self.config = config
self.report = FivetranSourceReport()

Expand Down Expand Up @@ -173,11 +173,12 @@ def _generate_datajob_from_connector(self, connector: Connector) -> DataJob:
env=self.config.env,
platform_instance=self.config.platform_instance,
)
owner_email = self.audit_log.get_user_email(connector.user_id)
datajob = DataJob(
id=connector.connector_id,
flow_urn=dataflow_urn,
name=connector.connector_name,
owners={connector.user_email} if connector.user_email else set(),
owners={owner_email} if owner_email else set(),
)

job_property_bag: Dict[str, str] = {}
Expand Down Expand Up @@ -281,7 +282,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
"""
logger.info("Fivetran plugin execution is started")
connectors = self.audit_log.get_allowed_connectors_list(
self.config.connector_patterns, self.report
self.config.connector_patterns,
self.report,
self.config.extract_sync_history_for_interval,
)
for connector in connectors:
logger.info(f"Processing connector id: {connector.connector_id}")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import functools
import json
import logging
from typing import Any, Dict, List, Optional, Tuple
Expand Down Expand Up @@ -76,7 +77,7 @@ def _initialize_fivetran_variables(
)

def _query(self, query: str) -> List[Dict]:
logger.debug(f"Query : {query}")
logger.debug("Query : {}".format(query))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why was this changed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes happen automatically. Let me revert it

resp = self.engine.execute(query)
return [row for row in resp]

Expand Down Expand Up @@ -151,9 +152,13 @@ def _get_table_lineage(

return table_lineage_list

def _get_all_connector_sync_logs(self) -> Dict[str, Dict]:
def _get_all_connector_sync_logs(self, syncs_interval: int) -> Dict[str, Dict]:
sync_logs = {}
for row in self._query(self.fivetran_log_query.get_sync_logs_query()):
for row in self._query(
self.fivetran_log_query.get_sync_logs_query().format(
self.fivetran_log_query.db_clause, syncs_interval
)
):
if row[Constant.CONNECTOR_ID] not in sync_logs:
sync_logs[row[Constant.CONNECTOR_ID]] = {
row[Constant.SYNC_ID]: {
Expand Down Expand Up @@ -208,50 +213,61 @@ def _get_jobs_list(
)
return jobs

def _get_user_email(self, user_id: Optional[str]) -> Optional[str]:
@functools.lru_cache()
def get_user_email(self, user_id: str) -> Optional[str]:
if not user_id:
return None
user_details = self._query(
self.fivetran_log_query.get_user_query(user_id=user_id)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this sending one query per user? ideally this would also do a bulk fetch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

)

if not user_details:
return None

return f"{user_details[0][Constant.EMAIL]}"

def _fill_connectors_table_lineage(self, connectors: List[Connector]) -> None:
    """Populate ``table_lineage`` on every connector, mutating in place.

    Lineage metadata is bulk-fetched up front (one query per kind) rather
    than once per connector, then distributed to each connector by id.
    Connectors with no lineage rows get whatever ``_get_table_lineage``
    produces for a ``None`` result.
    """
    lineage_by_connector_id = self._get_connectors_table_lineage_metadata()
    column_lineage = self._get_column_lineage_metadata()
    for conn in connectors:
        lineage_rows = lineage_by_connector_id.get(conn.connector_id)
        conn.table_lineage = self._get_table_lineage(
            column_lineage_metadata=column_lineage,
            table_lineage_result=lineage_rows,
        )

def _fill_connectors_jobs(
    self, connectors: List[Connector], syncs_interval: int
) -> None:
    """Populate ``jobs`` on every connector, mutating in place.

    Sync logs covering the last ``syncs_interval`` days are fetched in one
    bulk query and then split out per connector id; connectors without any
    log rows are passed ``None`` to ``_get_jobs_list``.
    """
    logs_by_connector_id = self._get_all_connector_sync_logs(syncs_interval)
    for conn in connectors:
        conn.jobs = self._get_jobs_list(logs_by_connector_id.get(conn.connector_id))

def get_allowed_connectors_list(
self, connector_patterns: AllowDenyPattern, report: FivetranSourceReport
self,
connector_patterns: AllowDenyPattern,
report: FivetranSourceReport,
syncs_interval: int,
) -> List[Connector]:
connectors: List[Connector] = []
sync_logs = self._get_all_connector_sync_logs()
table_lineage_metadata = self._get_connectors_table_lineage_metadata()
column_lineage_metadata = self._get_column_lineage_metadata()
connector_list = self._query(self.fivetran_log_query.get_connectors_query())
for connector in connector_list:
if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]):
report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME])
continue
connectors.append(
Connector(
connector_id=connector[Constant.CONNECTOR_ID],
connector_name=connector[Constant.CONNECTOR_NAME],
connector_type=connector[Constant.CONNECTOR_TYPE_ID],
paused=connector[Constant.PAUSED],
sync_frequency=connector[Constant.SYNC_FREQUENCY],
destination_id=connector[Constant.DESTINATION_ID],
user_email=self._get_user_email(
connector[Constant.CONNECTING_USER_ID]
),
table_lineage=self._get_table_lineage(
column_lineage_metadata=column_lineage_metadata,
table_lineage_result=table_lineage_metadata.get(
connector[Constant.CONNECTOR_ID]
),
),
jobs=self._get_jobs_list(
sync_logs.get(connector[Constant.CONNECTOR_ID])
),
with report.metadata_extraction_perf.connectors_metadata_extraction_sec:
connector_list = self._query(self.fivetran_log_query.get_connectors_query())
for connector in connector_list:
if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]):
report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME])
continue
connectors.append(
Connector(
connector_id=connector[Constant.CONNECTOR_ID],
connector_name=connector[Constant.CONNECTOR_NAME],
connector_type=connector[Constant.CONNECTOR_TYPE_ID],
paused=connector[Constant.PAUSED],
sync_frequency=connector[Constant.SYNC_FREQUENCY],
destination_id=connector[Constant.DESTINATION_ID],
user_id=connector[Constant.CONNECTING_USER_ID],
table_lineage=[],
jobs=[],
)
)
)
with report.metadata_extraction_perf.connectors_lineage_extraction_sec:
self._fill_connectors_table_lineage(connectors)
with report.metadata_extraction_perf.connectors_jobs_extraction_sec:
self._fill_connectors_jobs(connectors, syncs_interval)
return connectors
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ def get_user_query(self, user_id: str) -> str:
WHERE id = '{user_id}'"""

def get_sync_logs_query(self) -> str:
return f"""
return """
SELECT connector_id,
sync_id,
message_event,
message_data,
time_stamp
FROM {self.db_clause}log
WHERE message_event in ('sync_start', 'sync_end')"""
FROM {}log
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use named format args

e.g. "{foo}".format(foo='whatever')

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

WHERE message_event in ('sync_start', 'sync_end')
and time_stamp > CURRENT_TIMESTAMP - INTERVAL '{} days'"""

def get_table_lineage_query(self) -> str:
return f"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ def default_query_results(
"email": "[email protected]",
}
]
elif query == fivetran_log_query.get_sync_logs_query():
elif query == fivetran_log_query.get_sync_logs_query().format(
fivetran_log_query.db_clause, 7
):
return [
{
"connector_id": "calendar_elected",
Expand Down
Loading