Skip to content

Commit

Permalink
feat(ingest/bigquery) - Reporting current state of BigQuery ingestion (
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored and Eric Yomi committed Feb 8, 2023
1 parent 4fd6f16 commit 1faa4c8
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -520,10 +520,12 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.report.report_dropped(project_id.id)
continue
logger.info(f"Processing project: {project_id.id}")
self.report.set_project_state(project_id.id, "Metadata Extraction")
yield from self._process_project(conn, project_id)

if self.config.profiling.enabled:
logger.info("Starting profiling...")
self.report.set_project_state(project_id.id, "Profiling")
yield from self.profiler.get_workunits(self.db_tables)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
Expand Down Expand Up @@ -614,7 +616,7 @@ def _process_project(
start_time_millis=datetime_to_ts_millis(self.config.start_time),
end_time_millis=datetime_to_ts_millis(self.config.end_time),
)

self.report.set_project_state(project_id, "Lineage Extraction")
yield from self.generate_lineage(project_id)

if self.config.include_usage_statistics:
Expand All @@ -637,6 +639,7 @@ def _process_project(
end_time_millis=datetime_to_ts_millis(self.config.end_time),
)

self.report.set_project_state(project_id, "Usage Extraction")
yield from self.generate_usage_statistics(project_id)

def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import collections
import dataclasses
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Counter, Dict, List, Optional
Expand All @@ -10,6 +11,8 @@
from datahub.utilities.lossy_collections import LossyDict, LossyList
from datahub.utilities.stats_collections import TopKDict

logger: logging.Logger = logging.getLogger(__name__)


@dataclass
class BigQueryV2Report(ProfilingSqlReport):
Expand Down Expand Up @@ -72,3 +75,12 @@ class BigQueryV2Report(ProfilingSqlReport):
operation_types_stat: Counter[str] = dataclasses.field(
default_factory=collections.Counter
)
current_project_status: Optional[Dict[str, Dict[str, datetime]]] = None

def set_project_state(self, project: str, stage: str) -> None:
    """Record the ingestion stage that ``project`` has just entered.

    If a state is already recorded, it is logged at INFO level (converted
    with ``self.to_pure_python_obj``) before being replaced. Only a single
    project/stage pair is kept: the previous entry is overwritten, not
    merged into a history.

    Args:
        project: Identifier of the project currently being processed.
        stage: Label of the stage being entered, e.g. "Metadata Extraction",
            "Lineage Extraction", "Usage Extraction" or "Profiling".
    """
    if self.current_project_status:
        logger.info(
            "Previous project state was: %s",
            self.to_pure_python_obj(self.current_project_status),
        )
    # The timestamp marks when the stage started; datetime.now() yields a
    # naive local-time value.
    self.current_project_status = {project: {stage: datetime.now()}}
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ def _get_bigquery_log_entries_via_gcp_logging(
logger.info(
f"Starting log load from GCP Logging for {client.project}"
)
if i % 1000 == 0:
logger.info(
f"Loaded {i} log entries from GCP Log for {client.project}"
)
self.report.total_query_log_entries += 1
yield entry

Expand Down

0 comments on commit 1faa4c8

Please sign in to comment.