
feat(ingest): add datahub state inspect command (datahub-project#6763)
hsheth2 authored and cccs-Dustin committed Feb 1, 2023
1 parent 6eaf3b1 commit 0d7b788
Showing 19 changed files with 182 additions and 174 deletions.
19 changes: 11 additions & 8 deletions metadata-ingestion/src/datahub/cli/get_cli.py
@@ -3,6 +3,7 @@
from typing import Any, List, Optional

import click
from click_default_group import DefaultGroup

from datahub.cli.cli_utils import get_aspects_for_entity
from datahub.telemetry import telemetry
@@ -11,23 +12,25 @@
logger = logging.getLogger(__name__)


@click.command(
name="get",
context_settings=dict(
ignore_unknown_options=False,
allow_extra_args=True,
),
)
@click.group(cls=DefaultGroup, default="urn")
def get() -> None:
"""A group of commands to get metadata from DataHub."""
pass


@get.command()
@click.option("--urn", required=False, type=str)
@click.option("-a", "--aspect", required=False, multiple=True, type=str)
@click.pass_context
@upgrade.check_upgrade
@telemetry.with_telemetry
def get(ctx: Any, urn: Optional[str], aspect: List[str]) -> None:
def urn(ctx: Any, urn: Optional[str], aspect: List[str]) -> None:
"""
Get metadata for an entity with an optional list of aspects to project.
This works for both versioned aspects and timeseries aspects. For timeseries aspects, it fetches the latest value.
"""
# We're using ctx.args here so that we can support `datahub get urn:li:...`
# in addition to the `--urn` variant.

if urn is None:
if not ctx.args:
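For readers unfamiliar with click_default_group, here is a minimal sketch of the pattern used above (the group, subcommand, and option names in the sketch are illustrative, not part of the commit): with cls=DefaultGroup and a default subcommand, arguments that do not name a known subcommand are routed to the default, which is how `datahub get --urn <urn>` keeps working after `get` becomes a group with a `urn` subcommand.

from typing import Optional

import click
from click_default_group import DefaultGroup  # same helper the commit imports


@click.group(cls=DefaultGroup, default="show")
def cli() -> None:
    """Toy group: args that do not match a subcommand fall through to 'show'."""


@cli.command()
@click.option("--name", required=False, type=str)
def show(name: Optional[str]) -> None:
    # Both `cli --name foo` and `cli show --name foo` end up here.
    click.echo(f"name={name}")


if __name__ == "__main__":
    cli()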
83 changes: 83 additions & 0 deletions metadata-ingestion/src/datahub/cli/state_cli.py
@@ -0,0 +1,83 @@
import json
import logging
from datetime import datetime

import click
from click_default_group import DefaultGroup

from datahub.cli.cli_utils import get_url_and_token
from datahub.ingestion.api.ingestion_job_state_provider import IngestionJobStateProvider
from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
)
from datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider import (
DatahubIngestionCheckpointingProvider,
)
from datahub.metadata.schema_classes import DatahubIngestionCheckpointClass
from datahub.telemetry import telemetry
from datahub.upgrade import upgrade

logger = logging.getLogger(__name__)


@click.group(cls=DefaultGroup, default="urn")
def state() -> None:
"""Managed state stored in DataHub by stateful ingestion."""
pass


@state.command()
@click.option("--pipeline-name", required=True, type=str)
@click.option("--platform", required=True, type=str)
@click.option("--platform-instance", required=True, type=str)
@upgrade.check_upgrade
@telemetry.with_telemetry
def inspect(pipeline_name: str, platform: str, platform_instance: str) -> None:
"""
Get the latest stateful ingestion state for a given pipeline.
Only works for state entity removal for now.
"""

# Note that the platform-instance argument is not generated consistently,
# and is not always equal to the platform_instance config.

(url, token) = get_url_and_token()
datahub_graph = DataHubGraph(DataHubGraphConfig(server=url, token=token))

job_name = StaleEntityRemovalHandler.compute_job_id(platform)

data_job_urn = IngestionJobStateProvider.get_data_job_urn(
DatahubIngestionCheckpointingProvider.orchestrator_name,
pipeline_name,
job_name,
platform_instance,
)
raw_checkpoint = datahub_graph.get_latest_timeseries_value(
entity_urn=data_job_urn,
filter_criteria_map={
"pipelineName": pipeline_name,
"platformInstanceId": platform_instance,
},
aspect_type=DatahubIngestionCheckpointClass,
)

if not raw_checkpoint:
click.secho("No ingestion state found.", fg="red")
exit(1)

checkpoint = Checkpoint.create_from_checkpoint_aspect(
job_name=job_name,
checkpoint_aspect=raw_checkpoint,
state_class=GenericCheckpointState,
)
assert checkpoint

ts = datetime.utcfromtimestamp(raw_checkpoint.timestampMillis / 1000)
logger.info(
f"Found checkpoint with runId {checkpoint.run_id} and timestamp {ts.isoformat()}"
)

click.echo(json.dumps(checkpoint.state.urns, indent=2))
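
Taken together, the new command is invoked roughly as `datahub state inspect --pipeline-name <pipeline> --platform <platform> --platform-instance <instance>`; it logs the checkpoint's run id and timestamp and prints the checkpointed urns as a JSON array, or prints "No ingestion state found." and exits non-zero. A hedged sketch of driving it programmatically with click's test runner (the argument values are placeholders, and the call only returns data against a reachable DataHub instance whose connection details get_url_and_token can resolve, typically configured via `datahub init` or environment variables):

from click.testing import CliRunner

from datahub.cli.state_cli import state

result = CliRunner().invoke(
    state,
    [
        "inspect",
        "--pipeline-name", "my-pipeline",   # placeholder pipeline name
        "--platform", "snowflake",          # placeholder platform
        "--platform-instance", "prod",      # placeholder platform instance
    ],
)
print(result.output)  # JSON array of checkpointed urns, or "No ingestion state found."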
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/entrypoints.py
@@ -20,6 +20,7 @@
from datahub.cli.ingest_cli import ingest
from datahub.cli.migrate import migrate
from datahub.cli.put_cli import put
from datahub.cli.state_cli import state
from datahub.cli.telemetry import telemetry as telemetry_cli
from datahub.cli.timeline_cli import timeline
from datahub.telemetry import telemetry
@@ -149,6 +150,7 @@ def init() -> None:
datahub.add_command(delete)
datahub.add_command(get)
datahub.add_command(put)
datahub.add_command(state)
datahub.add_command(telemetry_cli)
datahub.add_command(migrate)
datahub.add_command(timeline)
3 changes: 1 addition & 2 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -274,7 +274,6 @@ def list_all_entity_urns(
def get_latest_timeseries_value(
self,
entity_urn: str,
aspect_name: str,
aspect_type: Type[Aspect],
filter_criteria_map: Dict[str, str],
) -> Optional[Aspect]:
@@ -285,7 +284,7 @@ def get_latest_timeseries_value(
query_body = {
"urn": entity_urn,
"entity": guess_entity_type(entity_urn),
"aspect": aspect_name,
"aspect": aspect_type.ASPECT_NAME,
"latestValue": True,
"filter": {"or": [{"and": filter_criteria}]},
}
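The practical effect of this signature change, mirroring the call in state_cli.py above (the server URL and urn below are placeholders): callers drop the separate aspect_name argument, and the aspect name is now read from the aspect class's ASPECT_NAME attribute.

from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig
from datahub.metadata.schema_classes import DatahubIngestionCheckpointClass

graph = DataHubGraph(DataHubGraphConfig(server="http://localhost:8080"))  # placeholder server
checkpoint_aspect = graph.get_latest_timeseries_value(
    entity_urn="urn:li:dataJob:(urn:li:dataFlow:(datahub,my-pipeline,prod),my_job)",  # placeholder urn
    aspect_type=DatahubIngestionCheckpointClass,  # name taken from DatahubIngestionCheckpointClass.ASPECT_NAME
    filter_criteria_map={"pipelineName": "my-pipeline"},
)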
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/pulsar.py
@@ -27,7 +27,7 @@
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.extractor import schema_util
from datahub.ingestion.source.state.kafka_state import KafkaCheckpointState
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
)
@@ -102,7 +102,7 @@ def __init__(self, config: PulsarSourceConfig, ctx: PipelineContext):
self.stale_entity_removal_handler = StaleEntityRemovalHandler(
source=self,
config=self.config,
state_type_class=KafkaCheckpointState,
state_type_class=GenericCheckpointState,
pipeline_name=self.ctx.pipeline_name,
run_id=self.ctx.run_id,
)
metadata-ingestion/src/datahub/ingestion/source/state/checkpoint.py
@@ -107,7 +107,7 @@ def create_from_checkpoint_aspect(
job_name: str,
checkpoint_aspect: Optional[DatahubIngestionCheckpointClass],
state_class: Type[StateType],
) -> Optional["Checkpoint"]:
) -> Optional["Checkpoint[StateType]"]:
if checkpoint_aspect is None:
return None
else:
metadata-ingestion/src/datahub/ingestion/source/state/dbt_state.py
@@ -1,18 +1,8 @@
from datahub.ingestion.source.state.entity_removal_state import (
GenericCheckpointState,
pydantic_state_migrator,
)
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState


class DbtCheckpointState(GenericCheckpointState):
"""
Class for representing the checkpoint state for DBT sources.
Stores all nodes and assertions being ingested and is used to remove any stale entities.
"""

_migration = pydantic_state_migrator(
{
"encoded_node_urns": "dataset",
"encoded_assertion_urns": "assertion",
}
)
metadata-ingestion/src/datahub/ingestion/source/state/entity_removal_state.py
@@ -11,6 +11,51 @@
from datahub.utilities.urns.urn import guess_entity_type


def pydantic_state_migrator(mapping: Dict[str, str]) -> classmethod:
# mapping would be something like:
# {
# 'encoded_view_urns': 'dataset',
# 'encoded_container_urns': 'container',
# }

SUPPORTED_TYPES = [
"dataset",
"container",
"assertion",
"topic",
]
assert set(mapping.values()) <= set(SUPPORTED_TYPES)

def _validate_field_rename(cls: Type, values: dict) -> dict:
values.setdefault("urns", [])

for old_field, mapped_type in mapping.items():
if old_field not in values:
continue

value = values.pop(old_field)
if mapped_type == "dataset":
values["urns"] += [
CheckpointStateUtil.get_urn_from_encoded_dataset(encoded_urn)
for encoded_urn in value
]
elif mapped_type == "topic":
values["urns"] += [
CheckpointStateUtil.get_urn_from_encoded_topic(encoded_urn)
for encoded_urn in value
]
elif mapped_type == "container":
values["urns"] += [make_container_urn(guid) for guid in value]
elif mapped_type == "assertion":
values["urns"] += [make_assertion_urn(encoded) for encoded in value]
else:
raise ValueError(f"Unsupported type {mapped_type}")

return values

return pydantic.root_validator(pre=True, allow_reuse=True)(_validate_field_rename)


class GenericCheckpointState(StaleEntityCheckpointStateBase["GenericCheckpointState"]):
urns: List[str] = pydantic.Field(default_factory=list)

@@ -20,6 +65,21 @@ class GenericCheckpointState(StaleEntityCheckpointStateBase["GenericCheckpointSt
# it isn't JSON serializable.
_urns_set: set = pydantic.PrivateAttr(default_factory=set)

_migration = pydantic_state_migrator(
{
# From SQL:
"encoded_table_urns": "dataset",
"encoded_view_urns": "dataset",
"encoded_container_urns": "container",
"encoded_assertion_urns": "assertion",
# From kafka:
"encoded_topic_urns": "topic",
# From dbt:
"encoded_node_urns": "dataset",
# "encoded_assertion_urns": "assertion", # already handled from SQL
}
)

def __init__(self, **data: Any): # type: ignore
super().__init__(**data)
self.urns = deduplicate_list(self.urns)
@@ -53,45 +113,3 @@ def get_percent_entities_changed(
return StaleEntityCheckpointStateBase.compute_percent_entities_changed(
[(self.urns, old_checkpoint_state.urns)]
)


def pydantic_state_migrator(mapping: Dict[str, str]) -> classmethod:
# mapping would be something like:
# {
# 'encoded_view_urns': 'dataset',
# 'encoded_container_urns': 'container',
# }

SUPPORTED_TYPES = [
"dataset",
"container",
"assertion",
"topic",
]
assert set(mapping.values()) <= set(SUPPORTED_TYPES)

def _validate_field_rename(cls: Type, values: dict) -> dict:
values.setdefault("urns", [])

for old_field, mapped_type in mapping.items():
if old_field not in values:
continue

value = values.pop(old_field)
if mapped_type == "dataset":
values["urns"] += CheckpointStateUtil.get_dataset_urns_not_in(value, [])
elif mapped_type == "topic":
values["urns"] += [
CheckpointStateUtil.get_urn_from_encoded_topic(encoded_urn)
for encoded_urn in value
]
elif mapped_type == "container":
values["urns"] += [make_container_urn(guid) for guid in value]
elif mapped_type == "assertion":
values["urns"] += [make_assertion_urn(encoded) for encoded in value]
else:
raise ValueError(f"Unsupported type {mapped_type}")

return values

return pydantic.root_validator(pre=True, allow_reuse=True)(_validate_field_rename)
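
A minimal sketch of what the consolidated migrator buys the per-source subclasses below, assuming GenericCheckpointState behaves as a pydantic v1 model (it is built on pydantic.Field and root_validator above); the guid value is a placeholder:

from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState

# A checkpoint serialized before this commit still carries per-type fields.
legacy_payload = {"encoded_container_urns": ["abc123"]}  # placeholder container guid

# The pre=True root validator pops the legacy field and folds it into `urns`,
# so previously stored checkpoints keep deserializing after the consolidation.
state = GenericCheckpointState.parse_obj(legacy_payload)
print(state.urns)  # e.g. ["urn:li:container:abc123"]

This is why DbtCheckpointState, KafkaCheckpointState, and BaseSQLAlchemyCheckpointState in the files below can drop their individual _migration definitions.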
metadata-ingestion/src/datahub/ingestion/source/state/kafka_state.py
@@ -1,17 +1,8 @@
from datahub.ingestion.source.state.entity_removal_state import (
GenericCheckpointState,
pydantic_state_migrator,
)
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState


class KafkaCheckpointState(GenericCheckpointState):
"""
This class represents the checkpoint state for Kafka based sources.
Stores all the topics being ingested and it is used to remove any stale entities.
"""

_migration = pydantic_state_migrator(
{
"encoded_topic_urns": "topic",
}
)
metadata-ingestion/src/datahub/ingestion/source/state/sql_common_state.py
@@ -1,7 +1,4 @@
from datahub.ingestion.source.state.entity_removal_state import (
GenericCheckpointState,
pydantic_state_migrator,
)
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState


class BaseSQLAlchemyCheckpointState(GenericCheckpointState):
@@ -10,12 +7,3 @@ class BaseSQLAlchemyCheckpointState(GenericCheckpointState):
Stores all tables and views being ingested and is used to remove any stale entities.
Subclasses can define additional state as appropriate.
"""

_migration = pydantic_state_migrator(
{
"encoded_table_urns": "dataset",
"encoded_view_urns": "dataset",
"encoded_container_urns": "container",
"encoded_assertion_urns": "assertion",
}
)
metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py
@@ -163,7 +163,8 @@ def __init__(
self._job_id = self._init_job_id()
self.source.register_stateful_ingestion_usecase_handler(self)

def _init_job_id(self) -> JobId:
@classmethod
def compute_job_id(cls, platform: Optional[str]) -> JobId:
# Handle backward-compatibility for existing sources.
backward_comp_platform_to_job_name: Dict[str, str] = {
"bigquery": "ingest_from_bigquery_source",
@@ -173,14 +174,17 @@ def _init_job_id(self) -> JobId:
"pulsar": "ingest_from_pulsar_source",
"snowflake": "common_ingest_from_sql_source",
}
platform: Optional[str] = getattr(self.source, "platform")
if platform in backward_comp_platform_to_job_name:
return JobId(backward_comp_platform_to_job_name[platform])

# Default name for everything else
job_name_suffix = "stale_entity_removal"
return JobId(f"{platform}_{job_name_suffix}" if platform else job_name_suffix)

def _init_job_id(self) -> JobId:
platform: Optional[str] = getattr(self.source, "platform")
return self.compute_job_id(platform)

def _ignore_old_state(self) -> bool:
if (
self.stateful_ingestion_config is not None
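A small sketch of what the extracted classmethod returns, following the backward-compatibility mapping above (the platform names are examples):

from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityRemovalHandler,
)

StaleEntityRemovalHandler.compute_job_id("snowflake")  # legacy name: "common_ingest_from_sql_source"
StaleEntityRemovalHandler.compute_job_id("postgres")   # "postgres_stale_entity_removal"
StaleEntityRemovalHandler.compute_job_id(None)         # "stale_entity_removal"

This is the same job id that state_cli.py computes before looking up the checkpoint, which keeps the new CLI command consistent with what stateful ingestion wrote.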