feat(ingest/bigquery): Generate platform resource entities for BigQuery labels (datahub-project#11602)

Co-authored-by: Shirshanka Das <[email protected]>
2 people authored and sleeperdeep committed Dec 17, 2024
1 parent a05445e commit 3cff999
Showing 6 changed files with 517 additions and 49 deletions.
@@ -188,6 +188,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
            self.sql_parser_schema_resolver,
            self.profiler,
            self.identifiers,
            self.ctx.graph,
        )

        self.add_config_to_report()
@@ -0,0 +1,144 @@
import logging
from dataclasses import dataclass
from typing import Optional

import cachetools
from pydantic import BaseModel, ValidationError

from datahub.api.entities.platformresource.platform_resource import (
    PlatformResource,
    PlatformResourceKey,
)
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.urns import TagUrn

logger: logging.Logger = logging.getLogger(__name__)


@dataclass
class BigQueryLabel:
    key: str
    value: Optional[str]

    def primary_key(self) -> str:
        return f"{self.key}/{self.value}" if self.value else f"{self.key}"


class BigQueryLabelInfo(BaseModel):
    datahub_urn: str
    managed_by_datahub: bool
    key: str
    value: str


@dataclass()
class BigQueryLabelPlatformResource:
    datahub_urn: str
    project: Optional[str]
    managed_by_datahub: bool
    label: BigQueryLabel

    def platform_resource_key(self) -> PlatformResourceKey:
        return PlatformResourceKey(
            platform="bigquery",
            resource_type="BigQueryLabelInfo",
            platform_instance=self.project,
            primary_key=self.label.primary_key(),
        )

    def platform_resource_info(self) -> BigQueryLabelInfo:
        bq_label_info = BigQueryLabelInfo(
            datahub_urn=self.datahub_urn,
            managed_by_datahub=self.managed_by_datahub,
            key=self.label.key,
            value=self.label.value,
        )
        return bq_label_info

    def platform_resource(self) -> PlatformResource:
        return PlatformResource.create(
            key=self.platform_resource_key(),
            secondary_keys=[self.datahub_urn],
            value=self.platform_resource_info(),
        )


class BigQueryPlatformResourceHelper:
    def __init__(
        self,
        bq_project: Optional[str],
        graph: Optional[DataHubGraph],
    ):
        self.bq_project = bq_project
        self.graph = graph

    platform_resource_cache: cachetools.LRUCache = cachetools.LRUCache(maxsize=500)

    def get_platform_resource(
        self, platform_resource_key: PlatformResourceKey
    ) -> Optional[PlatformResource]:
        # if graph is not available we always create a new PlatformResource
        if not self.graph:
            return None
        if self.platform_resource_cache.get(platform_resource_key.primary_key):
            return self.platform_resource_cache.get(platform_resource_key.primary_key)

        platform_resource = PlatformResource.from_datahub(
            key=platform_resource_key, graph_client=self.graph
        )
        if platform_resource:
            self.platform_resource_cache[
                platform_resource_key.primary_key
            ] = platform_resource
            return platform_resource
        return None

    def generate_label_platform_resource(
        self,
        bigquery_label: BigQueryLabel,
        tag_urn: TagUrn,
        managed_by_datahub: bool = True,
    ) -> PlatformResource:
        new_platform_resource = BigQueryLabelPlatformResource(
            datahub_urn=tag_urn.urn(),
            project=self.bq_project,
            managed_by_datahub=managed_by_datahub,
            label=bigquery_label,
        )

        platform_resource = self.get_platform_resource(
            new_platform_resource.platform_resource_key()
        )
        if platform_resource:
            if (
                platform_resource.resource_info
                and platform_resource.resource_info.value
            ):
                try:
                    existing_info: Optional[BigQueryLabelInfo] = platform_resource.resource_info.value.as_pydantic_object(BigQueryLabelInfo)  # type: ignore
                except ValidationError as e:
                    logger.error(
                        f"Error converting existing value to BigQueryLabelInfo: {e}. Creating new one. Maybe this is because of a non backward compatible schema change."
                    )
                    existing_info = None

                if existing_info:
                    if (
                        new_platform_resource.platform_resource_info() == existing_info
                        or existing_info.managed_by_datahub
                    ):
                        return platform_resource
                    else:
                        raise ValueError(
                            f"Datahub URN mismatch for platform resources. Old (existing) platform resource: {platform_resource} and new platform resource: {new_platform_resource}"
                        )

        logger.info(f"Created platform resource {new_platform_resource}")

        self.platform_resource_cache.update(
            {
                new_platform_resource.platform_resource_key().primary_key: new_platform_resource.platform_resource()
            }
        )

        return new_platform_resource.platform_resource()
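For orientation, here is a minimal usage sketch of the helper above; it is not part of this commit, the project name and label values are made up, and with graph=None the DataHub lookup is skipped so a fresh PlatformResource is always created.

# Hypothetical example exercising BigQueryPlatformResourceHelper in isolation.
from datahub.ingestion.source.bigquery_v2.bigquery_platform_resource_helper import (
    BigQueryLabel,
    BigQueryPlatformResourceHelper,
)
from datahub.metadata.urns import TagUrn

helper = BigQueryPlatformResourceHelper(bq_project="my-gcp-project", graph=None)

label = BigQueryLabel(key="team", value="data-eng")
tag_urn = TagUrn.from_string("urn:li:tag:team:data-eng")

# Builds a PlatformResource keyed by platform="bigquery",
# resource_type="BigQueryLabelInfo", primary_key="team/data-eng".
resource = helper.generate_label_platform_resource(
    label, tag_urn, managed_by_datahub=False
)

# The resource serializes to MetadataChangeProposalWrappers, which the
# ingestion source emits as workunits.
for mcp in resource.to_mcps():
    print(mcp.entityUrn)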
@@ -6,6 +6,7 @@

from google.cloud.bigquery.table import TableListItem

from datahub.api.entities.platformresource.platform_resource import PlatformResource
from datahub.configuration.pattern_utils import is_schema_allowed, is_tag_allowed
from datahub.emitter.mce_builder import make_tag_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -16,6 +17,7 @@
    ClassificationHandler,
    classification_workunit_processor,
)
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
    BigqueryTableIdentifier,
    BigQueryTableRef,
@@ -25,6 +27,11 @@
from datahub.ingestion.source.bigquery_v2.bigquery_helper import (
    unquote_and_decode_unicode_escape_seq,
)
from datahub.ingestion.source.bigquery_v2.bigquery_platform_resource_helper import (
    BigQueryLabel,
    BigQueryLabelInfo,
    BigQueryPlatformResourceHelper,
)
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
    BigqueryColumn,
@@ -84,6 +91,7 @@
    GlobalTagsClass,
    TagAssociationClass,
)
from datahub.metadata.urns import TagUrn
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.utilities.file_backed_collections import FileBackedDict
from datahub.utilities.hive_schema_to_avro import (
@@ -160,6 +168,7 @@ def __init__(
        sql_parser_schema_resolver: SchemaResolver,
        profiler: BigqueryProfiler,
        identifiers: BigQueryIdentifierBuilder,
        graph: Optional[DataHubGraph] = None,
    ):
        self.config = config
        self.report = report
@@ -168,6 +177,7 @@ def __init__(
        self.sql_parser_schema_resolver = sql_parser_schema_resolver
        self.profiler = profiler
        self.identifiers = identifiers
        self.graph = graph

        self.classification_handler = ClassificationHandler(self.config, self.report)
        self.data_reader: Optional[BigQueryDataReader] = None
@@ -188,6 +198,21 @@ def __init__(
        # Maps snapshot ref -> Snapshot
        self.snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot] = FileBackedDict()

        bq_project = (
            self.config.project_on_behalf
            if self.config.project_on_behalf
            else self.config.credential.project_id
            if self.config.credential
            else None
        )

        self.platform_resource_helper: BigQueryPlatformResourceHelper = (
            BigQueryPlatformResourceHelper(
                bq_project,
                self.graph,
            )
        )

    @property
    def store_table_refs(self):
        return (
@@ -264,13 +289,28 @@ def gen_dataset_containers(
    ) -> Iterable[MetadataWorkUnit]:
        schema_container_key = self.gen_dataset_key(project_id, dataset)

        tags_joined: Optional[List[str]] = None
        tags_joined: List[str] = []
        if tags and self.config.capture_dataset_label_as_tag:
            tags_joined = [
                self.make_tag_from_label(k, v)
                for k, v in tags.items()
                if is_tag_allowed(self.config.capture_dataset_label_as_tag, k)
            ]
            for k, v in tags.items():
                if is_tag_allowed(self.config.capture_dataset_label_as_tag, k):
                    tag_urn = TagUrn.from_string(self.make_tag_urn_from_label(k, v))
                    label = BigQueryLabel(key=k, value=v)
                    try:
                        platform_resource: PlatformResource = self.platform_resource_helper.generate_label_platform_resource(
                            label, tag_urn, managed_by_datahub=False
                        )
                        label_info: BigQueryLabelInfo = platform_resource.resource_info.value.as_pydantic_object(  # type: ignore
                            BigQueryLabelInfo
                        )
                        tag_urn = TagUrn.from_string(label_info.datahub_urn)

                        for mcpw in platform_resource.to_mcps():
                            yield mcpw.as_workunit()
                    except ValueError as e:
                        logger.warning(
                            f"Failed to generate platform resource for label {k}:{v}: {e}"
                        )
                    tags_joined.append(tag_urn.urn())

        database_container_key = self.gen_project_id_key(database=project_id)

@@ -676,10 +716,11 @@ def _process_snapshot(
            dataset_name=dataset_name,
        )

    def make_tag_from_label(self, key: str, value: str) -> str:
        if not value.startswith(ENCODED_TAG_PREFIX):
    def make_tag_urn_from_label(self, key: str, value: str) -> str:
        if value:
            return make_tag_urn(f"""{key}:{value}""")
        return self.modified_base32decode(value)
        else:
            return make_tag_urn(key)

    def gen_table_dataset_workunits(
        self,
@@ -724,13 +765,26 @@ def gen_table_dataset_workunits(
        tags_to_add = None
        if table.labels and self.config.capture_table_label_as_tag:
            tags_to_add = []
            tags_to_add.extend(
                [
                    self.make_tag_from_label(k, v)
                    for k, v in table.labels.items()
                    if is_tag_allowed(self.config.capture_table_label_as_tag, k)
                ]
            )
            for k, v in table.labels.items():
                if is_tag_allowed(self.config.capture_table_label_as_tag, k):
                    tag_urn = TagUrn.from_string(self.make_tag_urn_from_label(k, v))
                    try:
                        label = BigQueryLabel(key=k, value=v)
                        platform_resource: PlatformResource = self.platform_resource_helper.generate_label_platform_resource(
                            label, tag_urn, managed_by_datahub=False
                        )
                        label_info: BigQueryLabelInfo = platform_resource.resource_info.value.as_pydantic_object(  # type: ignore
                            BigQueryLabelInfo
                        )
                        tag_urn = TagUrn.from_string(label_info.datahub_urn)

                        for mcpw in platform_resource.to_mcps():
                            yield mcpw.as_workunit()
                    except ValueError as e:
                        logger.warning(
                            f"Failed to generate platform resource for label {k}:{v}: {e}"
                        )
                    tags_to_add.append(tag_urn.urn())

        yield from self.gen_dataset_workunits(
            table=table,
Expand All @@ -749,13 +803,29 @@ def gen_view_dataset_workunits(
        project_id: str,
        dataset_name: str,
    ) -> Iterable[MetadataWorkUnit]:
        tags_to_add = None
        tags_to_add = []
        if table.labels and self.config.capture_view_label_as_tag:
            tags_to_add = [
                self.make_tag_from_label(k, v)
                for k, v in table.labels.items()
                if is_tag_allowed(self.config.capture_view_label_as_tag, k)
            ]
            for k, v in table.labels.items():
                if is_tag_allowed(self.config.capture_view_label_as_tag, k):
                    tag_urn = TagUrn.from_string(self.make_tag_urn_from_label(k, v))
                    try:
                        label = BigQueryLabel(key=k, value=v)
                        platform_resource: PlatformResource = self.platform_resource_helper.generate_label_platform_resource(
                            label, tag_urn, managed_by_datahub=False
                        )
                        label_info: BigQueryLabelInfo = platform_resource.resource_info.value.as_pydantic_object(  # type: ignore
                            BigQueryLabelInfo
                        )
                        tag_urn = TagUrn.from_string(label_info.datahub_urn)

                        for mcpw in platform_resource.to_mcps():
                            yield mcpw.as_workunit()
                    except ValueError as e:
                        logger.warning(
                            f"Failed to generate platform resource for label {k}:{v}: {e}"
                        )

                    tags_to_add.append(tag_urn.urn())
        yield from self.gen_dataset_workunits(
            table=table,
            columns=columns,
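As a rough illustration of the label-to-tag mapping that the three code paths above (dataset, table, and view labels) rely on, here is a small sketch built on the SDK's make_tag_urn helper; the label keys and values are invented for the example.

# Mirrors make_tag_urn_from_label above: a label with a value maps to
# "key:value", a value-less label maps to just "key".
from datahub.emitter.mce_builder import make_tag_urn

def label_to_tag_urn(key: str, value: str) -> str:
    return make_tag_urn(f"{key}:{value}") if value else make_tag_urn(key)

print(label_to_tag_urn("env", "prod"))  # urn:li:tag:env:prod
print(label_to_tag_urn("pii", ""))      # urn:li:tag:pii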