fix(ingestion/hive): ignore sampling for tagged column/table (datahub…
dushayntAW authored and sleeperdeep committed Jun 25, 2024
1 parent 693acb8 commit 0ad941b
Showing 4 changed files with 143 additions and 66 deletions.
199 changes: 133 additions & 66 deletions metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
@@ -45,13 +45,16 @@
from sqlalchemy.exc import ProgrammingError
from typing_extensions import Concatenate, ParamSpec

from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import get_sys_time
from datahub.ingestion.graph.client import get_default_graph
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.profiling.common import (
Cardinality,
convert_to_cardinality,
)
from datahub.ingestion.source.sql.sql_common import SQLSourceReport
from datahub.metadata.com.linkedin.pegasus2avro.schema import EditableSchemaMetadata
from datahub.metadata.schema_classes import (
DatasetFieldProfileClass,
DatasetProfileClass,
@@ -296,6 +299,9 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):

query_combiner: SQLAlchemyQueryCombiner

platform: str
env: str

def _get_columns_to_profile(self) -> List[str]:
if not self.config.any_field_level_metrics_enabled():
return []
@@ -670,6 +676,16 @@ def generate_dataset_profile( # noqa: C901 (complexity)
profile.columnCount = len(all_columns)
columns_to_profile = set(self._get_columns_to_profile())

(
ignore_table_sampling,
columns_list_to_ignore_sampling,
) = _get_columns_to_ignore_sampling(
self.dataset_name,
self.config.tags_to_ignore_sampling,
self.platform,
self.env,
)

logger.debug(f"profiling {self.dataset_name}: flushing stage 1 queries")
self.query_combiner.flush()

@@ -732,76 +748,80 @@ def generate_dataset_profile( # noqa: C901 (complexity)
if not profile.rowCount:
continue

self._get_dataset_column_sample_values(column_profile, column)

if (
type_ == ProfilerDataType.INT
or type_ == ProfilerDataType.FLOAT
or type_ == ProfilerDataType.NUMERIC
not ignore_table_sampling
and column not in columns_list_to_ignore_sampling
):
self._get_dataset_column_min(column_profile, column)
self._get_dataset_column_max(column_profile, column)
self._get_dataset_column_mean(column_profile, column)
self._get_dataset_column_median(column_profile, column)
self._get_dataset_column_stdev(column_profile, column)

if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)
if cardinality in {
Cardinality.FEW,
Cardinality.MANY,
Cardinality.VERY_MANY,
}:
self._get_dataset_column_quantiles(column_profile, column)
self._get_dataset_column_histogram(column_profile, column)

elif type_ == ProfilerDataType.STRING:
if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)
self._get_dataset_column_sample_values(column_profile, column)

elif type_ == ProfilerDataType.DATETIME:
self._get_dataset_column_min(column_profile, column)
self._get_dataset_column_max(column_profile, column)

# FIXME: Re-add histogram once kl_divergence has been modified to support datetimes

if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)
if (
type_ == ProfilerDataType.INT
or type_ == ProfilerDataType.FLOAT
or type_ == ProfilerDataType.NUMERIC
):
self._get_dataset_column_min(column_profile, column)
self._get_dataset_column_max(column_profile, column)
self._get_dataset_column_mean(column_profile, column)
self._get_dataset_column_median(column_profile, column)
self._get_dataset_column_stdev(column_profile, column)

if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)
if cardinality in {
Cardinality.FEW,
Cardinality.MANY,
Cardinality.VERY_MANY,
}:
self._get_dataset_column_quantiles(column_profile, column)
self._get_dataset_column_histogram(column_profile, column)

elif type_ == ProfilerDataType.STRING:
if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)

else:
if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)
elif type_ == ProfilerDataType.DATETIME:
self._get_dataset_column_min(column_profile, column)
self._get_dataset_column_max(column_profile, column)

# FIXME: Re-add histogram once kl_divergence has been modified to support datetimes

if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)

else:
if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)

logger.debug(f"profiling {self.dataset_name}: flushing stage 3 queries")
self.query_combiner.flush()
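After this change, the sample-value fetch sits ahead of a single gate that wraps all other per-column statistics. A condensed, illustrative sketch of the new flow, as reconstructed from the interleaved diff above (the enclosing loop over column specs and all helper bodies are elided, so this is not runnable as-is):

# Condensed sketch of the per-column logic after this commit (illustrative).
self._get_dataset_column_sample_values(column_profile, column)

if (
    not ignore_table_sampling
    and column not in columns_list_to_ignore_sampling
):
    if type_ in (ProfilerDataType.INT, ProfilerDataType.FLOAT, ProfilerDataType.NUMERIC):
        ...  # min/max/mean/median/stdev; quantiles and histogram at higher cardinalities
    elif type_ == ProfilerDataType.STRING:
        ...  # distinct-value frequencies at low cardinalities
    elif type_ == ProfilerDataType.DATETIME:
        ...  # min/max; distinct-value frequencies at low cardinalities
    else:
        ...  # distinct-value frequencies at low cardinalities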
@@ -896,6 +916,7 @@ class DatahubGEProfiler:

base_engine: Engine
platform: str # passed from parent source config
env: str

# The actual value doesn't matter, it just matters that we use it consistently throughout.
_datasource_name_base: str = "my_sqlalchemy_datasource"
@@ -906,12 +927,15 @@ def __init__(
report: SQLSourceReport,
config: GEProfilingConfig,
platform: str,
env: str = "PROD",
):
self.report = report
self.config = config
self.times_taken = []
self.total_row_count = 0

self.env = env

# TRICKY: The call to `.engine` is quite important here. Connection.connect()
# returns a "branched" connection, which does not actually use a new underlying
# DB-API object from the connection pool. Engine.connect() does what we want to
@@ -1151,6 +1175,8 @@ def _generate_single_profile(
self.report,
custom_sql,
query_combiner,
self.platform,
self.env,
).generate_dataset_profile()

time_taken = timer.elapsed_seconds()
@@ -1309,3 +1335,44 @@ def create_bigquery_temp_table(
return bigquery_temp_table
finally:
raw_connection.close()


def _get_columns_to_ignore_sampling(
dataset_name: str, tags_to_ignore: Optional[List[str]], platform: str, env: str
) -> Tuple[bool, List[str]]:
logger.debug("Collecting columns to ignore for sampling")

ignore_table: bool = False
columns_to_ignore: List[str] = []

if not tags_to_ignore:
return ignore_table, columns_to_ignore

dataset_urn = mce_builder.make_dataset_urn(
name=dataset_name, platform=platform, env=env
)

datahub_graph = get_default_graph()

dataset_tags = datahub_graph.get_tags(dataset_urn)
if dataset_tags:
ignore_table = any(
tag_association.tag.split("urn:li:tag:")[1] in tags_to_ignore
for tag_association in dataset_tags.tags
)

if not ignore_table:
metadata = datahub_graph.get_aspect(
entity_urn=dataset_urn, aspect_type=EditableSchemaMetadata
)

if metadata:
for schemaField in metadata.editableSchemaFieldInfo:
if schemaField.globalTags:
columns_to_ignore.extend(
schemaField.fieldPath
for tag_association in schemaField.globalTags.tags
if tag_association.tag.split("urn:li:tag:")[1] in tags_to_ignore
)

return ignore_table, columns_to_ignore
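For illustration, a minimal sketch of calling this helper. The dataset name and the tag no_profiling are hypothetical, and the call assumes a reachable DataHub instance, since get_default_graph() performs the tag lookups. make_tag_urn from datahub.emitter.mce_builder is the inverse of the split("urn:li:tag:") extraction used above:

from datahub.emitter.mce_builder import make_tag_urn

# Tag URNs have the form "urn:li:tag:<name>"; the helper strips the prefix,
# so tags_to_ignore_sampling entries are plain tag names, not URNs.
assert make_tag_urn("no_profiling") == "urn:li:tag:no_profiling"

# Hypothetical call against a live DataHub instance.
ignore_table, ignore_columns = _get_columns_to_ignore_sampling(
    dataset_name="db1.table1",        # hypothetical Hive table
    tags_to_ignore=["no_profiling"],  # hypothetical tag name
    platform="hive",
    env="PROD",
)
# ignore_table: True if the table itself carries a listed tag.
# ignore_columns: field paths whose editable schema metadata carries one.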
metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
@@ -172,6 +172,14 @@ class GEProfilingConfig(ConfigModel):
description="Whether to profile external tables. Only Snowflake and Redshift support this.",
)

tags_to_ignore_sampling: Optional[List[str]] = pydantic.Field(
default=None,
description=(
"Fixed list of tag names: any table or column bearing one of these tags"
" is excluded from sampling. If not specified, tables will be sampled"
" based on `use_sampling`."
),
)

@pydantic.root_validator(pre=True)
def deprecate_bigquery_temp_table_schema(cls, values):
# TODO: Update docs to remove mention of this field.
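The tags_to_ignore_sampling field added above can also be set programmatically; a minimal sketch, with no_profiling as a hypothetical tag name (in a YAML recipe the same field lives under the source's profiling block):

from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig

# Skip sampling for any table or column tagged "no_profiling" (hypothetical tag).
profiling_config = GEProfilingConfig(
    enabled=True,
    tags_to_ignore_sampling=["no_profiling"],
)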
@@ -1166,6 +1166,7 @@ def get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler":
report=self.report,
config=self.config.profiling,
platform=self.platform,
env=self.config.env,
)

def get_profile_args(self) -> Dict:
@@ -226,6 +226,7 @@ def get_profiler_instance(
report=self.report,
config=self.config.profiling,
platform=self.platform,
env=self.config.env,
)

def is_dataset_eligible_for_profiling(
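Both sources now thread env from their config into the profiler, so that _get_columns_to_ignore_sampling builds the same dataset URN under which the metadata was ingested. A minimal construction sketch — the profiler's first argument is truncated in this view, so the parameter name conn and the Hive connection string are assumptions:

from sqlalchemy import create_engine

from datahub.ingestion.source.ge_data_profiler import DatahubGEProfiler
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.sql.sql_common import SQLSourceReport

# Assumed names: "conn" is not visible in the truncated __init__ above, and
# the Hive DSN is illustrative only.
engine = create_engine("hive://localhost:10000/default")
profiler = DatahubGEProfiler(
    conn=engine.connect(),
    report=SQLSourceReport(),
    config=GEProfilingConfig(enabled=True, tags_to_ignore_sampling=["no_profiling"]),
    platform="hive",
    env="PROD",  # matches the default added in this commit
)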
