feat(ingest/sql): auto extract and use mode query user metadata #11307

Merged · 9 commits · Sep 9, 2024
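
Summary of the change, as visible in the diff below: SqlParsingAggregator.add_observed_query() now accepts a single ObservedQuery object instead of a long keyword-argument list; ObservedQuery becomes a standalone dataclass whose user field is a typed CorpUserUrn / CorpGroupUrn rather than a raw string; the now-unused make_actor_urn() helper is removed; and tool-specific metadata extraction (e.g. Mode query user metadata) is wired through add_preparsed_query() so it applies to both observed and pre-parsed queries.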
metadata-ingestion/src/datahub/emitter/mce_builder.py (0 additions, 16 deletions)

@@ -50,7 +50,6 @@
     UpstreamLineageClass,
     _Aspect as AspectAbstract,
 )
-from datahub.metadata.urns import CorpGroupUrn, CorpUserUrn
 from datahub.utilities.urn_encoder import UrnEncoder
 from datahub.utilities.urns.data_flow_urn import DataFlowUrn
 from datahub.utilities.urns.dataset_urn import DatasetUrn
@@ -225,21 +224,6 @@ def make_user_urn(username: str) -> str:
     )


-def make_actor_urn(actor: str) -> Union[CorpUserUrn, CorpGroupUrn]:
-    """
-    Makes a user urn if the input is not a user or group urn already
-    """
-    return (
-        CorpUserUrn(actor)
-        if not actor.startswith(("urn:li:corpuser:", "urn:li:corpGroup:"))
-        else (
-            CorpUserUrn.from_string(actor)
-            if actor.startswith("urn:li:corpuser:")
-            else CorpGroupUrn.from_string(actor)
-        )
-    )
-
-
 def make_group_urn(groupname: str) -> str:
     """
     Makes a group urn if the input is not a user or group urn already
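
With make_actor_urn() removed, call sites construct typed URNs directly. For reference, a minimal standalone sketch of the dispatch the deleted helper performed (to_actor_urn is an illustrative name; only CorpUserUrn / CorpGroupUrn come from DataHub):

    from datahub.metadata.urns import CorpGroupUrn, CorpUserUrn

    def to_actor_urn(actor: str):
        # Equivalent of the removed make_actor_urn(): parse full URNs,
        # and treat bare strings as corpuser ids.
        if actor.startswith("urn:li:corpuser:"):
            return CorpUserUrn.from_string(actor)
        if actor.startswith("urn:li:corpGroup:"):
            return CorpGroupUrn.from_string(actor)
        return CorpUserUrn(actor)

Once callers hold typed URNs end to end, this string sniffing is no longer needed anywhere.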
---- next file ----

@@ -36,6 +36,7 @@
     BigQueryIdentifierBuilder,
 )
 from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
+from datahub.metadata.urns import CorpUserUrn
 from datahub.sql_parsing.schema_resolver import SchemaResolver
 from datahub.sql_parsing.sql_parsing_aggregator import (
     ObservedQuery,
@@ -363,7 +364,9 @@ def _parse_audit_log_row(self, row: BigQueryJob) -> ObservedQuery:
             session_id=row["session_id"],
             timestamp=row["creation_time"],
             user=(
-                self.identifiers.gen_user_urn(row["user_email"])
+                CorpUserUrn.create_from_string(
+                    self.identifiers.gen_user_urn(row["user_email"])
+                )
                 if row["user_email"]
                 else None
             ),
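
The BigQuery extractor previously passed the raw string returned by gen_user_urn(); it now parses that string into a typed CorpUserUrn. A small sketch of the distinction, assuming the constructor takes a bare id while create_from_string() parses a fully qualified URN (the email is made up):

    from datahub.metadata.urns import CorpUserUrn

    u1 = CorpUserUrn("jdoe@example.com")  # bare entity id
    u2 = CorpUserUrn.create_from_string("urn:li:corpuser:jdoe@example.com")  # full URN
    assert str(u1) == str(u2) == "urn:li:corpuser:jdoe@example.com"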
---- next file ----

@@ -31,6 +31,7 @@
 from datahub.metadata.urns import DatasetUrn
 from datahub.sql_parsing.sql_parsing_aggregator import (
     KnownQueryLineageInfo,
+    ObservedQuery,
     SqlParsingAggregator,
 )
 from datahub.utilities.perf_timer import PerfTimer
@@ -118,11 +119,13 @@ def build(
         if self.config.resolve_temp_table_in_lineage:
             for temp_row in self._lineage_v1.get_temp_tables(connection=connection):
                 self.aggregator.add_observed_query(
-                    query=temp_row.query_text,
-                    default_db=self.database,
-                    default_schema=self.config.default_schema,
-                    session_id=temp_row.session_id,
-                    query_timestamp=temp_row.start_time,
+                    ObservedQuery(
+                        query=temp_row.query_text,
+                        default_db=self.database,
+                        default_schema=self.config.default_schema,
+                        session_id=temp_row.session_id,
+                        timestamp=temp_row.start_time,
+                    ),
                     # The "temp table" query actually returns all CREATE TABLE statements, even if they
                     # aren't explicitly a temp table. As such, setting is_known_temp_table=True
                     # would not be correct. We already have mechanisms to autodetect temp tables,
@@ -263,11 +266,13 @@ def _process_sql_parser_lineage(self, lineage_row: LineageRow) -> None:
         # TODO actor

         self.aggregator.add_observed_query(
-            query=ddl,
-            default_db=self.database,
-            default_schema=self.config.default_schema,
-            query_timestamp=lineage_row.timestamp,
-            session_id=lineage_row.session_id,
+            ObservedQuery(
+                query=ddl,
+                default_db=self.database,
+                default_schema=self.config.default_schema,
+                timestamp=lineage_row.timestamp,
+                session_id=lineage_row.session_id,
+            )
         )

     def _make_filtered_target(self, lineage_row: LineageRow) -> Optional[DatasetUrn]:
---- next file ----

@@ -16,7 +16,7 @@
 import datahub.emitter.mce_builder as builder
 import datahub.metadata.schema_classes as models
 from datahub.configuration.time_window_config import get_time_bucket
-from datahub.emitter.mce_builder import get_sys_time, make_actor_urn, make_ts_millis
+from datahub.emitter.mce_builder import get_sys_time, make_ts_millis
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.sql_parsing_builder import compute_upstream_fields
 from datahub.ingestion.api.closeable import Closeable
@@ -92,9 +92,16 @@ class LoggedQuery:


 @dataclasses.dataclass
-class ObservedQuery(LoggedQuery):
+class ObservedQuery:
+    query: str
+    session_id: Optional[str] = None
+    timestamp: Optional[datetime] = None
+    user: Optional[Union[CorpUserUrn, CorpGroupUrn]] = None
+    default_db: Optional[str] = None
+    default_schema: Optional[str] = None
     query_hash: Optional[str] = None
     usage_multiplier: int = 1

     # Use this to store additional key-value information about the query, for debugging
     extra_info: Optional[dict] = None
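
ObservedQuery is now a standalone dataclass rather than a LoggedQuery subclass, and its user field carries a typed URN instead of a string. A minimal construction with illustrative values:

    from datetime import datetime, timezone

    from datahub.metadata.urns import CorpUserUrn
    from datahub.sql_parsing.sql_parsing_aggregator import ObservedQuery

    observed = ObservedQuery(
        query="CREATE TABLE #tmp AS SELECT id FROM orders",  # illustrative SQL
        session_id="session-1234",
        timestamp=datetime(2024, 9, 1, tzinfo=timezone.utc),
        user=CorpUserUrn("alice"),  # typed URN, not "urn:li:corpuser:alice" as a str
        default_db="dev",
        default_schema="public",
    )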

@@ -508,16 +515,7 @@ def add(
         elif isinstance(item, PreparsedQuery):
             self.add_preparsed_query(item)
         elif isinstance(item, ObservedQuery):
-            self.add_observed_query(
-                query=item.query,
-                default_db=item.default_db,
-                default_schema=item.default_schema,
-                session_id=item.session_id,
-                usage_multiplier=item.usage_multiplier,
-                query_timestamp=item.timestamp,
-                user=make_actor_urn(item.user) if item.user else None,
-                query_hash=item.query_hash,
-            )
+            self.add_observed_query(item)
         else:
             raise ValueError(f"Cannot add unknown item type: {type(item)}")
@@ -661,18 +659,9 @@ def add_view_definition(

     def add_observed_query(
         self,
-        query: str,
-        default_db: Optional[str] = None,
-        default_schema: Optional[str] = None,
-        query_timestamp: Optional[datetime] = None,
-        user: Optional[Union[CorpUserUrn, CorpGroupUrn]] = None,
-        session_id: Optional[
-            str
-        ] = None,  # can only see temp tables with the same session
-        usage_multiplier: int = 1,
+        observed: ObservedQuery,
         is_known_temp_table: bool = False,
         require_out_table_schema: bool = False,
-        query_hash: Optional[str] = None,
     ) -> None:
         """Add an observed query to the aggregator.

@@ -686,7 +675,7 @@
         self.report.num_observed_queries += 1

         # All queries with no session ID are assumed to be part of the same session.
-        session_id = session_id or _MISSING_SESSION_ID
+        session_id = observed.session_id or _MISSING_SESSION_ID

         # Load in the temp tables for this session.
         schema_resolver: SchemaResolverInterface = (
@@ -696,17 +685,17 @@

         # Run the SQL parser.
         parsed = self._run_sql_parser(
-            query,
-            default_db=default_db,
-            default_schema=default_schema,
+            observed.query,
+            default_db=observed.default_db,
+            default_schema=observed.default_schema,
             schema_resolver=schema_resolver,
             session_id=session_id,
-            timestamp=query_timestamp,
-            user=user,
+            timestamp=observed.timestamp,
+            user=observed.user,
         )
         if parsed.debug_info.error:
             self.report.observed_query_parse_failures.append(
-                f"{parsed.debug_info.error} on query: {query[:100]}"
+                f"{parsed.debug_info.error} on query: {observed.query[:100]}"
             )
         if parsed.debug_info.table_error:
             self.report.num_observed_queries_failed += 1
@@ -716,14 +705,14 @@
         if isinstance(parsed.debug_info.column_error, CooperativeTimeoutError):
             self.report.num_observed_queries_column_timeout += 1

-        query_fingerprint = query_hash or parsed.query_fingerprint
+        query_fingerprint = observed.query_hash or parsed.query_fingerprint
         self.add_preparsed_query(
             PreparsedQuery(
                 query_id=query_fingerprint,
-                query_text=query,
-                query_count=usage_multiplier,
-                timestamp=query_timestamp,
-                user=user,
+                query_text=observed.query,
+                query_count=observed.usage_multiplier,
+                timestamp=observed.timestamp,
+                user=observed.user,
                 session_id=session_id,
                 query_type=parsed.query_type,
                 query_type_props=parsed.query_type_props,
@@ -734,6 +723,7 @@
                 column_usage=compute_upstream_fields(parsed),
                 inferred_schema=infer_output_schema(parsed),
                 confidence_score=parsed.debug_info.confidence,
+                extra_info=observed.extra_info,
             ),
             is_known_temp_table=is_known_temp_table,
             require_out_table_schema=require_out_table_schema,
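
Callers migrate by wrapping the same fields in an ObservedQuery; roughly (assuming sql, ts, user_urn, and an initialized aggregator from the surrounding context):

    # Before: aggregator.add_observed_query(query=sql, query_timestamp=ts, user=user_urn)
    # After:
    aggregator.add_observed_query(
        ObservedQuery(query=sql, timestamp=ts, user=user_urn),
        is_known_temp_table=False,  # per-call options remain keyword arguments
    )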
@@ -750,6 +740,9 @@ def add_preparsed_query(
         _is_internal: bool = False,
     ) -> None:

+        # Adding tool-specific metadata extraction here allows it to work for
+        # both ObservedQuery and PreparsedQuery, as add_preparsed_query is
+        # used within add_observed_query.
         self._tool_meta_extractor.extract_bi_metadata(parsed)

         if not _is_internal:

Review thread on the extract_bi_metadata call:

Collaborator: this is a bit tricky - we only need it here, since observed query calls this? IMO it's a bit confusing, because ObservedQuery also has extra_info, but that isn't copied over by add() or add_observed_query.

Collaborator (Author): Right. How about removing extra_info entirely? I do not believe we are doing anything with user_via just yet. Also, I believe we could place the line self._tool_meta_extractor.extract_bi_metadata(parsed) in add(), right before invoking add_preparsed_query or add_observed_query, instead of here, and also refactor PreparsedQuery / ObservedQuery to support a base protocol (renaming query_text to query, etc.). What do you think? Would it be less confusing then?

Collaborator (Author): Added some refactoring + a comment + passing extra_info to PreparsedQuery.
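
The extract_bi_metadata() call is where the "Mode query user metadata" from the PR title gets picked up. The extractor itself is not part of this diff; as a rough, hypothetical sketch of the idea - BI tools such as Mode append a JSON comment to the queries they generate, which can be parsed to attribute the query to a real user (function name and comment format below are illustrative assumptions, not the actual implementation):

    import json
    from typing import Optional

    def extract_mode_user_email(query: str) -> Optional[str]:
        # Hypothetical sketch: look for a trailing Mode-style JSON comment, e.g.
        #   SELECT 1
        #   -- {"user": "@alice", "email": "alice@example.com", "url": "https://app.mode.com/..."}
        lines = query.strip().splitlines()
        if not lines or not lines[-1].lstrip().startswith("--"):
            return None
        try:
            meta = json.loads(lines[-1].lstrip().removeprefix("--").strip())
        except json.JSONDecodeError:
            return None
        return meta.get("email") if isinstance(meta, dict) else None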
---- next file ----

@@ -6,6 +6,7 @@
 import pytest
 from freezegun import freeze_time

+from datahub.metadata.urns import CorpUserUrn
 from datahub.sql_parsing.sql_parsing_aggregator import ObservedQuery
 from datahub.utilities.file_backed_collections import ConnectionWrapper, FileBackedList
 from tests.test_helpers import mce_helpers
@@ -26,6 +27,9 @@ def _generate_queries_cached_file(tmp_path: Path, queries_json_path: Path) -> None:
     assert isinstance(queries, list)
     for query in queries:
         query["timestamp"] = datetime.fromisoformat(query["timestamp"])
+        query["user"] = (
+            CorpUserUrn.create_from_string(query["user"]) if query["user"] else None
+        )
         query_cache.append(ObservedQuery(**query))

     query_cache.close()