feat(ingest): bigquery - external url support and a small profiling filter fix #6714

Merged · 6 commits · Dec 13, 2022
Changes from 4 commits
@@ -10,6 +10,7 @@
from google.cloud import bigquery
from google.cloud.bigquery.table import TableListItem

+ from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter.mce_builder import (
make_container_urn,
make_data_platform_urn,
@@ -54,7 +55,10 @@
BigqueryTable,
BigqueryView,
)
- from datahub.ingestion.source.bigquery_v2.common import get_bigquery_client
+ from datahub.ingestion.source.bigquery_v2.common import (
+     BQ_EXTERNAL_TABLE_URL_TEMPLATE,
+     get_bigquery_client,
+ )
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
@@ -570,8 +574,12 @@ def _process_project(
bigquery_project.datasets
)
for bigquery_dataset in bigquery_project.datasets:

- if not self.config.dataset_pattern.allowed(bigquery_dataset.name):
+ if not is_schema_allowed(
+     self.config.dataset_pattern,
+     bigquery_dataset.name,
+     project_id,
+     self.config.match_fully_qualified_names,
+ ):
self.report.report_dropped(f"{bigquery_dataset.name}.*")
continue
try:
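For context, here is a minimal sketch of the filtering this hunk switches to. It is not the actual `datahub.configuration.pattern_utils` implementation, only the behaviour implied by the call site: with `match_fully_qualified_names` enabled, the pattern is evaluated against `<project_id>.<dataset_name>` instead of the bare dataset name.

```python
# Sketch only: the real is_schema_allowed lives in
# datahub.configuration.pattern_utils and may differ in detail.
from datahub.configuration.common import AllowDenyPattern


def is_schema_allowed_sketch(
    dataset_pattern: AllowDenyPattern,
    schema_name: str,
    db_name: str,
    match_fully_qualified_schema_name: bool,
) -> bool:
    # New mode: match the fully qualified "<project_id>.<dataset_name>".
    if match_fully_qualified_schema_name:
        return dataset_pattern.allowed(f"{db_name}.{schema_name}")
    # Legacy mode: match only the bare dataset name.
    return dataset_pattern.allowed(schema_name)
```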
@@ -854,7 +862,12 @@ def gen_dataset_workunits(
else None,
lastModified=TimeStamp(time=int(table.last_altered.timestamp() * 1000))
if table.last_altered is not None
+ else TimeStamp(time=int(table.created.timestamp() * 1000))
Review comment (Collaborator): nice!
+ if table.created is not None
else None,
+ externalUrl=BQ_EXTERNAL_TABLE_URL_TEMPLATE.format(
+     project=project_id, dataset=dataset_name, table=table.name
+ ),
)
if custom_properties:
dataset_properties.customProperties.update(custom_properties)
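As a quick illustration of the new `externalUrl` (the project, dataset, and table names below are hypothetical), the template added in this PR expands into a BigQuery console deep link:

```python
# Template copied from this PR; the concrete names are made up.
BQ_EXTERNAL_TABLE_URL_TEMPLATE = (
    "https://console.cloud.google.com/bigquery"
    "?project={project}&ws=!1m5!1m4!4m3!1s{project}!2s{dataset}!3s{table}"
)

url = BQ_EXTERNAL_TABLE_URL_TEMPLATE.format(
    project="acme-analytics", dataset="web", table="events"
)
# https://console.cloud.google.com/bigquery?project=acme-analytics
#     &ws=!1m5!1m4!4m3!1sacme-analytics!2sweb!3sevents
```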
@@ -48,6 +48,11 @@ class BigQueryV2Config(BigQueryConfig, LineageConfig):
description="Regex patterns for dataset to filter in ingestion. Specify regex to only match the schema name. e.g. to match all tables in schema analytics, use the regex 'analytics'",
)

+ match_fully_qualified_names: bool = Field(
+     default=False,
+     description="Whether `dataset_pattern` is matched against fully qualified dataset name `<project_id>.<dataset_name>`.",
+ )

debug_include_full_payloads: bool = Field(
default=False,
description="Include full payload into events. It is only for debugging and internal use.",
@@ -128,6 +133,20 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict:
logging.warning(
"schema_pattern will be ignored in favour of dataset_pattern. schema_pattern will be deprecated, please use dataset_pattern only."
)

+ match_fully_qualified_names = values.get("match_fully_qualified_names")

+ if (
Review comment (Collaborator): amazing!
+     dataset_pattern is not None
+     and dataset_pattern != AllowDenyPattern.allow_all()
+     and match_fully_qualified_names is not None
+     and not match_fully_qualified_names
+ ):
+     logger.warning(
+         "Please update `dataset_pattern` to match against fully qualified schema name `<project_id>.<dataset_name>` and set config `match_fully_qualified_names : True`."
+         "Current default `match_fully_qualified_names: False` is only to maintain backward compatibility. "
+         "The config option `match_fully_qualified_names` will be deprecated in future and the default behavior will assume `match_fully_qualified_names: True`."
+     )
return values

def get_table_pattern(self, pattern: List[str]) -> str:
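To make the warning concrete, here is a small sketch of the two matching modes using the existing `AllowDenyPattern` regex semantics; the project and dataset names are made up:

```python
from datahub.configuration.common import AllowDenyPattern

# A pattern written against the fully qualified name (hypothetical values).
dataset_pattern = AllowDenyPattern(allow=[r"my-project-1\.sales_.*"])

# Legacy behaviour (match_fully_qualified_names: False): only the bare
# dataset name is checked, so a project-qualified pattern never matches.
dataset_pattern.allowed("sales_eu")               # False

# New behaviour (match_fully_qualified_names: True): the fully qualified
# "<project_id>.<dataset_name>" is checked instead.
dataset_pattern.allowed("my-project-1.sales_eu")  # True
```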
@@ -8,6 +8,9 @@
BQ_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
BQ_DATE_SHARD_FORMAT = "%Y%m%d"

+ BQ_EXTERNAL_TABLE_URL_TEMPLATE = "https://console.cloud.google.com/bigquery?project={project}&ws=!1m5!1m4!4m3!1s{project}!2s{dataset}!3s{table}"
Review comment (Collaborator): this is great! thanks for extracting into a nice place.
+ BQ_EXTERNAL_DATASET_URL_TEMPLATE = "https://console.cloud.google.com/bigquery?project={project}&ws=!1m4!1m3!3m2!1s{project}!2s{dataset}"


def _make_gcp_logging_client(
project_id: Optional[str] = None, extra_client_options: Dict[str, Any] = {}
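The dataset-level template added alongside is not exercised in this hunk; presumably it backs dataset and container links elsewhere in the source. A hypothetical expansion for reference:

```python
# Template copied from this PR; project/dataset names are made up.
BQ_EXTERNAL_DATASET_URL_TEMPLATE = (
    "https://console.cloud.google.com/bigquery"
    "?project={project}&ws=!1m4!1m3!3m2!1s{project}!2s{dataset}"
)

dataset_url = BQ_EXTERNAL_DATASET_URL_TEMPLATE.format(
    project="acme-analytics", dataset="web"
)
# https://console.cloud.google.com/bigquery?project=acme-analytics
#     &ws=!1m4!1m3!3m2!1sacme-analytics!2sweb
```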
@@ -172,9 +172,14 @@ def get_workunits(
word in column.data_type.lower()
for word in ["array", "struct", "geography", "json"]
):
+ normalized_table_name = BigqueryTableIdentifier(
+     project_id=project, dataset=dataset, table=table.name
+ ).get_table_name()

self.config.profile_pattern.deny.append(
f"^{project}.{dataset}.{table.name}.{column.field_path}$"
f"^{normalized_table_name}.{column.field_path}$"
)

# Emit the profile work unit
profile_request = self.get_bigquery_profile_request(
project=project, dataset=dataset, table=table
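For the profiling filter fix, a short sketch of why the deny entry is now built from the normalized identifier. The import path and the exact normalization performed by `get_table_name()` (for example collapsing a date-shard suffix) are assumptions here; the values are hypothetical:

```python
# Assumed import path, based on the bigquery_v2 module layout.
from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
    BigqueryTableIdentifier,
)

project, dataset, table = "acme-analytics", "web", "events_20221213"
field_path = "payload"

# Before: the deny entry was built from the raw parts, which can disagree
# with the normalized name that profiling matches the pattern against.
old_entry = f"^{project}.{dataset}.{table}.{field_path}$"

# After: build the entry from the same normalized identifier, assumed to
# yield something like "acme-analytics.web.events" for a sharded table.
normalized_table_name = BigqueryTableIdentifier(
    project_id=project, dataset=dataset, table=table
).get_table_name()
new_entry = f"^{normalized_table_name}.{field_path}$"
```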