Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(tableau): set ingestion stage report and perftimers #12234

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1203,9 +1203,9 @@ def get_tables_for_dataset(
report=self.report,
)

self.report.metadata_extraction_sec[f"{project_id}.{dataset.name}"] = round(
timer.elapsed_seconds(), 2
)
self.report.metadata_extraction_sec[
f"{project_id}.{dataset.name}"
] = timer.elapsed_seconds(digits=2)

def get_core_table_details(
self, dataset_name: str, project_id: str, temp_table_dataset_prefix: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ def generate_lineage(
self.report.lineage_metadata_entries[project_id] = len(lineage)
logger.info(f"Built lineage map containing {len(lineage)} entries.")
logger.debug(f"lineage metadata is {lineage}")
self.report.lineage_extraction_sec[project_id] = round(
timer.elapsed_seconds(), 2
self.report.lineage_extraction_sec[project_id] = timer.elapsed_seconds(
digits=2
)
self.report.lineage_mem_size[project_id] = humanfriendly.format_size(
memory_footprint.total_size(lineage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,8 @@ def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]:
)
self.report_status(f"usage-extraction-{project_id}", False)

self.report.usage_extraction_sec[project_id] = round(
timer.elapsed_seconds(), 2
self.report.usage_extraction_sec[project_id] = timer.elapsed_seconds(
digits=2
)

def _store_usage_event(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,8 +633,8 @@
else:
logger.info("View processing disabled, skipping")

self.report.metadata_extraction_sec[report_key] = round(
timer.elapsed_seconds(), 2
self.report.metadata_extraction_sec[report_key] = timer.elapsed_seconds(

Check warning on line 636 in metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py#L636

Added line #L636 was not covered by tests
digits=2
)

def _process_table(
Expand Down Expand Up @@ -986,9 +986,7 @@

yield from usage_extractor.get_usage_workunits(all_tables=all_tables)

self.report.usage_extraction_sec[database] = round(
timer.elapsed_seconds(), 2
)
self.report.usage_extraction_sec[database] = timer.elapsed_seconds(digits=2)

Check warning on line 989 in metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py#L989

Added line #L989 was not covered by tests

def extract_lineage(
self,
Expand All @@ -1011,8 +1009,8 @@
database=database, connection=connection, all_tables=all_tables
)

self.report.lineage_extraction_sec[f"{database}"] = round(
timer.elapsed_seconds(), 2
self.report.lineage_extraction_sec[f"{database}"] = timer.elapsed_seconds(

Check warning on line 1012 in metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py#L1012

Added line #L1012 was not covered by tests
digits=2
)
yield from self.generate_lineage(
database, lineage_extractor=lineage_extractor
Expand Down Expand Up @@ -1042,8 +1040,8 @@

yield from lineage_extractor.generate()

self.report.lineage_extraction_sec[f"{database}"] = round(
timer.elapsed_seconds(), 2
self.report.lineage_extraction_sec[f"{database}"] = timer.elapsed_seconds(

Check warning on line 1043 in metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py#L1043

Added line #L1043 was not covered by tests
digits=2
)

if self.redundant_lineage_run_skip_handler:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def _get_workunits_internal(
)
self.report.operational_metadata_extraction_sec[
self.config.database
] = round(timer.elapsed_seconds(), 2)
] = timer.elapsed_seconds(digits=2)

# Generate aggregate events
self.report.report_ingestion_stage_start(USAGE_EXTRACTION_USAGE_AGGREGATION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ def _get_snowflake_history(self) -> Iterable[SnowflakeJoinedAccessEvent]:
)
self.report_status(USAGE_EXTRACTION_OPERATIONAL_STATS, False)
return
self.report.access_history_query_secs = round(timer.elapsed_seconds(), 2)
self.report.access_history_query_secs = timer.elapsed_seconds(digits=2)

for row in results:
yield from self._process_snowflake_history_row(row)
Expand Down Expand Up @@ -434,8 +434,8 @@ def _check_usage_date_ranges(self) -> None:
self.report.max_access_history_time = db_row["MAX_TIME"].astimezone(
tz=timezone.utc
)
self.report.access_history_range_query_secs = round(
timer.elapsed_seconds(), 2
self.report.access_history_range_query_secs = timer.elapsed_seconds(
digits=2
)

def _get_operation_aspect_work_unit(
Expand Down
154 changes: 75 additions & 79 deletions metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@
)
from datahub.ingestion.source.tableau.tableau_server_wrapper import UserInfo
from datahub.ingestion.source.tableau.tableau_validation import check_user_role
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.ingestion.source_report.ingestion_stage import IngestionStageContextReport
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStamp,
ChangeAuditStamps,
Expand Down Expand Up @@ -643,7 +643,7 @@
@dataclass
class TableauSourceReport(
StaleEntityRemovalSourceReport,
IngestionStageReport,
IngestionStageContextReport,
):
get_all_datasources_query_failed: bool = False
num_get_datasource_query_failures: int = 0
Expand Down Expand Up @@ -688,9 +688,6 @@
num_hidden_assets_skipped: int = 0
logged_in_user: List[UserInfo] = dataclass_field(default_factory=list)

def compute_stats(self) -> None:
self.close_stage()


def report_user_role(report: TableauSourceReport, server: Server) -> None:
title: str = "Insufficient Permissions"
Expand Down Expand Up @@ -3494,88 +3491,87 @@
return {"permissions": json.dumps(groups)} if len(groups) > 0 else None

def ingest_tableau_site(self):
self.report.report_ingestion_stage_start(
with self.report.new_stage(
f"Ingesting Tableau Site: {self.site_id} {self.site_content_url}"
)

# Initialise the dictionary to later look-up for chart and dashboard stat
if self.config.extract_usage_stats:
with PerfTimer() as timer:
self._populate_usage_stat_registry()
self.report.extract_usage_stats_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)

if self.config.permission_ingestion:
with PerfTimer() as timer:
self._fetch_groups()
self.report.fetch_groups_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)

# Populate the map of database names and database hostnames to be used later to map
# databases to platform instances.
if self.config.database_hostname_to_platform_instance_map:
with PerfTimer() as timer:
self._populate_database_server_hostname_map()
self.report.populate_database_server_hostname_map_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)

with PerfTimer() as timer:
self._populate_projects_registry()
self.report.populate_projects_registry_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)

if self.config.add_site_container:
yield from self.emit_site_container()
yield from self.emit_project_containers()

with PerfTimer() as timer:
yield from self.emit_workbooks()
self.report.emit_workbooks_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)

if self.sheet_ids:
with PerfTimer() as timer:
yield from self.emit_sheets()
self.report.emit_sheets_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)

if self.dashboard_ids:
with PerfTimer() as timer:
yield from self.emit_dashboards()
self.report.emit_dashboards_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)
):
# Initialise the dictionary to later look-up for chart and dashboard stat
if self.config.extract_usage_stats:
with PerfTimer() as timer:
self._populate_usage_stat_registry()
self.report.extract_usage_stats_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)

if self.config.permission_ingestion:
with PerfTimer() as timer:
self._fetch_groups()
self.report.fetch_groups_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)

# Populate the map of database names and database hostnames to be used later to map
# databases to platform instances.
if self.config.database_hostname_to_platform_instance_map:
with PerfTimer() as timer:
self._populate_database_server_hostname_map()
self.report.populate_database_server_hostname_map_timer[

Check warning on line 3517 in metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py#L3515-L3517

Added lines #L3515 - L3517 were not covered by tests
self.site_content_url
] = timer.elapsed_seconds(digits=2)

if self.embedded_datasource_ids_being_used:
with PerfTimer() as timer:
yield from self.emit_embedded_datasources()
self.report.emit_embedded_datasources_timer[
self._populate_projects_registry()
self.report.populate_projects_registry_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)

if self.datasource_ids_being_used:
with PerfTimer() as timer:
yield from self.emit_published_datasources()
self.report.emit_published_datasources_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)
if self.config.add_site_container:
yield from self.emit_site_container()
yield from self.emit_project_containers()

if self.custom_sql_ids_being_used:
with PerfTimer() as timer:
yield from self.emit_custom_sql_datasources()
self.report.emit_custom_sql_datasources_timer[
yield from self.emit_workbooks()
self.report.emit_workbooks_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)

if self.database_tables:
with PerfTimer() as timer:
yield from self.emit_upstream_tables()
self.report.emit_upstream_tables_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)
if self.sheet_ids:
with PerfTimer() as timer:
yield from self.emit_sheets()
self.report.emit_sheets_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)

if self.dashboard_ids:
with PerfTimer() as timer:
yield from self.emit_dashboards()
self.report.emit_dashboards_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)

if self.embedded_datasource_ids_being_used:
with PerfTimer() as timer:
yield from self.emit_embedded_datasources()
self.report.emit_embedded_datasources_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)

if self.datasource_ids_being_used:
with PerfTimer() as timer:
yield from self.emit_published_datasources()
self.report.emit_published_datasources_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)

if self.custom_sql_ids_being_used:
with PerfTimer() as timer:
yield from self.emit_custom_sql_datasources()
self.report.emit_custom_sql_datasources_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)

if self.database_tables:
with PerfTimer() as timer:
yield from self.emit_upstream_tables()
self.report.emit_upstream_tables_timer[
self.site_content_url
] = timer.elapsed_seconds(digits=2)
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from contextlib import AbstractContextManager
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
Expand All @@ -25,25 +26,52 @@ class IngestionStageReport:
ingestion_stage: Optional[str] = None
ingestion_stage_durations: TopKDict[str, float] = field(default_factory=TopKDict)

_timer: PerfTimer = PerfTimer()
_timer: Optional[PerfTimer] = field(
default=None, init=False, repr=False, compare=False
)

def _close_stage(self) -> None:
if self._timer.is_running():
elapsed = round(self._timer.elapsed_seconds(), 2)
def report_ingestion_stage_start(self, stage: str) -> None:
if self._timer:
elapsed = self._timer.elapsed_seconds(digits=2)
logger.info(
f"Time spent in stage <{self.ingestion_stage}>: {elapsed} seconds",
stacklevel=2,
)
if self.ingestion_stage:
self.ingestion_stage_durations[self.ingestion_stage] = elapsed

def report_ingestion_stage_start(self, stage: str) -> None:
self._close_stage()
else:
self._timer = PerfTimer()

self.ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}"
logger.info(f"Stage started: {self.ingestion_stage}")
self._timer.start()

def close_stage(self) -> None:
# just close ongoing stage if any
self._close_stage()

@dataclass
class IngestionStageContextReport:
    """Report mixin that accumulates per-stage ingestion durations.

    A stage is opened with :meth:`new_stage`, which returns a context manager;
    when the context exits, the elapsed seconds for that stage are recorded
    into ``ingestion_stage_durations``.
    """

    # Maps "<stage> at <utc start time>" -> elapsed seconds for that stage.
    ingestion_stage_durations: TopKDict[str, float] = field(default_factory=TopKDict)

    def new_stage(self, stage: str) -> "IngestionStageContext":
        """Return a context manager that times ``stage`` and records its duration here."""
        return IngestionStageContext(stage, self)


class IngestionStageContext(AbstractContextManager):
    """Context manager that times a single ingestion stage.

    Created via ``IngestionStageContextReport.new_stage()``. On exit, the
    elapsed time (rounded to 2 digits by ``PerfTimer.elapsed_seconds``) is
    stored in the report's ``ingestion_stage_durations`` under the key
    ``"<stage> at <utc creation time>"``.

    NOTE(review): the original declared this class with ``@dataclass`` even
    though it defines its own ``__init__`` and declares no fields; in that
    situation the decorator only injects a field-less ``__eq__`` (making all
    instances compare equal) and an empty ``__repr__``, so it is dropped here.
    """

    def __init__(self, stage: str, report: IngestionStageContextReport):
        # The stage label is timestamped at construction time so repeated
        # stages with the same name get distinct keys in the durations dict.
        self._ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}"
        self._timer: PerfTimer = PerfTimer()
        self._report = report

    def __enter__(self) -> "IngestionStageContext":
        logger.info(f"Stage started: {self._ingestion_stage}")
        self._timer.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Record the duration unconditionally (even when the stage raised),
        # so partial runs still report where time was spent. Returning None
        # propagates any exception.
        elapsed = self._timer.elapsed_seconds(digits=2)
        logger.info(
            f"Time spent in stage <{self._ingestion_stage}>: {elapsed} seconds",
            stacklevel=2,
        )
        self._report.ingestion_stage_durations[self._ingestion_stage] = elapsed
        return None
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/utilities/perf_timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
self.finish()
return None

def elapsed_seconds(self, digits: Optional[int] = None) -> float:
def elapsed_seconds(self, digits: int = 4) -> float:
"""
Returns the elapsed time in seconds.
"""
Expand All @@ -69,12 +69,12 @@
else:
elapsed = (self.end_time - self.start_time) + self._past_active_time

return round(elapsed, digits) if digits else elapsed
return round(elapsed, digits)

def assert_timer_is_running(self) -> None:
    # Soft assertion: flags misuse (querying a timer that was never started)
    # by marking the error state and logging, rather than raising.
    if self.is_running():
        return
    self._error_state = True
    logger.warning("Did you forget to start the timer ?")

Check warning on line 77 in metadata-ingestion/src/datahub/utilities/perf_timer.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/utilities/perf_timer.py#L76-L77

Added lines #L76 - L77 were not covered by tests

def is_running(self) -> bool:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def run_test():
num_workunits, peak_memory_usage = workunit_sink(workunits)
report.set_ingestion_stage("All", "Done")
print(f"Workunits Generated: {num_workunits}")
print(f"Seconds Elapsed: {timer.elapsed_seconds():.2f} seconds")
print(f"Seconds Elapsed: {timer.elapsed_seconds(digits=2)} seconds")

print(
f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}"
Expand Down
Loading
Loading