Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/sql): auto extract and use mode query user metadata #11307

Merged
merged 9 commits into from
Sep 9, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
get_query_fingerprint,
try_format_query,
)
from datahub.sql_parsing.tool_meta_extractor import (
ToolMetaExtractor,
ToolMetaExtractorReport,
)
from datahub.utilities.cooperative_timeout import CooperativeTimeoutError
from datahub.utilities.file_backed_collections import (
ConnectionWrapper,
Expand Down Expand Up @@ -179,6 +183,8 @@ class PreparsedQuery:
query_type_props: QueryTypeProps = dataclasses.field(
default_factory=lambda: QueryTypeProps()
)
# Use this to store addtitional key-value information about query for debugging
extra_info: Optional[dict] = None


@dataclasses.dataclass
Expand Down Expand Up @@ -247,6 +253,9 @@ class SqlAggregatorReport(Report):
num_operations_generated: int = 0
num_operations_skipped_due_to_filters: int = 0

# Tool Metadata Extraction
tool_meta_report: Optional[ToolMetaExtractorReport] = None

def compute_stats(self) -> None:
self.schema_resolver_count = self._aggregator._schema_resolver.schema_count()
self.num_unique_query_fingerprints = len(self._aggregator._query_map)
Expand Down Expand Up @@ -421,6 +430,10 @@ def __init__(
tablename="query_usage_counts",
)

# Tool Extractor
self._tool_meta_extractor = ToolMetaExtractor()
self.report.tool_meta_report = self._tool_meta_extractor.report

def close(self) -> None:
self._exit_stack.close()

Expand Down Expand Up @@ -736,6 +749,9 @@ def add_preparsed_query(
session_has_temp_tables: bool = True,
_is_internal: bool = False,
) -> None:

self._tool_meta_extractor.extract_bi_metadata(parsed)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bit tricky - we only need it here, since observed query calls this?

imo it's a bit confusing because ObservedQuery also has extra_info, but that isn't copied over by add() or add_observed_query

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

How about removing extra_info entirely. I do not believe we are doing anything with user_via just yet.

Also I believe we could place the line self._tool_meta_extractor.extract_bi_metadata(parsed) in add() right before invoking add_preparsed_query or add_observed_query instead of here and also refractor PreparsedQuery / ObservedQuery to support base protocol - renaming of query_text to query, etc. What do you think? Would it be less confusing then ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some refactoring + comment + passing extra_info to PreParsedQuery.


if not _is_internal:
self.report.num_preparsed_queries += 1

Expand Down
96 changes: 96 additions & 0 deletions metadata-ingestion/src/datahub/sql_parsing/tool_meta_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import json
import logging
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Tuple, Union

from typing_extensions import Protocol

from datahub.ingestion.api.report import Report
from datahub.metadata.urns import CorpGroupUrn, CorpUserUrn
from datahub.utilities.stats_collections import int_top_k_dict

UrnStr = str

logger = logging.getLogger(__name__)


class QueryLog(Protocol):
"""Represents Query Log Entry
expected by QueryMetaExractor interface
"""

query_text: str
user: Optional[Union[CorpUserUrn, CorpGroupUrn]]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the idea that both preparsed queries and observed queries will match this interface?

Copy link
Collaborator Author

@mayurinehate mayurinehate Sep 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, ultimately, that's the goal . Anything that has "query", "user" and "extra_info" can use this.
ObservedQuery and PreparsedQuery share these concepts but not named/typed similarly. Refactoring can be done to achieve this, however currently not done as add_observed_query path ultimately invokes add_preparsed_query path. So it suffices for only PreparsedQuery to follow this protocol.

extra_info: Optional[dict]


def _get_last_line(query: str) -> str:
return query.rstrip().rsplit("\n", maxsplit=1)[-1]


@dataclass
class ToolMetaExtractorReport(Report):
num_queries_meta_extracted: Dict[str, int] = field(default_factory=int_top_k_dict)


class ToolMetaExtractor:
"""Enriches input query log entry with tool-specific details captured as part of query log

Such as
- Queries executed on warehouse by Mode BI tool contain information of actual user interacting
with BI tool which is more useful as compared to service account used to execute query as reported
by warehouse query logs.
"""

def __init__(self) -> None:
self.report = ToolMetaExtractorReport()
self.known_tool_extractors: List[Tuple[str, Callable[[QueryLog], bool]]] = [
(
"mode",
self._extract_mode_query,
)
]

def _extract_mode_query(self, entry: QueryLog) -> bool:
"""
Returns:
bool: whether QueryLog entry is that of mode and mode user info
is extracted into entry.
"""
last_line = _get_last_line(entry.query_text)

if not (
last_line.startswith("--")
and '"url":"https://modeanalytics.com' in last_line
):
return False

mode_json_raw = last_line[2:]
mode_json = json.loads(mode_json_raw)

original_user = entry.user

entry.user = email_to_user_urn(mode_json["email"])
entry.extra_info = entry.extra_info or {}
entry.extra_info["user_via"] = original_user

# TODO: Generate an "origin" urn.

return True

def extract_bi_metadata(self, entry: QueryLog) -> bool:

for tool, meta_extractor in self.known_tool_extractors:
try:
if meta_extractor(entry):
self.report.num_queries_meta_extracted[tool] += 1
return True
except Exception:
logger.debug("Tool metadata extraction failed with error : {e}")
return False


# NOTE: This is implementing the most common user urn generation scenario
# however may need to be revisited at later point
def email_to_user_urn(email: str) -> CorpUserUrn:
return CorpUserUrn(email.split("@", 1)[0])
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.view-1,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 1643871600000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 1643871600000,
"actor": "urn:li:corpuser:foo"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
"type": "TRANSFORMED",
"query": "urn:li:query:176428c30cc3197730c20f6d0161efe869a0a041876d23c044aa2cd60d4c7a12"
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"json": {
"timestampMillis": 1643846400000,
"eventGranularity": {
"unit": "DAY",
"multiple": 1
},
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"uniqueUserCount": 1,
"totalSqlQueries": 4,
"topSqlQueries": [
"create view `bigquery-dataset-1`.`view-1` as select * from `bigquery-dataset-1`.`table-1`",
"select * from `bigquery-dataset-1`.`table-1`"
],
"userCounts": [
{
"user": "urn:li:corpuser:foo",
"count": 4
}
],
"fieldCounts": []
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:176428c30cc3197730c20f6d0161efe869a0a041876d23c044aa2cd60d4c7a12",
"changeType": "UPSERT",
"aspectName": "queryProperties",
"aspect": {
"json": {
"statement": {
"value": "create view `bigquery-dataset-1`.`view-1` as select * from `bigquery-dataset-1`.`table-1`",
"language": "SQL"
},
"source": "SYSTEM",
"created": {
"time": 1643871600000,
"actor": "urn:li:corpuser:foo"
},
"lastModified": {
"time": 1643871600000,
"actor": "urn:li:corpuser:foo"
}
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:176428c30cc3197730c20f6d0161efe869a0a041876d23c044aa2cd60d4c7a12",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:bigquery"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.view-1,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetUsageStatistics",
"aspect": {
"json": {
"timestampMillis": 1643846400000,
"eventGranularity": {
"unit": "DAY",
"multiple": 1
},
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"uniqueUserCount": 1,
"totalSqlQueries": 4,
"topSqlQueries": [
"select * from `bigquery-dataset-1`.`view-1`\nLIMIT 100\n-- {\"user\":\"@bar\",\"email\":\"[email protected]\",\"url\":\"https://modeanalytics.com/acryltest/reports/6234ff78bc7d/runs/662b21949629/queries/f0aad24d5b37\",\"scheduled\":false}\n"
],
"userCounts": [
{
"user": "urn:li:corpuser:bar",
"count": 4
}
],
"fieldCounts": []
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:176428c30cc3197730c20f6d0161efe869a0a041876d23c044aa2cd60d4c7a12",
"changeType": "UPSERT",
"aspectName": "querySubjects",
"aspect": {
"json": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.table-1,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.view-1,PROD)"
}
]
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:176428c30cc3197730c20f6d0161efe869a0a041876d23c044aa2cd60d4c7a12",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.view-1,PROD)",
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"json": {
"timestampMillis": 1643871600000,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"actor": "urn:li:corpuser:foo",
"operationType": "CREATE",
"customProperties": {
"query_urn": "urn:li:query:176428c30cc3197730c20f6d0161efe869a0a041876d23c044aa2cd60d4c7a12"
},
"lastUpdatedTimestamp": 1643871600000
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.view-1,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "bigquery-2022_02_03-07_00_00",
"lastRunId": "no-run-id-provided"
}
}
]
Loading
Loading