Skip to content

Commit

Permalink
feat(ingestion): cli - Add the ability to query the latest timeseries…
Browse files Browse the repository at this point in the history
… aspect value via the get command. (datahub-project#4395)
  • Loading branch information
rslanka authored and maggiehays committed Aug 1, 2022
1 parent 1ab8c20 commit 33c66cb
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 7 deletions.
6 changes: 3 additions & 3 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ Commands:
check Helper commands for checking various aspects of DataHub.
delete Delete metadata from datahub using a single urn or a combination of filters
docker Helper commands for setting up and interacting with a local DataHub instance using Docker.
get Get metadata for an entity with an optional list of aspects to project
get Get metadata for an entity with an optional list of aspects to project.
ingest Ingest metadata into DataHub.
init Configure which datahub instance to connect to
put Update a single aspect of an entity
Expand Down Expand Up @@ -199,7 +199,7 @@ datahub delete --urn "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset

### get

The `get` command allows you to easily retrieve metadata from DataHub, by using the REST API.
The `get` command allows you to easily retrieve metadata from DataHub, by using the REST API. This works for both versioned aspects and timeseries aspects. For timeseries aspects, it fetches the latest value.
For example the following command gets the ownership aspect from the dataset `urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)`

```console
Expand Down Expand Up @@ -338,4 +338,4 @@ datahub timeline --urn "urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccou
MODIFY TAG dataset:mysql:User.UserAccount : A change in aspect editableSchemaMetadata happened at time 2022-02-17 20:03:42.0
2022-02-17 14:17:30 - 0.0.0-computed
MODIFY TAG dataset:mysql:User.UserAccount : A change in aspect editableSchemaMetadata happened at time 2022-02-17 20:17:30.118
```
```
91 changes: 88 additions & 3 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,22 @@
from datahub.emitter.rest_emitter import _make_curl_command
from datahub.emitter.serialization_helper import post_json_transform
from datahub.metadata.schema_classes import (
AssertionRunEventClass,
BrowsePathsClass,
ChartInfoClass,
ChartKeyClass,
DatahubIngestionCheckpointClass,
DatahubIngestionRunSummaryClass,
DataJobInputOutputClass,
DataJobKeyClass,
DataPlatformInstanceClass,
DataProcessInfoClass,
DatasetDeprecationClass,
DatasetKeyClass,
DatasetProfileClass,
DatasetPropertiesClass,
DatasetUpstreamLineageClass,
DatasetUsageStatisticsClass,
EditableDatasetPropertiesClass,
EditableSchemaMetadataClass,
GlobalTagsClass,
Expand All @@ -39,6 +44,7 @@
MLFeaturePropertiesClass,
MLPrimaryKeyKeyClass,
MLPrimaryKeyPropertiesClass,
OperationClass,
OwnershipClass,
SchemaMetadataClass,
StatusClass,
Expand Down Expand Up @@ -470,30 +476,109 @@ def post_entity(
ChartKeyClass: "chartKey",
}

timeseries_class_to_aspect_name_map: Dict[Type, str] = {
DatahubIngestionCheckpointClass: "datahubIngestionCheckpoint",
DatahubIngestionRunSummaryClass: "datahubIngestionRunSummary",
DatasetUsageStatisticsClass: "datasetUsageStatistics",
DatasetProfileClass: "datasetProfile",
AssertionRunEventClass: "assertionRunEvent",
OperationClass: "operation",
}


def _get_pydantic_class_from_aspect_name(aspect_name: str) -> Optional[Type[Aspect]]:
candidates = [k for (k, v) in type_class_to_name_map.items() if v == aspect_name]
return candidates[0] or None
candidates.extend(
[
k
for (k, v) in timeseries_class_to_aspect_name_map.items()
if v == aspect_name
]
)
return candidates[0] if candidates else None


def _get_aspect_name_from_aspect_class(aspect_class: str) -> str:
class_to_name_map = {
k.RECORD_SCHEMA.fullname.replace("pegasus2avro.", ""): v # type: ignore
for (k, v) in type_class_to_name_map.items()
for (k, v) in (
set(type_class_to_name_map.items())
| set(timeseries_class_to_aspect_name_map.items())
)
}
return class_to_name_map.get(aspect_class, "unknown")


def get_latest_timeseries_aspect_values(
entity_urn: str,
timeseries_aspect_name: str,
cached_session_host: Optional[Tuple[Session, str]],
) -> Dict:
if not cached_session_host:
session, gms_host = get_session_and_host()
else:
session, gms_host = cached_session_host
query_body = {
"urn": entity_urn,
"entity": guess_entity_type(entity_urn),
"aspect": timeseries_aspect_name,
"latestValue": True,
}
end_point = "/aspects?action=getTimeseriesAspectValues"
try:
response = session.post(url=gms_host + end_point, data=json.dumps(query_body))
response.raise_for_status()
return response.json()
except Exception:
# Ignore exceptions
return {}


def get_aspects_for_entity(
entity_urn: str,
aspects: List[str],
typed: bool = False,
cached_session_host: Optional[Tuple[Session, str]] = None,
) -> Dict[str, Union[dict, DictWrapper]]:
entity_response = get_entity(entity_urn, aspects, cached_session_host)
# Process non-timeseries aspects
non_timeseries_aspects: List[str] = [
a for a in aspects if a not in timeseries_class_to_aspect_name_map.values()
]
entity_response = get_entity(
entity_urn, non_timeseries_aspects, cached_session_host
)
aspect_list: List[Dict[str, dict]] = list(entity_response["value"].values())[0][
"aspects"
]

# Process timeseries aspects & append to aspect_list
timeseries_aspects: List[str] = [
a for a in aspects if a in timeseries_class_to_aspect_name_map.values()
]
for timeseries_aspect in timeseries_aspects:
timeseries_response = get_latest_timeseries_aspect_values(
entity_urn, timeseries_aspect, cached_session_host
)
values: List[Dict] = timeseries_response.get("value", {}).get("values", [])
if values:
aspect_cls: Optional[Type] = _get_pydantic_class_from_aspect_name(
timeseries_aspect
)
if aspect_cls is not None:
aspect_value = values[0]
# Decode the json-encoded generic aspect value.
aspect_value["aspect"]["value"] = json.loads(
aspect_value["aspect"]["value"]
)
aspect_list.append(
# Follow the convention used for non-timeseries aspects.
{
aspect_cls.RECORD_SCHEMA.fullname.replace(
"pegasus2avro.", ""
): aspect_value
}
)

aspect_map: Dict[str, Union[dict, DictWrapper]] = {}
for a in aspect_list:
aspect_class = list(a.keys())[0]
Expand Down
5 changes: 4 additions & 1 deletion metadata-ingestion/src/datahub/cli/get_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
@click.pass_context
@telemetry.with_telemetry
def get(ctx: Any, urn: Optional[str], aspect: List[str]) -> None:
"""Get metadata for an entity with an optional list of aspects to project"""
"""
Get metadata for an entity with an optional list of aspects to project.
This works for both versioned aspects and timeseries aspects. For timeseries aspects, it fetches the latest value.
"""

if urn is None:
if not ctx.args:
Expand Down

0 comments on commit 33c66cb

Please sign in to comment.