diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 04d7f67daf73a..7fe76e1b2b3b9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -10,6 +10,7 @@ from google.cloud import bigquery from google.cloud.bigquery.table import TableListItem +from datahub.configuration.pattern_utils import is_schema_allowed from datahub.emitter.mce_builder import ( make_container_urn, make_data_platform_urn, @@ -54,7 +55,11 @@ BigqueryTable, BigqueryView, ) -from datahub.ingestion.source.bigquery_v2.common import get_bigquery_client +from datahub.ingestion.source.bigquery_v2.common import ( + BQ_EXTERNAL_DATASET_URL_TEMPLATE, + BQ_EXTERNAL_TABLE_URL_TEMPLATE, + get_bigquery_client, +) from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor @@ -459,6 +464,11 @@ def gen_dataset_containers( dataset, ["Dataset"], database_container_key, + external_url=BQ_EXTERNAL_DATASET_URL_TEMPLATE.format( + project=project_id, dataset=dataset + ) + if self.config.include_external_url + else None, ) self.stale_entity_removal_handler.add_entity_to_state( @@ -570,8 +580,12 @@ def _process_project( bigquery_project.datasets ) for bigquery_dataset in bigquery_project.datasets: - - if not self.config.dataset_pattern.allowed(bigquery_dataset.name): + if not is_schema_allowed( + self.config.dataset_pattern, + bigquery_dataset.name, + project_id, + self.config.match_fully_qualified_names, + ): self.report.report_dropped(f"{bigquery_dataset.name}.*") continue try: @@ -854,6 +868,13 @@ def gen_dataset_workunits( else None, lastModified=TimeStamp(time=int(table.last_altered.timestamp() * 1000)) if table.last_altered is 
not None + else TimeStamp(time=int(table.created.timestamp() * 1000)) + if table.created is not None + else None, + externalUrl=BQ_EXTERNAL_TABLE_URL_TEMPLATE.format( + project=project_id, dataset=dataset_name, table=table.name + ) + if self.config.include_external_url else None, ) if custom_properties: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index 94117f26ff794..c2518cd4fc478 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -48,6 +48,16 @@ class BigQueryV2Config(BigQueryConfig, LineageConfig): description="Regex patterns for dataset to filter in ingestion. Specify regex to only match the schema name. e.g. to match all tables in schema analytics, use the regex 'analytics'", ) + match_fully_qualified_names: bool = Field( + default=False, + description="Whether `dataset_pattern` is matched against fully qualified dataset name `.`.", + ) + + include_external_url: bool = Field( + default=True, + description="Whether to populate BigQuery Console url to Datasets/Tables", + ) + debug_include_full_payloads: bool = Field( default=False, description="Include full payload into events. It is only for debugging and internal use.", @@ -128,6 +138,20 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict: logging.warning( "schema_pattern will be ignored in favour of dataset_pattern. schema_pattern will be deprecated, please use dataset_pattern only." 
) + + match_fully_qualified_names = values.get("match_fully_qualified_names") + + if ( + dataset_pattern is not None + and dataset_pattern != AllowDenyPattern.allow_all() + and match_fully_qualified_names is not None + and not match_fully_qualified_names + ): + logger.warning( + "Please update `dataset_pattern` to match against fully qualified schema name `<project_id>.<dataset_name>` and set config `match_fully_qualified_names : True`." + "Current default `match_fully_qualified_names: False` is only to maintain backward compatibility. " + "The config option `match_fully_qualified_names` will be deprecated in future and the default behavior will assume `match_fully_qualified_names: True`." + ) return values def get_table_pattern(self, pattern: List[str]) -> str: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/common.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/common.py index 8a00f8f1d5fe4..4ff509858b87d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/common.py @@ -8,6 +8,9 @@ BQ_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" BQ_DATE_SHARD_FORMAT = "%Y%m%d" +BQ_EXTERNAL_TABLE_URL_TEMPLATE = "https://console.cloud.google.com/bigquery?project={project}&ws=!1m5!1m4!4m3!1s{project}!2s{dataset}!3s{table}" +BQ_EXTERNAL_DATASET_URL_TEMPLATE = "https://console.cloud.google.com/bigquery?project={project}&ws=!1m4!1m3!3m2!1s{project}!2s{dataset}" + def _make_gcp_logging_client( project_id: Optional[str] = None, extra_client_options: Dict[str, Any] = {} diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py index a83787beb84d8..9e232993b7c5f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py @@ -172,9 +172,14 @@ def get_workunits( word in 
column.data_type.lower() for word in ["array", "struct", "geography", "json"] ): + normalized_table_name = BigqueryTableIdentifier( + project_id=project, dataset=dataset, table=table.name + ).get_table_name() + self.config.profile_pattern.deny.append( - f"^{project}.{dataset}.{table.name}.{column.field_path}$" + f"^{normalized_table_name}.{column.field_path}$" ) + # Emit the profile work unit profile_request = self.get_bigquery_profile_request( project=project, dataset=dataset, table=table