Skip to content

Commit

Permalink
more logs, sharded table legacy support
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate committed Jul 26, 2024
1 parent e7d7f3e commit 505911c
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
BigqueryTableIdentifier._BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX = (
self.config.sharded_table_pattern
)
if self.config.enable_legacy_sharded_table_support:
BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = ""

self.bigquery_data_dictionary = BigQuerySchemaApi(
self.report.schema_api_perf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ class BigQueryIdentifierConfig(
" because the project id is represented as the top-level container.",
)

enable_legacy_sharded_table_support: bool = Field(
default=True,
description="Use the legacy sharded table urn suffix added.",
)


class BigQueryV2Config(
BigQueryConnectionConfig,
Expand Down Expand Up @@ -397,11 +402,6 @@ def have_table_data_read_permission(self) -> bool:
description="This flag enables the data lineage extraction from Data Lineage API exposed by Google Data Catalog. NOTE: This extractor can't build views lineage. It's recommended to enable the view's DDL parsing. Read the docs to have more information about: https://cloud.google.com/data-catalog/docs/concepts/about-data-lineage",
)

enable_legacy_sharded_table_support: bool = Field(
default=True,
description="Use the legacy sharded table urn suffix added.",
)

extract_policy_tags_from_catalog: bool = Field(
default=False,
description=(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def __init__(
structured_reporter: SourceReport,
) -> None:
self.identifier_config = identifier_config
if self.identifier_config.enable_legacy_sharded_table_support:
BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = ""
self.structured_reporter = structured_reporter

def gen_dataset_urn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ def get_workunits_internal(
queries.append(entry)

with self.report.audit_log_load_timer:
for query in queries:
for i, query in enumerate(queries):
if i % 1000 == 0:
logger.info(f"Added {i} query log entries to SQL aggregator")
self.aggregator.add(query)

yield from auto_workunit(self.aggregator.gen_metadata())
Expand Down Expand Up @@ -316,5 +318,6 @@ def _build_enriched_query_log_query(
WHERE
creation_time >= '{audit_start_time}' AND
creation_time <= '{audit_end_time}' AND
error_result is null
error_result is null AND
not CONTAINS_SUBSTR(query, '.INFORMATION_SCHEMA.')
"""
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ def get_workunits_internal(
queries.append(entry)

with self.report.audit_log_load_timer:
for query in queries:
for i, query in enumerate(queries):
if i % 1000 == 0:
logger.info(f"Added {i} query log entries to SQL aggregator")
self.aggregator.add(query)

yield from auto_workunit(self.aggregator.gen_metadata())
Expand Down

0 comments on commit 505911c

Please sign in to comment.