Skip to content

Commit

Permalink
feat(cli): Add run-id option to put sub-command (#11023)
Browse files Browse the repository at this point in the history
Adds an option to assign run-id to a given put command execution. 
This is useful when transformers do not exist for a given ingestion payload, we can follow up with custom metadata and assign it to an ingestion pipeline.
  • Loading branch information
pedro93 authored Jul 31, 2024
1 parent f73149a commit 1955c05
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 10 deletions.
11 changes: 9 additions & 2 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from datahub.emitter.aspect import ASPECT_MAP, TIMESERIES_ASPECT_MAP
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.request_helper import make_curl_command
from datahub.emitter.serialization_helper import post_json_transform
from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
Expand Down Expand Up @@ -153,10 +153,11 @@ def post_entity(
aspect_value: Dict,
cached_session_host: Optional[Tuple[Session, str]] = None,
is_async: Optional[str] = "false",
system_metadata: Union[None, SystemMetadataClass] = None,
) -> int:
endpoint: str = "/aspects/?action=ingestProposal"

proposal = {
proposal: Dict[str, Any] = {
"proposal": {
"entityType": entity_type,
"entityUrn": urn,
Expand All @@ -169,6 +170,12 @@ def post_entity(
},
"async": is_async,
}

if system_metadata is not None:
proposal["proposal"]["systemMetadata"] = json.dumps(
pre_json_transform(system_metadata.to_obj())
)

payload = json.dumps(proposal)
url = gms_host + endpoint
curl_command = make_curl_command(session, "POST", url, payload)
Expand Down
31 changes: 23 additions & 8 deletions metadata-ingestion/src/datahub/cli/put_cli.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import logging
from typing import Optional
from typing import Optional, Union

import click
from click_default_group import DefaultGroup

from datahub.cli.cli_utils import post_entity
from datahub.configuration.config_loader import load_config_file
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp import MetadataChangeProposalWrapper, SystemMetadataClass
from datahub.ingestion.graph.client import get_default_graph
from datahub.metadata.schema_classes import (
DataPlatformInfoClass as DataPlatformInfo,
Expand Down Expand Up @@ -36,9 +36,15 @@ def put() -> None:
@click.option("--urn", required=True, type=str)
@click.option("-a", "--aspect", required=True, type=str)
@click.option("-d", "--aspect-data", required=True, type=str)
@click.option(
"--run-id",
type=str,
required=False,
help="Run ID into which we should log the aspect.",
)
@upgrade.check_upgrade
@telemetry.with_telemetry()
def aspect(urn: str, aspect: str, aspect_data: str) -> None:
def aspect(urn: str, aspect: str, aspect_data: str, run_id: Optional[str]) -> None:
"""Update a single aspect of an entity"""

entity_type = guess_entity_type(urn)
Expand All @@ -48,6 +54,10 @@ def aspect(urn: str, aspect: str, aspect_data: str) -> None:

client = get_default_graph()

system_metadata: Union[None, SystemMetadataClass] = None
if run_id:
system_metadata = SystemMetadataClass(runId=run_id)

# TODO: Replace with client.emit, requires figuring out the correct subsclass of _Aspect to create from the data
status = post_entity(
client._session,
Expand All @@ -56,6 +66,7 @@ def aspect(urn: str, aspect: str, aspect_data: str) -> None:
aspect_name=aspect,
entity_type=entity_type,
aspect_value=aspect_obj,
system_metadata=system_metadata,
)
click.secho(f"Update succeeded with status {status}", fg="green")

Expand All @@ -82,8 +93,11 @@ def aspect(urn: str, aspect: str, aspect_data: str) -> None:
help="Logo URL that must be reachable from the DataHub UI.",
required=True,
)
@click.option(
"--run-id", type=str, help="Run ID into which we should log the platform."
)
def platform(
ctx: click.Context, name: str, display_name: Optional[str], logo: str
ctx: click.Context, name: str, display_name: Optional[str], logo: str, run_id: str
) -> None:
"""
Create or update a dataplatform entity in DataHub
Expand All @@ -104,11 +118,12 @@ def platform(
logoUrl=logo,
)
datahub_graph = get_default_graph()
datahub_graph.emit(
MetadataChangeProposalWrapper(
entityUrn=str(platform_urn), aspect=data_platform_info
)
mcp = MetadataChangeProposalWrapper(
entityUrn=str(platform_urn),
aspect=data_platform_info,
systemMetadata=SystemMetadataClass(runId=run_id),
)
datahub_graph.emit(mcp)
click.echo(
f"✅ Successfully wrote data platform metadata for {platform_urn} to DataHub ({datahub_graph})"
)

0 comments on commit 1955c05

Please sign in to comment.