fix(ingest/bq): fix ordering of queries for use_queries_v2 #11333

Merged (2 commits) on Sep 10, 2024
Changes from 1 commit
@@ -249,14 +249,28 @@ def get_workunits_internal(

with self.report.audit_log_preprocessing_timer:
# Preprocessing stage that deduplicates the queries using query hash per usage bucket
queries_deduped: FileBackedDict[Dict[int, ObservedQuery]]
# Using regular dictionary with
# key: usage bucket
# value: File backed dictionary with query hash as key and observed query as value
# This structure is chosen in order to maintain order of execution of queries

queries_deduped: Dict[int, FileBackedDict[ObservedQuery]]
queries_deduped = self.deduplicate_queries(queries)
self.report.num_unique_queries = len(queries_deduped)
self.report.num_unique_queries = len(
set(
query_hash
for bucket in queries_deduped.values()
for query_hash in bucket
)
)

with self.report.audit_log_load_timer:
i = 0
for query_instances in queries_deduped.values():
for _, query in query_instances.items():
for queries_in_bucket in queries_deduped.values():
# Assuming that FileBackedDict is an OrderedDict.
# If this is not the case, we would need to order it by "last_query_timestamp"
# using queries_in_bucket.sql_query_iterator.
Collaborator:

I'm not sure this assumption is valid - we might need to write some tests to validate this, but I think sqlite doesn't guarantee anything.

Collaborator:

Yeah, we will need to tweak the code here.

Collaborator (Author):

I'm not super familiar with the file_backed_collections code. I updated the queries_extractor to use ORDER BY in the SQL query when reading the results (see the sketch after this hunk).

for query in queries_in_bucket.values():
if i > 0 and i % 10000 == 0:
logger.info(f"Added {i} query log entries to SQL aggregator")

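For context on the ordering discussion in the thread above, here is a rough, illustrative sketch of an ORDER BY-based read per bucket. It assumes the `sql_query_iterator` mentioned in the code comment accepts a raw SQL string over the dict's backing SQLite table; the table name ("data"), the "key" column, positional row indexing, and the import paths are all assumptions that would need to be checked against `file_backed_collections`:

```python
# Illustrative sketch only, not the PR's code: yield one usage bucket's
# queries ordered by the "last_query_timestamp" extra column instead of
# relying on FileBackedDict's default iteration order.
from typing import Iterator

from datahub.sql_parsing.sql_parsing_aggregator import ObservedQuery
from datahub.utilities.file_backed_collections import FileBackedDict


def iter_bucket_by_timestamp(
    queries_in_bucket: FileBackedDict[ObservedQuery],
) -> Iterator[ObservedQuery]:
    # Assumed schema: backing table "data" with a "key" column holding the
    # query fingerprint; rows are assumed to support positional indexing.
    for row in queries_in_bucket.sql_query_iterator(
        "SELECT key FROM data ORDER BY last_query_timestamp"
    ):
        yield queries_in_bucket[row[0]]
```
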
@@ -267,15 +281,15 @@ def get_workunits_internal(

def deduplicate_queries(
self, queries: FileBackedList[ObservedQuery]
) -> FileBackedDict[Dict[int, ObservedQuery]]:
) -> Dict[int, FileBackedDict[ObservedQuery]]:

# This fingerprint based deduplication is done here to reduce performance hit due to
# repetitive sql parsing while adding observed query to aggregator that would otherwise
# parse same query multiple times. In future, aggregator may absorb this deduplication.
# With current implementation, it is possible that "Operation"(e.g. INSERT) is reported
# only once per day, although it may have happened multiple times throughout the day.

queries_deduped: FileBackedDict[Dict[int, ObservedQuery]] = FileBackedDict()
queries_deduped: Dict[int, FileBackedDict[ObservedQuery]] = dict()

for i, query in enumerate(queries):
if i > 0 and i % 10000 == 0:
@@ -294,14 +308,20 @@ def deduplicate_queries(
query.query, self.identifiers.platform, fast=True
)

query_instances = queries_deduped.setdefault(query.query_hash, {})
if time_bucket not in queries_deduped:
# TODO: Cleanup, etc as required for file backed dicts after use
queries_deduped[time_bucket] = FileBackedDict[ObservedQuery](
extra_columns={"last_query_timestamp": lambda e: e.timestamp}
)

observed_query = query_instances.setdefault(time_bucket, query)
observed_query = queries_deduped[time_bucket].get(query.query_hash)

# If the query already exists for this time bucket, update its attributes
if observed_query is not query:
if observed_query is not None:
observed_query.usage_multiplier += 1
observed_query.timestamp = query.timestamp
else:
queries_deduped[time_bucket][query.query_hash] = query

return queries_deduped

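To make the per-bucket deduplication above concrete, here is a minimal, self-contained sketch. Plain dicts stand in for FileBackedDict, `Query` is a stand-in for ObservedQuery (fields taken from the diff), and `bucket_of()` is a hypothetical replacement for the elided time-bucket computation:

```python
# Illustrative sketch of the dedup behaviour described in the diff above.
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Iterable


@dataclass
class Query:
    # Stand-in for ObservedQuery; fields taken from the diff.
    query: str
    query_hash: str
    timestamp: datetime
    usage_multiplier: int = 1


def bucket_of(ts: datetime) -> int:
    # Hypothetical day-granularity bucket, expressed as an epoch-day integer.
    return int(ts.timestamp()) // 86400


def deduplicate(queries: Iterable[Query]) -> Dict[int, Dict[str, Query]]:
    # usage bucket -> {query fingerprint -> representative query}
    deduped: Dict[int, Dict[str, Query]] = {}
    for query in queries:
        bucket = deduped.setdefault(bucket_of(query.timestamp), {})
        existing = bucket.get(query.query_hash)
        if existing is not None:
            # Same fingerprint already seen in this bucket: count the repeat
            # and keep the latest timestamp instead of storing a duplicate.
            existing.usage_multiplier += 1
            existing.timestamp = query.timestamp
        else:
            bucket[query.query_hash] = query
    return deduped
```

With this shape, iterating the result bucket by bucket replays each distinct query once per bucket, while usage_multiplier records how often it actually ran within that bucket.
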
@@ -530,7 +530,9 @@ def test_create_table_query_mcps(pytestconfig: pytest.Config) -> None:


@freeze_time(FROZEN_TIME)
def test_lineage_via_temp_table_disordered_add(pytestconfig: pytest.Config) -> None:
def test_table_lineage_via_temp_table_disordered_add(
pytestconfig: pytest.Config,
) -> None:
aggregator = SqlParsingAggregator(
platform="redshift",
generate_lineage=True,
@@ -554,7 +556,8 @@ def test_lineage_via_temp_table_disordered_add(pytestconfig: pytest.Config) -> None:
mce_helpers.check_goldens_stream(
pytestconfig,
outputs=mcps,
golden_path=RESOURCE_DIR / "test_lineage_via_temp_table_disordered_add.json",
golden_path=RESOURCE_DIR
/ "test_table_lineage_via_temp_table_disordered_add.json",
)

