Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/sigma): migrate sigma workbooks from container to dashboard #11939

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion metadata-ingestion/docs/sources/sigma/sigma_pre.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ This source extracts the following:
| Sigma | Datahub | Notes |
|------------------------|---------------------------------------------------------------|----------------------------------|
| `Workspace` | [Container](../../metamodel/entities/container.md) | SubType `"Sigma Workspace"` |
| `Workbook` | [Container](../../metamodel/entities/container.md) | SubType `"Sigma Workbook"` |
| `Workbook` | [Dashboard](../../metamodel/entities/dashboard.md) | SubType `"Sigma Workbook"` |
| `Page` | [Dashboard](../../metamodel/entities/dashboard.md) | |
| `Element` | [Chart](../../metamodel/entities/chart.md) | |
| `Dataset` | [Dataset](../../metamodel/entities/dataset.md) | SubType `"Sigma Dataset"` |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class Workbook(BaseModel):
path: str
latestVersion: int
workspaceId: Optional[str] = None
description: Optional[str] = None
pages: List[Page] = []
badge: Optional[str] = None

Expand Down
144 changes: 101 additions & 43 deletions metadata-ingestion/src/datahub/ingestion/source/sigma/sigma.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigurationError
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import add_entity_to_container, gen_containers
from datahub.emitter.mcp_builder import (
add_entity_to_container,
add_owner_to_entity_wu,
add_tags_to_entity_wu,
gen_containers,
)
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
Expand Down Expand Up @@ -59,12 +64,14 @@
UpstreamLineage,
)
from datahub.metadata.schema_classes import (
AuditStampClass,
BrowsePathEntryClass,
BrowsePathsV2Class,
ChangeAuditStampsClass,
ChartInfoClass,
DashboardInfoClass,
DataPlatformInstanceClass,
EdgeClass,
GlobalTagsClass,
InputFieldClass,
InputFieldsClass,
Expand All @@ -74,6 +81,7 @@
SchemaFieldClass,
SchemaFieldDataTypeClass,
StringTypeClass,
SubTypesClass,
TagAssociationClass,
)
from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result
Expand Down Expand Up @@ -257,11 +265,6 @@ def _gen_entity_browsepath_aspect(
entries = [
BrowsePathEntryClass(id=parent_entity_urn, urn=parent_entity_urn)
] + [BrowsePathEntryClass(id=path) for path in paths]
if self.config.platform_instance:
urn = builder.make_dataplatform_instance_urn(
self.platform, self.config.platform_instance
)
entries = [BrowsePathEntryClass(id=urn, urn=urn)] + entries
return MetadataChangeProposalWrapper(
entityUrn=entity_urn,
aspect=BrowsePathsV2Class(entries),
Expand Down Expand Up @@ -424,11 +427,11 @@ def _gen_elements_workunit(
elements: List[Element],
workbook: Workbook,
all_input_fields: List[InputFieldClass],
paths: List[str],
) -> Iterable[MetadataWorkUnit]:
"""
Map Sigma page element to Datahub Chart
"""

for element in elements:
chart_urn = builder.make_chart_urn(
platform=self.platform,
Expand Down Expand Up @@ -459,11 +462,14 @@ def _gen_elements_workunit(
),
).as_workunit()

yield from add_entity_to_container(
container_key=self._gen_workbook_key(workbook.workbookId),
entity_type="chart",
entity_urn=chart_urn,
)
if workbook.workspaceId:
yield self._gen_entity_browsepath_aspect(
entity_urn=chart_urn,
parent_entity_urn=builder.make_container_urn(
self._gen_workspace_key(workbook.workspaceId)
),
paths=paths + [workbook.name],
)

# Add sigma dataset's upstream dataset urn mapping
for dataset_urn, upstream_dataset_urns in inputs.items():
Expand Down Expand Up @@ -494,7 +500,9 @@ def _gen_elements_workunit(

all_input_fields.extend(element_input_fields)

def _gen_pages_workunit(self, workbook: Workbook) -> Iterable[MetadataWorkUnit]:
def _gen_pages_workunit(
self, workbook: Workbook, paths: List[str]
) -> Iterable[MetadataWorkUnit]:
"""
Map Sigma workbook page to Datahub dashboard
"""
Expand All @@ -505,20 +513,23 @@ def _gen_pages_workunit(self, workbook: Workbook) -> Iterable[MetadataWorkUnit]:

yield self._gen_dashboard_info_workunit(page)

yield from add_entity_to_container(
container_key=self._gen_workbook_key(workbook.workbookId),
entity_type="dashboard",
entity_urn=dashboard_urn,
)

dpi_aspect = self._gen_dataplatform_instance_aspect(dashboard_urn)
if dpi_aspect:
yield dpi_aspect

all_input_fields: List[InputFieldClass] = []

if workbook.workspaceId:
yield self._gen_entity_browsepath_aspect(
entity_urn=dashboard_urn,
parent_entity_urn=builder.make_container_urn(
self._gen_workspace_key(workbook.workspaceId)
),
paths=paths + [workbook.name],
)

yield from self._gen_elements_workunit(
page.elements, workbook, all_input_fields
page.elements, workbook, all_input_fields, paths
)

yield MetadataChangeProposalWrapper(
Expand All @@ -531,42 +542,89 @@ def _gen_workbook_workunit(self, workbook: Workbook) -> Iterable[MetadataWorkUni
Map Sigma Workbook to Datahub dashboard
"""
owner_username = self.sigma_api.get_user_name(workbook.createdBy)
workbook_key = self._gen_workbook_key(workbook.workbookId)
yield from gen_containers(
container_key=workbook_key,
name=workbook.name,
sub_types=[BIContainerSubTypes.SIGMA_WORKBOOK],
parent_container_key=(
self._gen_workspace_key(workbook.workspaceId)
if workbook.workspaceId
else None

dashboard_urn = self._gen_dashboard_urn(workbook.workbookId)

yield self._gen_entity_status_aspect(dashboard_urn)

lastModified = AuditStampClass(
time=int(workbook.updatedAt.timestamp() * 1000),
actor="urn:li:corpuser:datahub",
)
created = AuditStampClass(
time=int(workbook.createdAt.timestamp() * 1000),
actor="urn:li:corpuser:datahub",
)

dashboard_info_cls = DashboardInfoClass(
title=workbook.name,
description=workbook.description if workbook.description else "",
dashboards=[
EdgeClass(
destinationUrn=self._gen_dashboard_urn(page.get_urn_part()),
sourceUrn=dashboard_urn,
)
for page in workbook.pages
],
externalUrl=workbook.url,
lastModified=ChangeAuditStampsClass(
created=created, lastModified=lastModified
),
extra_properties={
customProperties={
"path": workbook.path,
"latestVersion": str(workbook.latestVersion),
},
owner_urn=(
builder.make_user_urn(owner_username)
if self.config.ingest_owner and owner_username
else None
),
external_url=workbook.url,
tags=[workbook.badge] if workbook.badge else None,
created=int(workbook.createdAt.timestamp() * 1000),
last_modified=int(workbook.updatedAt.timestamp() * 1000),
)
yield MetadataChangeProposalWrapper(
entityUrn=dashboard_urn, aspect=dashboard_info_cls
).as_workunit()

# Set subtype
yield MetadataChangeProposalWrapper(
entityUrn=dashboard_urn,
aspect=SubTypesClass(typeNames=[BIContainerSubTypes.SIGMA_WORKBOOK]),
).as_workunit()

# Ownership
owner_urn = (
builder.make_user_urn(owner_username)
if self.config.ingest_owner and owner_username
else None
)
if owner_urn:
yield from add_owner_to_entity_wu(
entity_type="dashboard",
entity_urn=dashboard_urn,
owner_urn=owner_urn,
)

# Tags
tags = [workbook.badge] if workbook.badge else None
if tags:
yield from add_tags_to_entity_wu(
entity_type="dashboard",
entity_urn=dashboard_urn,
tags=sorted(tags),
)

paths = workbook.path.split("/")[1:]
if len(paths) > 0 and workbook.workspaceId:
if workbook.workspaceId:
yield self._gen_entity_browsepath_aspect(
entity_urn=builder.make_container_urn(workbook_key),
entity_urn=dashboard_urn,
parent_entity_urn=builder.make_container_urn(
self._gen_workspace_key(workbook.workspaceId)
),
paths=paths,
paths=paths + [workbook.name],
)

yield from self._gen_pages_workunit(workbook)
if len(paths) == 0:
yield from add_entity_to_container(
container_key=self._gen_workspace_key(workbook.workspaceId),
entity_type="dashboard",
entity_urn=dashboard_urn,
)

yield from self._gen_pages_workunit(workbook, paths)

def _gen_sigma_dataset_upstream_lineage_workunit(
self,
Expand Down
Loading
Loading