
fix(ingest/bigquery): fix lineage if multiple sql expression passed in and destination table set (datahub-project#10212)

Co-authored-by: Harshal Sheth <[email protected]>
2 people authored and sleeperdeep committed Jun 25, 2024
1 parent 40a0489 commit 49fa8fe
Showing 5 changed files with 153 additions and 7 deletions.
@@ -81,7 +81,9 @@ def get_table_and_shard(table_name: str) -> Tuple[Optional[str], Optional[str]]:
    @classmethod
    def from_string_name(cls, table: str) -> "BigqueryTableIdentifier":
        parts = table.split(".")
-       return cls(parts[0], parts[1], parts[2])
+       # If the table name contains a dollar sign, it is a reference to a partitioned table and we have to strip it
+       table = parts[2].split("$", 1)[0]
+       return cls(parts[0], parts[1], table)

    def raw_table_name(self):
        return f"{self.project_id}.{self.dataset}.{self.table}"
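A minimal, hypothetical sketch (not part of the commit) of what the partition-decorator stripping above does; the identifier my-project.my_dataset.events$20240101 is made up:

table = "my-project.my_dataset.events$20240101"
parts = table.split(".")
# The "$20240101" partition decorator is dropped so the identifier points at the base table.
base_table = parts[2].split("$", 1)[0]
print(parts[0], parts[1], base_table)  # my-project my_dataset events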
@@ -17,6 +17,7 @@
)

import humanfriendly
+import sqlglot
from google.cloud.datacatalog import lineage_v1
from google.cloud.logging_v2.client import Client as GCPLoggingClient

@@ -737,20 +738,37 @@ def _create_lineage_map(

            # Try the sql parser first.
            if self.config.lineage_use_sql_parser:
+                logger.debug(
+                    f"Using sql parser for lineage extraction for destination table: {destination_table.table_identifier.get_table_name()}, queryType: {e.statementType}, query: {e.query}"
+                )
                if e.statementType == "SELECT":
                    # We wrap select statements in a CTE to make them parseable as an insert statement.
                    # This is a workaround for the sql parser to support the case where the user runs a query and inserts the result into a table.
-                    query = f"""create table `{destination_table.table_identifier.get_table_name()}` AS
-                    (
-                    {e.query}
-                    )"""
+                    try:
+                        parsed_queries = sqlglot.parse(e.query, "bigquery")
+                        if parsed_queries[-1]:
+                            query = f"""create table `{destination_table.get_sanitized_table_ref().table_identifier.get_table_name()}` AS
+                            (
+                            {parsed_queries[-1].sql(dialect='bigquery')}
+                            )"""
+                        else:
+                            query = e.query
+                    except Exception:
+                        logger.debug(
+                            f"Failed to parse select-based lineage query {e.query} for table {destination_table}."
+                            "Sql parsing will likely fail for this query, which will result in a fallback to audit log."
+                        )
+                        query = e.query
                else:
                    query = e.query
                raw_lineage = sqlglot_lineage(
                    query,
                    schema_resolver=sql_parser_schema_resolver,
                    default_db=e.project_id,
                )
+                logger.debug(
+                    f"Input tables: {raw_lineage.in_tables}, Output tables: {raw_lineage.out_tables}"
+                )
                if raw_lineage.debug_info.table_error:
                    logger.debug(
                        f"Sql Parser failed on query: {e.query}. It won't cause any major issues, but "
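To illustrate the parsing step above, a small hypothetical sketch (not from the repository) of how sqlglot splits a multi-statement script and how the final statement can be wrapped with a synthetic destination table; the script text and proj.ds.dest are made up:

import sqlglot

script = "CREATE TEMP TABLE tmp AS SELECT 1 AS a; SELECT a, 2 AS b FROM tmp"
statements = sqlglot.parse(script, "bigquery")  # one Expression (or None) per statement
last = statements[-1]
if last:
    # Wrapping the trailing SELECT in a CREATE TABLE ... AS gives the lineage parser
    # an explicit output table to attach the selected columns to.
    query = f"create table `proj.ds.dest` AS ({last.sql(dialect='bigquery')})"
else:
    query = script
print(query)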
@@ -204,7 +204,7 @@ def usage_statistics_query(top_n: int) -> str:
            r.resource,
            q.query,
            COUNT(r.key) as query_count,
-           ROW_NUMBER() over (PARTITION BY r.timestamp, r.resource, q.query ORDER BY COUNT(r.key) DESC, q.query) as rank
+           ROW_NUMBER() over (PARTITION BY r.timestamp, r.resource ORDER BY COUNT(r.key) DESC) as rank
        FROM
            read_events r
            INNER JOIN query_events q ON r.name = q.key
@@ -256,6 +256,7 @@ class UsageStatistic:

    def usage_statistics(self, top_n: int) -> Iterator[UsageStatistic]:
        query = self.usage_statistics_query(top_n)
+
        rows = self.read_events.sql_query_iterator(
            query, refs=[self.query_events, self.column_accesses]
        )
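The ROW_NUMBER change above ranks distinct queries within each (timestamp, resource) window rather than within each query, so only the most frequent queries per resource get rank 1..N. A rough, hypothetical Python equivalent of that ranking (the sample data is made up):

from collections import Counter

# (resource, query) -> number of read events that referenced it
counts = Counter({
    ("proj.ds.table_1", "SELECT * FROM table_1"): 2,
    ("proj.ds.table_1", "SELECT my_column FROM table_1"): 1,
})

top_n = 1
by_resource: dict = {}
for (resource, query), count in counts.items():
    by_resource.setdefault(resource, []).append((count, query))

for resource, ranked in by_resource.items():
    # rank 1 is the most frequent query for the resource; keep only the top_n
    top = sorted(ranked, reverse=True)[:top_n]
    print(resource, [q for _, q in top])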
@@ -74,6 +74,12 @@ def make_usage_workunit(

    top_sql_queries: Optional[List[str]] = None
    if query_freq is not None:
+        if top_n_queries < len(query_freq):
+            logger.warn(
+                f"Top N query limit exceeded on {str(resource)}. Max number of queries {top_n_queries} < {len(query_freq)}. Truncating top queries to {top_n_queries}."
+            )
+            query_freq = query_freq[0:top_n_queries]
+
        budget_per_query: int = int(queries_character_limit / top_n_queries)
        top_sql_queries = [
            trim_query(
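A small, hypothetical sketch (not from the source) of the truncation guard added above: query_freq is assumed to be a list of (query, count) pairs already sorted by frequency, and the 24000-character budget is only an illustrative value:

import logging

logger = logging.getLogger(__name__)

def truncate_top_queries(query_freq, top_n_queries, queries_character_limit=24000):
    # Keep only the N most frequent queries, then split the character budget among them.
    if top_n_queries < len(query_freq):
        logger.warning(
            "Top N query limit exceeded: %d < %d. Truncating.", top_n_queries, len(query_freq)
        )
        query_freq = query_freq[0:top_n_queries]
    budget_per_query = int(queries_character_limit / top_n_queries)
    return query_freq, budget_per_query

print(truncate_top_queries([("SELECT * FROM t", 2), ("SELECT a FROM t", 1)], top_n_queries=1))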
metadata-ingestion/tests/unit/test_bigquery_usage.py (121 changes: 120 additions & 1 deletion)
@@ -190,7 +190,7 @@ def config() -> BigQueryV2Config:
        end_time=TS_2 + timedelta(minutes=1),
        usage=BigQueryUsageConfig(
            include_top_n_queries=True,
-            top_n_queries=3,
+            top_n_queries=30,
            bucket_duration=BucketDuration.DAY,
            include_operational_stats=False,
        ),
@@ -878,6 +878,125 @@ def test_usage_counts_no_columns(
assert not caplog.records


def test_usage_counts_no_columns_and_top_n_limit_hit(
caplog: pytest.LogCaptureFixture,
usage_extractor: BigQueryUsageExtractor,
config: BigQueryV2Config,
) -> None:
config.usage.top_n_queries = 1

job_name = "job_name"
ref = BigQueryTableRef(
BigqueryTableIdentifier(PROJECT_1, DATABASE_1.name, TABLE_1.name)
)
events = [
AuditEvent.create(
ReadEvent(
jobName=job_name,
timestamp=TS_1,
actor_email=ACTOR_1,
resource=ref,
fieldsRead=[],
readReason="JOB",
payload=None,
),
),
AuditEvent.create(
ReadEvent(
jobName="job_name_2",
timestamp=TS_1,
actor_email=ACTOR_1,
resource=ref,
fieldsRead=[],
readReason="JOB",
payload=None,
),
),
AuditEvent.create(
ReadEvent(
jobName="job_name_3",
timestamp=TS_1,
actor_email=ACTOR_1,
resource=ref,
fieldsRead=[],
readReason="JOB",
payload=None,
),
),
AuditEvent.create(
QueryEvent(
job_name=job_name,
timestamp=TS_1,
actor_email=ACTOR_1,
query="SELECT * FROM table_1",
statementType="SELECT",
project_id=PROJECT_1,
destinationTable=None,
referencedTables=[ref],
referencedViews=[],
payload=None,
)
),
AuditEvent.create(
QueryEvent(
job_name="job_name_2",
timestamp=TS_1,
actor_email=ACTOR_1,
query="SELECT * FROM table_1",
statementType="SELECT",
project_id=PROJECT_1,
destinationTable=None,
referencedTables=[ref],
referencedViews=[],
payload=None,
)
),
AuditEvent.create(
QueryEvent(
job_name="job_name_3",
timestamp=TS_1,
actor_email=ACTOR_1,
query="SELECT my_column FROM table_1",
statementType="SELECT",
project_id=PROJECT_1,
destinationTable=None,
referencedTables=[ref],
referencedViews=[],
payload=None,
)
),
]
caplog.clear()
with caplog.at_level(logging.WARNING):
workunits = usage_extractor._get_workunits_internal(
events, [TABLE_REFS[TABLE_1.name]]
)
expected = [
make_usage_workunit(
table=TABLE_1,
dataset_usage_statistics=DatasetUsageStatisticsClass(
timestampMillis=int(TS_1.timestamp() * 1000),
eventGranularity=TimeWindowSizeClass(
unit=BucketDuration.DAY, multiple=1
),
totalSqlQueries=3,
topSqlQueries=["SELECT * FROM table_1"],
uniqueUserCount=1,
userCounts=[
DatasetUserUsageCountsClass(
user=ACTOR_1_URN,
count=3,
userEmail=ACTOR_1,
),
],
fieldCounts=[],
),
)
]
compare_workunits(workunits, expected)
assert not caplog.records


@freeze_time(FROZEN_TIME)
@patch.object(BigQueryUsageExtractor, "_generate_usage_workunits")
def test_operational_stats(
