Commit

Merge branch 'datahub-project:master' into master

anshbansal authored Jan 31, 2025
2 parents fad4936 + e2b1ed7 commit 796cd79
Showing 17 changed files with 339 additions and 26 deletions.
179 changes: 179 additions & 0 deletions docs/actions/events/entity-change-event.md
@@ -417,3 +417,182 @@ This event is emitted when a new entity has been hard-deleted on DataHub.
  }
}
```

## Action Request Events (Proposals)

Action Request events represent proposals for changes to entities that may require approval before being applied. These events have an `entityType` of `"actionRequest"` and use the `LIFECYCLE` category with the `CREATE` operation.
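Because every proposal type shares this envelope, a consumer can route events on the `actionRequestType` parameter. Below is a minimal sketch in plain Python (the `handle_action_request` function and the incoming `dict` shape are illustrative, not part of the DataHub Actions API):

```python
def handle_action_request(event: dict) -> None:
    """Route a deserialized Action Request event by its proposal type."""
    if event.get("entityType") != "actionRequest":
        return  # not a proposal event

    params = event.get("parameters", {})
    request_type = params.get("actionRequestType")

    if request_type == "TAG_ASSOCIATION":
        print(f"Tag {params['tagUrn']} proposed for {params['resourceUrn']}")
    elif request_type == "TERM_ASSOCIATION":
        print(f"Term {params['glossaryTermUrn']} proposed for {params['resourceUrn']}")
    else:
        print(f"Unhandled proposal type: {request_type}")
```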

### Domain Association Request Event

This event is emitted when a domain association is proposed for an entity on DataHub.

#### Sample Event
```json
{
  "entityType": "actionRequest",
  "entityUrn": "urn:li:actionRequest:abc-123",
  "category": "LIFECYCLE",
  "operation": "CREATE",
  "auditStamp": {
    "actor": "urn:li:corpuser:jdoe",
    "time": 1234567890
  },
  "version": 0,
  "parameters": {
    "domains": "[\"urn:li:domain:marketing\"]",
    "actionRequestType": "DOMAIN_ASSOCIATION",
    "resourceUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,example.table,PROD)",
    "resourceType": "dataset"
  }
}
```
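Note that the `domains` parameter is delivered as a JSON-encoded string rather than a native array, so consumers generally decode it before use. A small illustrative sketch:

```python
import json

# "parameters" as they appear in the sample event above
parameters = {
    "domains": '["urn:li:domain:marketing"]',
    "actionRequestType": "DOMAIN_ASSOCIATION",
    "resourceUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,example.table,PROD)",
    "resourceType": "dataset",
}

# Decode the stringified list into a native Python list of domain URNs
proposed_domains = json.loads(parameters["domains"])
assert proposed_domains == ["urn:li:domain:marketing"]
```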

### Owner Association Request Event

This event is emitted when an owner association is proposed for an entity on DataHub.

#### Sample Event
```json
{
  "entityType": "actionRequest",
  "entityUrn": "urn:li:actionRequest:def-456",
  "category": "LIFECYCLE",
  "operation": "CREATE",
  "auditStamp": {
    "actor": "urn:li:corpuser:jdoe",
    "time": 1234567890
  },
  "version": 0,
  "parameters": {
    "owners": "[{\"type\":\"TECHNICAL_OWNER\",\"typeUrn\":\"urn:li:ownershipType:technical_owner\",\"ownerUrn\":\"urn:li:corpuser:jdoe\"}]",
    "actionRequestType": "OWNER_ASSOCIATION",
    "resourceUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,example.table,PROD)",
    "resourceType": "dataset"
  }
}
```

### Tag Association Request Event

This event is emitted when a tag association is proposed for an entity on DataHub.

#### Sample Event
```json
{
  "entityType": "actionRequest",
  "entityUrn": "urn:li:actionRequest:ghi-789",
  "category": "LIFECYCLE",
  "operation": "CREATE",
  "auditStamp": {
    "actor": "urn:li:corpuser:jdoe",
    "time": 1234567890
  },
  "version": 0,
  "parameters": {
    "actionRequestType": "TAG_ASSOCIATION",
    "resourceUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,example.table,PROD)",
    "tagUrn": "urn:li:tag:pii",
    "resourceType": "dataset"
  }
}
```

### Create Glossary Term Request Event

This event is emitted when the creation of a new glossary term is proposed on DataHub.

#### Sample Event
```json
{
  "entityType": "actionRequest",
  "entityUrn": "urn:li:actionRequest:jkl-101",
  "category": "LIFECYCLE",
  "operation": "CREATE",
  "auditStamp": {
    "actor": "urn:li:corpuser:jdoe",
    "time": 1234567890
  },
  "version": 0,
  "parameters": {
    "parentNodeUrn": "urn:li:glossaryNode:123",
    "glossaryEntityName": "ExampleTerm",
    "actionRequestType": "CREATE_GLOSSARY_TERM",
    "resourceType": "glossaryTerm"
  }
}
```

### Term Association Request Event

This event is emitted when a glossary term association is proposed for an entity on DataHub.

#### Sample Event
```json
{
  "entityType": "actionRequest",
  "entityUrn": "urn:li:actionRequest:mno-102",
  "category": "LIFECYCLE",
  "operation": "CREATE",
  "auditStamp": {
    "actor": "urn:li:corpuser:jdoe",
    "time": 1234567890
  },
  "version": 0,
  "parameters": {
    "glossaryTermUrn": "urn:li:glossaryTerm:123",
    "actionRequestType": "TERM_ASSOCIATION",
    "resourceUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,example.table,PROD)",
    "resourceType": "dataset"
  }
}
```

### Update Description Request Event

This event is emitted when an update to an entity's description is proposed on DataHub.

#### Sample Event
```json
{
  "entityType": "actionRequest",
  "entityUrn": "urn:li:actionRequest:pqr-103",
  "category": "LIFECYCLE",
  "operation": "CREATE",
  "auditStamp": {
    "actor": "urn:li:corpuser:jdoe",
    "time": 1234567890
  },
  "version": 0,
  "parameters": {
    "description": "Example description for a dataset.",
    "actionRequestType": "UPDATE_DESCRIPTION",
    "resourceUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,example.table,PROD)",
    "resourceType": "dataset"
  }
}
```

### Structured Property Association Request Event

This event is emitted when a structured property association is proposed for an entity on DataHub.

#### Sample Event
```json
{
  "entityType": "actionRequest",
  "entityUrn": "urn:li:actionRequest:stu-104",
  "category": "LIFECYCLE",
  "operation": "CREATE",
  "auditStamp": {
    "actor": "urn:li:corpuser:jdoe",
    "time": 1234567890
  },
  "version": 0,
  "parameters": {
    "structuredProperties": "[{\"propertyUrn\":\"urn:li:structuredProperty:123\",\"values\":[\"value1\",\"value2\"]}]",
    "actionRequestType": "STRUCTURED_PROPERTY_ASSOCIATION",
    "resourceUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,example.table,PROD)",
    "resourceType": "dataset"
  }
}
```
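As with `domains` and `owners`, the `structuredProperties` parameter is a JSON-encoded string. A consumer might flatten it into a `propertyUrn` to values mapping, as in this illustrative sketch:

```python
import json

# The stringified "structuredProperties" parameter from the sample event above
structured_properties_raw = (
    '[{"propertyUrn":"urn:li:structuredProperty:123",'
    '"values":["value1","value2"]}]'
)

# Build a propertyUrn -> values mapping from the proposal payload
proposed = {
    prop["propertyUrn"]: prop["values"]
    for prop in json.loads(structured_properties_raw)
}
print(proposed)  # {'urn:li:structuredProperty:123': ['value1', 'value2']}
```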
@@ -102,9 +102,9 @@ In order to update the executor, i.e. to deploy a new container version, you'll n

### Deploying on Kubernetes

-The Helm chart [datahub-executor-worker](https://github.com/acryldata/datahub-executor-helm/tree/main/charts/datahub-executor-worker) can be used to deploy on a Kubernetes cluster. These instructions also apply for deploying to Amazon Elastic Kubernetes Service (EKS) or Google Kubernetes Engine (GKE).
+The Helm chart [datahub-executor-worker](https://executor-helm.acryl.io/index.yaml) can be used to deploy on a Kubernetes cluster. These instructions also apply for deploying to Amazon Elastic Kubernetes Service (EKS) or Google Kubernetes Engine (GKE).

-1. **Download Chart**: Download the [latest release](https://github.com/acryldata/datahub-executor-helm/releases) of the chart
+1. **Download Chart**: Download the [latest release](https://executor-helm.acryl.io/index.yaml) of the chart
2. **Unpack the release archive**:
```
tar zxvf v0.0.4.tar.gz --strip-components=2
```
134 changes: 134 additions & 0 deletions metadata-ingestion/examples/library/create_ermodelrelationship.py
@@ -0,0 +1,134 @@
import time

from datahub.emitter.mce_builder import make_data_platform_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
AuditStampClass,
ERModelRelationshipCardinalityClass,
ERModelRelationshipKeyClass,
ERModelRelationshipPropertiesClass,
NumberTypeClass,
OtherSchemaClass,
RelationshipFieldMappingClass,
SchemaFieldClass,
SchemaFieldDataTypeClass,
SchemaMetadataClass,
StringTypeClass,
)

# Configuration
GMS_ENDPOINT = "http://localhost:8080"
PLATFORM = "mysql"
ENV = "PROD"

e = DatahubRestEmitter(gms_server=GMS_ENDPOINT, extra_headers={})


def get_schema_field(
name: str, dtype: str, type: SchemaFieldDataTypeClass
) -> SchemaFieldClass:
"""Creates a schema field for MySQL columns."""

field = SchemaFieldClass(
fieldPath=name,
type=type,
nativeDataType=dtype,
description=name,
lastModified=AuditStampClass(
time=int(time.time() * 1000),
actor="urn:li:corpuser:ingestion",
),
)
if name == "id":
field.isPartitioningKey = True
return field


# Define Employee Table
dataset_employee = make_dataset_urn(PLATFORM, "Employee", ENV)
employee_fields = [
get_schema_field("id", "int", SchemaFieldDataTypeClass(type=NumberTypeClass())),
get_schema_field(
"name", "varchar", SchemaFieldDataTypeClass(type=StringTypeClass())
),
get_schema_field("age", "int", SchemaFieldDataTypeClass(type=NumberTypeClass())),
get_schema_field(
"company_id", "int", SchemaFieldDataTypeClass(type=NumberTypeClass())
),
]

e.emit_mcp(
MetadataChangeProposalWrapper(
entityUrn=dataset_employee,
aspect=SchemaMetadataClass(
schemaName="Employee",
platform=make_data_platform_urn(PLATFORM),
fields=employee_fields,
version=0,
hash="",
platformSchema=OtherSchemaClass(rawSchema=""),
),
)
)

# Define Company Table
dataset_company = make_dataset_urn(PLATFORM, "Company", ENV)
company_fields = [
get_schema_field("id", "int", SchemaFieldDataTypeClass(type=NumberTypeClass())),
get_schema_field(
"name", "varchar", SchemaFieldDataTypeClass(type=StringTypeClass())
),
]

e.emit_mcp(
MetadataChangeProposalWrapper(
entityUrn=dataset_company,
aspect=SchemaMetadataClass(
schemaName="Company",
platform=make_data_platform_urn(PLATFORM),
fields=company_fields,
version=0,
hash="",
platformSchema=OtherSchemaClass(rawSchema=""),
),
)
)

# Establish Relationship (Foreign Key: Employee.company_id → Company.id)
relationship_key = ERModelRelationshipKeyClass(id="employee_to_company")
relationship_properties = ERModelRelationshipPropertiesClass(
name="Employee to Company Relationship",
source=dataset_employee,
destination=dataset_company,
relationshipFieldMappings=[
RelationshipFieldMappingClass(sourceField="company_id", destinationField="id")
],
cardinality=ERModelRelationshipCardinalityClass.N_ONE,
customProperties={"constraint": "Foreign Key", "index": "company_id"},
)

relationship_urn = f"urn:li:erModelRelationship:{relationship_key.id}"

e.emit_mcp(
MetadataChangeProposalWrapper(
entityType="erModelRelationship",
changeType="UPSERT",
entityKeyAspect=relationship_key,
aspectName=relationship_key.ASPECT_NAME,
aspect=relationship_key,
)
)

e.emit_mcp(
MetadataChangeProposalWrapper(
entityUrn=relationship_urn,
entityType="erModelRelationship",
changeType="UPSERT",
aspectName=relationship_properties.ASPECT_NAME,
aspect=relationship_properties,
)
)

print("relationship_urn", relationship_urn)
print("Employee and Company tables created with ERModelRelationship linking them.")
6 changes: 4 additions & 2 deletions metadata-ingestion/setup.py
@@ -301,6 +301,8 @@
data_lake_profiling = {
    "pydeequ>=1.1.0",
    "pyspark~=3.5.0",
+    # cachetools is used by the profiling config
+    *cachetools_lib,
}

delta_lake = {
@@ -485,9 +487,9 @@
    | classification_lib
    | {"db-dtypes"}  # Pandas extension data types
    | cachetools_lib,
-    "s3": {*s3_base, *data_lake_profiling, *cachetools_lib},
+    "s3": {*s3_base, *data_lake_profiling},
    "gcs": {*s3_base, *data_lake_profiling},
-    "abs": {*abs_base, *data_lake_profiling, *cachetools_lib},
+    "abs": {*abs_base, *data_lake_profiling},
    "sagemaker": aws_common,
    "salesforce": {"simple-salesforce", *cachetools_lib},
    "snowflake": snowflake_common | usage_common | sqlglot_lib,
6 changes: 2 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/mode.py
@@ -24,6 +24,7 @@
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import DatasetLineageProviderConfigBase
+from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import (
    ContainerKey,
@@ -155,10 +156,7 @@ class ModeConfig(StatefulIngestionConfigBase, DatasetLineageProviderConfigBase):
    workspace: str = Field(
        description="The Mode workspace name. Find it in Settings > Workspace > Details."
    )
-    default_schema: str = Field(
-        default="public",
-        description="Default schema to use when schema is not provided in an SQL query",
-    )
+    _default_schema = pydantic_removed_field("default_schema")

    space_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern(
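For context, `pydantic_removed_field` lets old recipes that still set `default_schema` validate cleanly while the option is ignored. The pattern looks roughly like this sketch (names and warning text are illustrative, not DataHub's exact implementation):

```python
import warnings

from pydantic import BaseModel, root_validator  # pydantic v1-style API


def removed_field(field_name: str):
    """Drop a removed key from incoming config and warn the user."""

    def _drop(cls, values: dict) -> dict:
        if field_name in values:
            warnings.warn(f"The '{field_name}' option is deprecated and ignored.")
            values.pop(field_name)
        return values

    return root_validator(pre=True, allow_reuse=True)(_drop)


class ExampleConfig(BaseModel):
    workspace: str

    _default_schema = removed_field("default_schema")


# Old recipes that still pass the removed option continue to load
cfg = ExampleConfig.parse_obj({"workspace": "acme", "default_schema": "public"})
print(cfg.workspace)  # "acme"
```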
@@ -6,10 +6,10 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
+import com.linkedin.metadata.systemmetadata.TraceStorageStatus;
+import com.linkedin.metadata.systemmetadata.TraceWriteStatus;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
-import io.datahubproject.openapi.v1.models.TraceStorageStatus;
-import io.datahubproject.openapi.v1.models.TraceWriteStatus;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;