Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/bigquery): Pass whether view is materialized; pass last_altered correctly #7660

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -773,8 +773,8 @@ def _process_schema(
)

if self.config.include_views:
db_views[dataset_name] = self.get_views_for_dataset(
conn, project_id, dataset_name
db_views[dataset_name] = BigQueryDataDictionary.get_views_for_dataset(
jjoyce0510 marked this conversation as resolved.
Show resolved Hide resolved
conn, project_id, dataset_name, self.config.profiling.enabled
)

for view in db_views[dataset_name]:
Expand Down Expand Up @@ -959,7 +959,9 @@ def gen_view_dataset_workunits(
view = cast(BigqueryView, table)
view_definition_string = view.view_definition
view_properties_aspect = ViewProperties(
materialized=False, viewLanguage="SQL", viewLogic=view_definition_string
materialized=view.materialized,
viewLanguage="SQL",
viewLogic=view_definition_string,
)
yield MetadataChangeProposalWrapper(
entityUrn=self.gen_dataset_urn(
Expand Down Expand Up @@ -1286,19 +1288,6 @@ def get_core_table_details(

return table_items

def get_views_for_dataset(
self,
conn: bigquery.Client,
project_id: str,
dataset_name: str,
) -> List[BigqueryView]:
views: List[BigqueryView] = []

views = BigQueryDataDictionary.get_views_for_dataset(
conn, project_id, dataset_name, self.config.profiling.enabled
)
return views

def add_config_to_report(self):
self.report.include_table_lineage = self.config.include_table_lineage
self.report.use_date_sharded_audit_log_tables = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional, cast

from google.cloud import bigquery
Expand All @@ -18,6 +19,16 @@
logger: logging.Logger = logging.getLogger(__name__)


class BigqueryTableType(Enum):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

# See https://cloud.google.com/bigquery/docs/information-schema-tables#schema
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<3 thank you for linking documentation. more of this pls!

BASE_TABLE = "BASE TABLE"
EXTERNAL = "EXTERNAL"
VIEW = "VIEW"
MATERIALIZED_VIEW = "MATERIALIZED VIEW"
CLONE = "CLONE"
SNAPSHOT = "SNAPSHOT"


@dataclass
class BigqueryColumn(BaseColumn):
field_path: str
Expand Down Expand Up @@ -94,6 +105,7 @@ class BigqueryTable(BaseTable):
@dataclass
class BigqueryView(BaseView):
columns: List[BigqueryColumn] = field(default_factory=list)
materialized: bool = False


@dataclass
Expand Down Expand Up @@ -138,7 +150,7 @@ class BigqueryQuery:
"""

# https://cloud.google.com/bigquery/docs/information-schema-table-storage?hl=en
tables_for_dataset = """
tables_for_dataset = f"""
SELECT
t.table_catalog as table_catalog,
t.table_schema as table_schema,
Expand All @@ -159,9 +171,9 @@ class BigqueryQuery:
REGEXP_REPLACE(t.table_name, r"_(\\d+)$", "") as table_base

FROM
`{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLES t
join `{project_id}`.`{dataset_name}`.__TABLES__ as ts on ts.table_id = t.TABLE_NAME
left join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
`{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t
join `{{project_id}}`.`{{dataset_name}}`.__TABLES__ as ts on ts.table_id = t.TABLE_NAME
left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
and t.TABLE_NAME = tos.TABLE_NAME
and tos.OPTION_NAME = "description"
left join (
Expand All @@ -173,20 +185,20 @@ class BigqueryQuery:
sum(case when storage_tier = 'LONG_TERM' then total_billable_bytes else 0 end) as long_term_billable_bytes,
sum(case when storage_tier = 'ACTIVE' then total_billable_bytes else 0 end) as active_billable_bytes,
from
`{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.PARTITIONS
`{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.PARTITIONS
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these changes scare me - do we need to make them?

Copy link
Collaborator

@hsheth2 hsheth2 Mar 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

those are because it's using an f-string (template string), so we need to escape the {} characters when not doing var substitution

we then do another string format later which substitutes those

group by
table_name) as p on
t.table_name = p.table_name
WHERE
table_type in ('BASE TABLE', 'EXTERNAL')
{table_filter}
table_type in ({BigqueryTableType.BASE_TABLE}, {BigqueryTableType.EXTERNAL})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

confused - why not {{BigQueryTablType.BASE_TABLE}} -- two brackets instead of one?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this change caused regression in final query due to missing quotes around constants.

{{table_filter}}
order by
table_schema ASC,
table_base ASC,
table_suffix DESC
"""

tables_for_dataset_without_partition_data = """
tables_for_dataset_without_partition_data = f"""
SELECT
t.table_catalog as table_catalog,
t.table_schema as table_schema,
Expand All @@ -200,20 +212,20 @@ class BigqueryQuery:
REGEXP_REPLACE(t.table_name, r"_(\\d+)$", "") as table_base

FROM
`{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLES t
left join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
`{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t
left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
and t.TABLE_NAME = tos.TABLE_NAME
and tos.OPTION_NAME = "description"
WHERE
table_type in ('BASE TABLE', 'EXTERNAL')
{table_filter}
table_type in ({BigqueryTableType.BASE_TABLE}, {BigqueryTableType.EXTERNAL})
{{table_filter}}
order by
table_schema ASC,
table_base ASC,
table_suffix DESC
"""

views_for_dataset: str = """
views_for_dataset: str = f"""
SELECT
t.table_catalog as table_catalog,
t.table_schema as table_schema,
Expand All @@ -227,19 +239,19 @@ class BigqueryQuery:
row_count,
size_bytes
FROM
`{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLES t
join `{project_id}`.`{dataset_name}`.__TABLES__ as ts on ts.table_id = t.TABLE_NAME
left join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
`{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t
join `{{project_id}}`.`{{dataset_name}}`.__TABLES__ as ts on ts.table_id = t.TABLE_NAME
left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
and t.TABLE_NAME = tos.TABLE_NAME
and tos.OPTION_NAME = "description"
WHERE
table_type in ('MATERIALIZED VIEW', 'VIEW')
table_type in ({BigqueryTableType.VIEW}, {BigqueryTableType.MATERIALIZED_VIEW})
order by
table_schema ASC,
table_name ASC
"""

views_for_dataset_without_data_read: str = """
views_for_dataset_without_data_read: str = f"""
SELECT
t.table_catalog as table_catalog,
t.table_schema as table_schema,
Expand All @@ -250,12 +262,12 @@ class BigqueryQuery:
is_insertable_into,
ddl as view_definition
FROM
`{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLES t
left join `{project_id}`.`{dataset_name}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
`{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLES t
left join `{{project_id}}`.`{{dataset_name}}`.INFORMATION_SCHEMA.TABLE_OPTIONS as tos on t.table_schema = tos.table_schema
and t.TABLE_NAME = tos.TABLE_NAME
and tos.OPTION_NAME = "description"
WHERE
table_type in ('MATERIALIZED VIEW', 'VIEW')
table_type in ({BigqueryTableType.VIEW}, {BigqueryTableType.MATERIALIZED_VIEW})
order by
table_schema ASC,
table_name ASC
Expand Down Expand Up @@ -470,11 +482,10 @@ def get_views_for_dataset(
BigqueryView(
name=table.table_name,
created=table.created,
last_altered=table.last_altered
if "last_altered" in table
else table.created,
last_altered=table.get("last_altered", table.created),
asikowitz marked this conversation as resolved.
Show resolved Hide resolved
comment=table.comment,
view_definition=table.view_definition,
materialized=table.table_type == BigqueryTableType.MATERIALIZED_VIEW,
)
for table in cur
]
Expand Down
122 changes: 118 additions & 4 deletions metadata-ingestion/tests/unit/test_bigquery_source.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import json
import os
from datetime import datetime
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from typing import Dict
from typing import Any, Dict, cast
from unittest.mock import patch

from google.cloud.bigquery.table import TableListItem
import pytest
from google.cloud.bigquery.table import Row, TableListItem

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.bigquery_v2.bigquery import BigqueryV2Source
Expand All @@ -14,8 +15,15 @@
BigQueryTableRef,
)
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigqueryProject
from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
BigQueryDataDictionary,
BigqueryProject,
BigqueryTableType,
BigqueryView,
)
from datahub.ingestion.source.bigquery_v2.lineage import LineageEdge
from datahub.metadata.com.linkedin.pegasus2avro.dataset import ViewProperties
from datahub.metadata.schema_classes import MetadataChangeProposalClass


def test_bigquery_uri():
Expand Down Expand Up @@ -405,3 +413,109 @@ def test_table_processing_logic_date_named_tables(client_mock, data_dictionary_m
] # alternatively
for table in tables.keys():
assert tables[table].table_id in ["test-table", "20220103"]


def create_row(d: Dict[str, Any]) -> Row:
values = []
field_to_index = {}
for i, (k, v) in enumerate(d.items()):
field_to_index[k] = i
values.append(v)
return Row(tuple(values), field_to_index)


@pytest.fixture
def bigquery_view_1():
now = datetime.now(tz=timezone.utc)
return BigqueryView(
name="table1",
created=now - timedelta(days=10),
last_altered=now - timedelta(hours=1),
comment="comment1",
view_definition="CREATE VIEW 1",
materialized=False,
)


@pytest.fixture
def bigquery_view_2():
now = datetime.now(tz=timezone.utc)
return BigqueryView(
name="table2",
created=now,
last_altered=now,
comment="comment2",
view_definition="CREATE VIEW 2",
materialized=True,
)


@patch(
"datahub.ingestion.source.bigquery_v2.bigquery_schema.BigQueryDataDictionary.get_query_result"
)
@patch("google.cloud.bigquery.client.Client")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

def test_get_views_for_dataset(
client_mock, query_mock, bigquery_view_1, bigquery_view_2
):
row1 = create_row(
dict(
table_name=bigquery_view_1.name,
created=bigquery_view_1.created,
last_altered=bigquery_view_1.last_altered,
comment=bigquery_view_1.comment,
view_definition=bigquery_view_1.view_definition,
table_type=BigqueryTableType.VIEW,
)
)
row2 = create_row( # Materialized view, no last_altered
dict(
table_name=bigquery_view_2.name,
created=bigquery_view_2.created,
comment=bigquery_view_2.comment,
view_definition=bigquery_view_2.view_definition,
table_type=BigqueryTableType.MATERIALIZED_VIEW,
)
)
query_mock.return_value = [row1, row2]

views = BigQueryDataDictionary.get_views_for_dataset(
conn=client_mock,
project_id="test-project",
dataset_name="test-dataset",
has_data_read=False,
)
assert views == [bigquery_view_1, bigquery_view_2]


@patch.object(BigqueryV2Source, "gen_dataset_workunits", lambda *args, **kwargs: [])
def test_gen_view_dataset_workunits(bigquery_view_1, bigquery_view_2):
project_id = "test-project"
dataset_name = "test-dataset"
config = BigQueryV2Config.parse_obj(
{
"project_id": project_id,
}
)
source: BigqueryV2Source = BigqueryV2Source(
config=config, ctx=PipelineContext(run_id="test")
)

gen = source.gen_view_dataset_workunits(
bigquery_view_1, [], project_id, dataset_name
)
mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata)
assert mcp.aspect == ViewProperties(
materialized=bigquery_view_1.materialized,
viewLanguage="SQL",
viewLogic=bigquery_view_1.view_definition,
)

gen = source.gen_view_dataset_workunits(
bigquery_view_2, [], project_id, dataset_name
)
mcp = cast(MetadataChangeProposalClass, next(iter(gen)).metadata)
assert mcp.aspect == ViewProperties(
materialized=bigquery_view_2.materialized,
viewLanguage="SQL",
viewLogic=bigquery_view_2.view_definition,
)