diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
index b810168513bbd8..e7109e26c1c8c2 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
@@ -773,8 +773,8 @@ def _process_schema(
             )

         if self.config.include_views:
-            db_views[dataset_name] = self.get_views_for_dataset(
-                conn, project_id, dataset_name
+            db_views[dataset_name] = BigQueryDataDictionary.get_views_for_dataset(
+                conn, project_id, dataset_name, self.config.profiling.enabled
             )

             for view in db_views[dataset_name]:
@@ -959,7 +959,9 @@ def gen_view_dataset_workunits(
         view = cast(BigqueryView, table)
         view_definition_string = view.view_definition
         view_properties_aspect = ViewProperties(
-            materialized=False, viewLanguage="SQL", viewLogic=view_definition_string
+            materialized=view.materialized,
+            viewLanguage="SQL",
+            viewLogic=view_definition_string,
         )
         yield MetadataChangeProposalWrapper(
             entityUrn=self.gen_dataset_urn(
@@ -1286,19 +1288,6 @@ def get_core_table_details(

         return table_items

-    def get_views_for_dataset(
-        self,
-        conn: bigquery.Client,
-        project_id: str,
-        dataset_name: str,
-    ) -> List[BigqueryView]:
-        views: List[BigqueryView] = []
-
-        views = BigQueryDataDictionary.get_views_for_dataset(
-            conn, project_id, dataset_name, self.config.profiling.enabled
-        )
-        return views
-
     def add_config_to_report(self):
         self.report.include_table_lineage = self.config.include_table_lineage
         self.report.use_date_sharded_audit_log_tables = (
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
index 60e37e43868533..b911f4659c53f1 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
@@ -2,6 +2,7 @@
 from collections import defaultdict
 from dataclasses import dataclass, field
 from datetime import datetime, timezone
+from enum import Enum
 from typing import Any, Dict, List, Optional, cast

 from google.cloud import bigquery
@@ -18,6 +19,16 @@
 logger: logging.Logger = logging.getLogger(__name__)


+class BigqueryTableType(Enum):
+    # See https://cloud.google.com/bigquery/docs/information-schema-tables#schema
+    BASE_TABLE = "BASE TABLE"
+    EXTERNAL = "EXTERNAL"
+    VIEW = "VIEW"
+    MATERIALIZED_VIEW = "MATERIALIZED VIEW"
+    CLONE = "CLONE"
+    SNAPSHOT = "SNAPSHOT"
+
+
 @dataclass
 class BigqueryColumn(BaseColumn):
     field_path: str
@@ -94,6 +105,7 @@ class BigqueryTable(BaseTable):
 @dataclass
 class BigqueryView(BaseView):
     columns: List[BigqueryColumn] = field(default_factory=list)
+    materialized: bool = False


 @dataclass
@@ -138,7 +150,7 @@ class BigqueryQuery:
     """

     # https://cloud.google.com/bigquery/docs/information-schema-table-storage?hl=en
-    tables_for_dataset = """
+    tables_for_dataset = f"""
 SELECT
   t.table_catalog as table_catalog,
   t.table_schema as table_schema,
   t.table_name as table_name,
   t.table_type as table_type,
   t.creation_time as created,
   ts.last_modified_time as last_altered,
   tos.OPTION_VALUE as comment,
   is_insertable_into,
   ddl,
   row_count,
   size_bytes as bytes,
   num_partitions,
   max_partition_id,
   active_billable_bytes,
   long_term_billable_bytes,
   REGEXP_EXTRACT(t.table_name, r".*_(\\d+)$") as table_suffix,
@@ -159,9 +171,9 @@ class BigqueryQuery:
   REGEXP_REPLACE(t.table_name, r"_(\\d+)$", "") as table_base

 FROM
-  `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLES t
-  join `{project_id}`.`{dataset_name}`.__TABLES__ as ts on ts.table_id = t.TABLE_NAME
-  left join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
+  `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t
+  join `{{project_id}}`.`{{dataset_name}}`.__TABLES__ as ts on ts.table_id = t.TABLE_NAME
+  left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
     and t.TABLE_NAME = tos.TABLE_NAME
     and tos.OPTION_NAME = "description"
   left join (
     select
         table_name,
         sum(case when partition_id not in ('__NULL__', '__UNPARTITIONED__', '__STREAMING_UNPARTITIONED__') then 1 else 0 END) as num_partitions,
         max(case when partition_id not in ('__NULL__', '__UNPARTITIONED__', '__STREAMING_UNPARTITIONED__') then partition_id else NULL END) as max_partition_id,
         sum(total_rows) as total_rows,
@@ -173,20 +185,20 @@ class BigqueryQuery:
         sum(case when storage_tier = 'LONG_TERM' then total_billable_bytes else 0 end) as long_term_billable_bytes,
         sum(case when storage_tier = 'ACTIVE' then total_billable_bytes else 0 end) as active_billable_bytes,
     from
-        `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.PARTITIONS
+        `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.PARTITIONS
     group by
         table_name) as p on
     t.table_name = p.table_name
 WHERE
-  table_type in ('BASE TABLE', 'EXTERNAL')
-{table_filter}
+  table_type in ('{BigqueryTableType.BASE_TABLE.value}', '{BigqueryTableType.EXTERNAL.value}')
+{{table_filter}}
 order by
   table_schema ASC,
   table_base ASC,
   table_suffix DESC
 """

-    tables_for_dataset_without_partition_data = """
+    tables_for_dataset_without_partition_data = f"""
 SELECT
   t.table_catalog as table_catalog,
   t.table_schema as table_schema,
   t.table_name as table_name,
   t.table_type as table_type,
   t.creation_time as created,
   tos.OPTION_VALUE as comment,
   is_insertable_into,
   ddl,
   REGEXP_EXTRACT(t.table_name, r".*_(\\d+)$") as table_suffix,
@@ -200,20 +212,20 @@ class BigqueryQuery:
   REGEXP_REPLACE(t.table_name, r"_(\\d+)$", "") as table_base

 FROM
-  `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLES t
-  left join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
+  `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t
+  left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
     and t.TABLE_NAME = tos.TABLE_NAME
     and tos.OPTION_NAME = "description"
 WHERE
-  table_type in ('BASE TABLE', 'EXTERNAL')
-{table_filter}
+  table_type in ('{BigqueryTableType.BASE_TABLE.value}', '{BigqueryTableType.EXTERNAL.value}')
+{{table_filter}}
 order by
   table_schema ASC,
   table_base ASC,
   table_suffix DESC
 """

-    views_for_dataset: str = """
+    views_for_dataset: str = f"""
 SELECT
   t.table_catalog as table_catalog,
   t.table_schema as table_schema,
   t.table_name as table_name,
   t.table_type as table_type,
   t.creation_time as created,
   ts.last_modified_time as last_altered,
   tos.OPTION_VALUE as comment,
   is_insertable_into,
   ddl as view_definition,
@@ -227,19 +239,19 @@ class BigqueryQuery:
   row_count,
   size_bytes
 FROM
-  `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLES t
-  join `{project_id}`.`{dataset_name}`.__TABLES__ as ts on ts.table_id = t.TABLE_NAME
-  left join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
+  `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t
+  join `{{project_id}}`.`{{dataset_name}}`.__TABLES__ as ts on ts.table_id = t.TABLE_NAME
+  left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
     and t.TABLE_NAME = tos.TABLE_NAME
     and tos.OPTION_NAME = "description"
 WHERE
-  table_type in ('MATERIALIZED VIEW', 'VIEW')
+  table_type in ('{BigqueryTableType.VIEW.value}', '{BigqueryTableType.MATERIALIZED_VIEW.value}')
 order by
   table_schema ASC,
   table_name ASC
 """

-    views_for_dataset_without_data_read: str = """
+    views_for_dataset_without_data_read: str = f"""
 SELECT
   t.table_catalog as table_catalog,
   t.table_schema as table_schema,
   t.table_name as table_name,
   t.table_type as table_type,
   t.creation_time as created,
   tos.OPTION_VALUE as comment,
@@ -250,12 +262,12 @@ class BigqueryQuery:
   is_insertable_into,
   ddl as view_definition
 FROM
-  `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLES t
-  left join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
+  `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t
+  left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
     and t.TABLE_NAME = tos.TABLE_NAME
     and tos.OPTION_NAME = "description"
 WHERE
-  table_type in ('MATERIALIZED VIEW', 'VIEW')
+  table_type in ('{BigqueryTableType.VIEW.value}', '{BigqueryTableType.MATERIALIZED_VIEW.value}')
 order by
   table_schema ASC,
   table_name ASC
@@ -470,11 +482,10 @@ def get_views_for_dataset(
             BigqueryView(
                 name=table.table_name,
                 created=table.created,
-                last_altered=table.last_altered
-                if "last_altered" in table
-                else table.created,
+                last_altered=table.get("last_altered", table.created),
                 comment=table.comment,
                 view_definition=table.view_definition,
+                materialized=table.table_type == BigqueryTableType.MATERIALIZED_VIEW.value,
             )
             for table in cur
         ]
diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py
index 2a265e5fa6a602..a890dbb4c9e377 100644
--- a/metadata-ingestion/tests/unit/test_bigquery_source.py
+++ b/metadata-ingestion/tests/unit/test_bigquery_source.py
@@ -1,11 +1,12 @@
 import json
 import os
-from datetime import datetime
+from datetime import datetime, timedelta, timezone
 from types import SimpleNamespace
-from typing import Dict
+from typing import Any, Dict, cast
 from unittest.mock import patch

-from google.cloud.bigquery.table import TableListItem
+import pytest
+from google.cloud.bigquery.table import Row, TableListItem

 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.source.bigquery_v2.bigquery import BigqueryV2Source
@@ -14,8 +15,15 @@
     BigQueryTableRef,
 )
 from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
-from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigqueryProject
+from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
+    BigQueryDataDictionary,
+    BigqueryProject,
+    BigqueryTableType,
+    BigqueryView,
+)
 from datahub.ingestion.source.bigquery_v2.lineage import LineageEdge
+from datahub.metadata.com.linkedin.pegasus2avro.dataset import ViewProperties
+from datahub.metadata.schema_classes import MetadataChangeProposalClass


 def test_bigquery_uri():
@@ -405,3 +413,109 @@ def test_table_processing_logic_date_named_tables(client_mock, data_dictionary_m
     ]  # alternatively
     for table in tables.keys():
         assert tables[table].table_id in ["test-table", "20220103"]
+
+
+def create_row(d: Dict[str, Any]) -> Row:
+    values = []
+    field_to_index = {}
+    for i, (k, v) in enumerate(d.items()):
+        field_to_index[k] = i
+        values.append(v)
+    return Row(tuple(values), field_to_index)
+
+
+@pytest.fixture
+def bigquery_view_1():
+    now = datetime.now(tz=timezone.utc)
+    return BigqueryView(
+        name="table1",
+        created=now - timedelta(days=10),
+        last_altered=now - timedelta(hours=1),
+        comment="comment1",
+        view_definition="CREATE VIEW 1",
+        materialized=False,
+    )
+
+
+@pytest.fixture
+def bigquery_view_2():
+    now = datetime.now(tz=timezone.utc)
+    return BigqueryView(
+        name="table2",
+        created=now,
+        last_altered=now,
+        comment="comment2",
+        view_definition="CREATE VIEW 2",
+        materialized=True,
+    )
+
+
+@patch(
+    "datahub.ingestion.source.bigquery_v2.bigquery_schema.BigQueryDataDictionary.get_query_result"
+)
+@patch("google.cloud.bigquery.client.Client")
+def test_get_views_for_dataset(
+    client_mock, query_mock, bigquery_view_1, bigquery_view_2
+):
+    row1 = create_row(
+        dict(
+            table_name=bigquery_view_1.name,
+            created=bigquery_view_1.created,
+            last_altered=bigquery_view_1.last_altered,
+            comment=bigquery_view_1.comment,
+            view_definition=bigquery_view_1.view_definition,
+            table_type=BigqueryTableType.VIEW.value,
+        )
+    )
+    row2 = create_row(  # Materialized view, no last_altered
+        dict(
+            table_name=bigquery_view_2.name,
+            created=bigquery_view_2.created,
+            comment=bigquery_view_2.comment,
+            view_definition=bigquery_view_2.view_definition,
+            table_type=BigqueryTableType.MATERIALIZED_VIEW.value,
+        )
+    )
+    query_mock.return_value = [row1, row2]
+
+    views = BigQueryDataDictionary.get_views_for_dataset(
+        conn=client_mock,
+        project_id="test-project",
+        dataset_name="test-dataset",
+        has_data_read=False,
+    )
+    assert views == [bigquery_view_1, bigquery_view_2]
+
+
+@patch.object(BigqueryV2Source, "gen_dataset_workunits", lambda *args, **kwargs: [])
+def test_gen_view_dataset_workunits(bigquery_view_1, bigquery_view_2):
+    project_id = "test-project"
+    dataset_name = "test-dataset"
+    config = BigQueryV2Config.parse_obj(
+        {
+            "project_id": project_id,
+        }
+    )
+    source: BigqueryV2Source = BigqueryV2Source(
+        config=config, ctx=PipelineContext(run_id="test")
+    )
+
+    gen = source.gen_view_dataset_workunits(
+        bigquery_view_1, [], project_id, dataset_name
+    )
+    mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata)
+    assert mcp.aspect == ViewProperties(
+        materialized=bigquery_view_1.materialized,
+        viewLanguage="SQL",
+        viewLogic=bigquery_view_1.view_definition,
+    )
+
+    gen = source.gen_view_dataset_workunits(
+        bigquery_view_2, [], project_id, dataset_name
+    )
+    mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata)
+    assert mcp.aspect == ViewProperties(
+        materialized=bigquery_view_2.materialized,
+        viewLanguage="SQL",
+        viewLogic=bigquery_view_2.view_definition,
+    )
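For context on the f-string conversion in bigquery_schema.py: these query templates are rendered twice. The f-string is evaluated once at class-definition time, interpolating the enum members' .value (a bare {BigqueryTableType.VIEW} would render as "BigqueryTableType.VIEW" and break the SQL), while the doubled {{project_id}}/{{dataset_name}}/{{table_filter}} braces survive as single-brace placeholders for the later per-dataset .format() call. Below is a minimal, standalone sketch of that two-stage templating; it is not part of the change itself, and TableType/VIEWS_QUERY_TEMPLATE are trimmed stand-ins for BigqueryTableType and BigqueryQuery.views_for_dataset.

from enum import Enum


class TableType(Enum):
    # Trimmed stand-in for BigqueryTableType; values mirror
    # INFORMATION_SCHEMA.TABLES.table_type.
    VIEW = "VIEW"
    MATERIALIZED_VIEW = "MATERIALIZED VIEW"


# Stage 1: the f-string runs once, at definition time. Enum members are
# interpolated via .value inside explicit SQL quotes, and doubled braces
# collapse to single-brace placeholders that survive for stage 2.
VIEWS_QUERY_TEMPLATE = f"""
SELECT table_name
FROM `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES
WHERE table_type in ('{TableType.VIEW.value}', '{TableType.MATERIALIZED_VIEW.value}')
"""

# Stage 2: .format() fills the surviving placeholders per dataset.
sql = VIEWS_QUERY_TEMPLATE.format(project_id="my-project", dataset_name="my_dataset")

assert "('VIEW', 'MATERIALIZED VIEW')" in sql
assert "`my-project`.`my_dataset`.INFORMATION_SCHEMA.TABLES" in sql
print(sql)

Spelling out .value (rather than relying on str-mixin formatting) keeps the rendered SQL stable across Python versions, and it is also why the materialized flag and the test rows compare table_type against BigqueryTableType.MATERIALIZED_VIEW.value: the query result carries plain strings, not enum members.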