Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion): adds platform instance capability to glue connector #4130

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ def get_long_description():
"cryptography",
}

trino = {
"trino>=0.308",
"trino[sqlalchemy]>=0.308"
}

# Note: for all of these, framework_common will be added.
plugins: Dict[str, Set[str]] = {
# Sink plugins.
Expand Down Expand Up @@ -159,8 +164,8 @@ def get_long_description():
"sqlalchemy": sql_common,
"superset": {"requests", "sqlalchemy", "great_expectations", "greenlet"},
"tableau": {"tableauserverclient>=0.17.0"},
"trino": sql_common | {"trino"},
"starburst-trino-usage": sql_common | {"trino"},
"trino": sql_common | trino,
"starburst-trino-usage": sql_common | trino,
"nifi": {"requests", "packaging"},
}

Expand Down
6 changes: 4 additions & 2 deletions metadata-ingestion/source_docs/glue.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ This plugin extracts the following:

| Capability | Status | Details |
| -----------| ------ | ---- |
| Platform Instance | 🛑 | [link](../../docs/platform-instances.md) |
| Platform Instance | ✔️ | [link](../../docs/platform-instances.md) |
| Data Containers | ✔️ | |
| Data Domains | ✔️ | [link](../../docs/domains.md) |

Expand Down Expand Up @@ -91,7 +91,9 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| `table_pattern.allow` | | | List of regex patterns for tables to include in ingestion. |
| `table_pattern.deny` | | | List of regex patterns for tables to exclude from ingestion. |
| `table_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `underlying_platform` | | `glue` | Override for platform name. Allowed values - `glue`, `athena` |
| `platform` | | `glue` | Override for platform name. Allowed values - `glue`, `athena` |
| `platform_instance` | | None | The Platform instance to use while constructing URNs. |
| `underlying_platform` | | `glue` | @deprecated(Use `platform`) Override for platform name. Allowed values - `glue`, `athena` |
| `ignore_unsupported_connectors` | | `True` | Whether to ignore unsupported connectors. If disabled, an error will be raised. |
| `emit_s3_lineage` | | `True` | Whether to emit S3-to-Glue lineage. |
| `glue_s3_lineage_direction` | | `upstream` | If `upstream`, S3 is upstream to Glue. If `downstream` S3 is downstream to Glue. |
Expand Down
40 changes: 24 additions & 16 deletions metadata-ingestion/src/datahub/emitter/mcp_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import json
from typing import Any, Iterable, List, Optional, TypeVar, Union

from datahub.emitter.mce_builder import make_container_urn, make_data_platform_urn
from datahub.emitter.mce_builder import (
make_container_urn,
make_dataplatform_instance_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance
Expand Down Expand Up @@ -90,26 +93,31 @@ def gen_containers(
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspectName="containerProperties",
aspect=ContainerProperties(
name=name, customProperties=dataclasses.asdict(container_key)
name=name,
customProperties=dataclasses.asdict(
container_key,
dict_factory=lambda x: {k: v for (k, v) in x if v is not None},
),
),
)
wu = MetadataWorkUnit(id=f"container-info-{name}-{container_urn}", mcp=mcp)
yield wu

mcp = MetadataChangeProposalWrapper(
entityType="container",
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
aspectName="dataPlatformInstance",
aspect=DataPlatformInstance(
platform=f"{make_data_platform_urn(container_key.platform)}"
),
)
wu = MetadataWorkUnit(
id=f"container-platforminstance-{name}-{container_urn}", mcp=mcp
)
yield wu
if container_key.instance:
mcp = MetadataChangeProposalWrapper(
entityType="container",
changeType=ChangeTypeClass.UPSERT,
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete?

aspectName="dataPlatformInstance",
aspect=DataPlatformInstance(
platform=f"{make_dataplatform_instance_urn(container_key.platform, container_key.instance)}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be changed to:

aspect=DataPlatformInstance(
  platform=f"{make_data_platform_urn(...)}",  # as before
  instance=f"{make_data_platform_instance_urn(container_key..."
...

),
)
wu = MetadataWorkUnit(
id=f"container-platforminstance-{name}-{container_urn}", mcp=mcp
)
yield wu

# Set subtype
subtype_mcp = MetadataChangeProposalWrapper(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.configuration.source_common import DEFAULT_ENV

if TYPE_CHECKING:

Expand Down
76 changes: 61 additions & 15 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@
from pydantic import validator

from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.configuration.source_common import PlatformSourceConfigBase
from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import make_dataset_urn, make_domain_urn
from datahub.emitter.mce_builder import (
make_dataset_urn_with_platform_instance,
make_domain_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import (
DatabaseKey,
Expand Down Expand Up @@ -53,7 +57,11 @@
logger = logging.getLogger(__name__)


class GlueSourceConfig(AwsSourceConfig):
DEFAULT_PLATFORM = "glue"
sgomezvillamor marked this conversation as resolved.
Show resolved Hide resolved
VALID_PLATFORMS = [DEFAULT_PLATFORM, "athena"]


class GlueSourceConfig(AwsSourceConfig, PlatformSourceConfigBase):

extract_owners: Optional[bool] = True
extract_transforms: Optional[bool] = True
Expand All @@ -79,6 +87,24 @@ def check_direction(cls, v: str) -> str:
)
return v.lower()

@validator("underlying_platform")
def underlying_platform_validator(cls, v: str) -> str:
if not v or v in VALID_PLATFORMS:
return v
else:
raise ConfigurationError(
f"'underlying_platform' can only take following values: {VALID_PLATFORMS}"
)

@validator("platform")
def platform_validator(cls, v: str) -> str:
if not v or v in VALID_PLATFORMS:
return v
else:
raise ConfigurationError(
f"'platform' can only take following values: {VALID_PLATFORMS}"
)


@dataclass
class GlueSourceReport(SourceReport):
Expand All @@ -104,18 +130,25 @@ def __init__(self, config: GlueSourceConfig, ctx: PipelineContext):
self.glue_client = config.glue_client
self.s3_client = config.s3_client
self.extract_transforms = config.extract_transforms
self.underlying_platform = config.underlying_platform
self.env = config.env

@classmethod
def create(cls, config_dict, ctx):
config = GlueSourceConfig.parse_obj(config_dict)
return cls(config, ctx)

def get_underlying_platform(self):
if self.underlying_platform in ["athena"]:
return self.underlying_platform
return "glue"
@property
def platform(self) -> str:
"""
This deprecates "underlying_platform" field in favour of the standard "platform" one, which has
more priority when both are defined.
:return: platform, otherwise underlying_platform, otherwise "glue"
"""
return (
self.source_config.platform
or self.source_config.underlying_platform
or DEFAULT_PLATFORM
)

def get_all_jobs(self):
"""
Expand Down Expand Up @@ -239,7 +272,12 @@ def process_dataflow_node(
full_table_name = f"{node_args['database']}.{node_args['table_name']}"

# we know that the table will already be covered when ingesting Glue tables
node_urn = f"urn:li:dataset:(urn:li:dataPlatform:{self.get_underlying_platform()},{full_table_name},{self.env})"
node_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=full_table_name,
env=self.env,
platform_instance=self.source_config.platform_instance,
)

# if data object is S3 bucket
elif node_args.get("connection_type") == "s3":
Expand Down Expand Up @@ -534,8 +572,8 @@ def get_lineage_if_enabled(
def gen_database_key(self, database: str) -> DatabaseKey:
return DatabaseKey(
database=database,
platform=self.get_underlying_platform(),
instance=self.env,
platform=self.platform,
instance=self.source_config.platform_instance,
)

def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]:
Expand Down Expand Up @@ -609,8 +647,11 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
self.report.report_workunit(workunit)
yield workunit

dataset_urn: str = make_dataset_urn(
self.get_underlying_platform(), full_table_name, self.env
dataset_urn: str = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=full_table_name,
env=self.env,
platform_instance=self.source_config.platform_instance,
)
yield from self._get_domain_wu(
dataset_name=full_table_name,
Expand All @@ -637,7 +678,7 @@ def _transform_extraction(self) -> Iterable[MetadataWorkUnit]:
for job in self.get_all_jobs():

flow_urn = mce_builder.make_data_flow_urn(
self.get_underlying_platform(), job["Name"], self.env
self.platform, job["Name"], self.env
)

flow_wu = self.get_dataflow_wu(flow_urn, job)
Expand Down Expand Up @@ -743,13 +784,18 @@ def get_schema_metadata(glue_source: GlueSource) -> SchemaMetadata:
schemaName=table_name,
version=0,
fields=fields,
platform=f"urn:li:dataPlatform:{self.get_underlying_platform()}",
platform=f"urn:li:dataPlatform:{self.platform}",
hash="",
platformSchema=MySqlDDL(tableSchema=""),
)

dataset_snapshot = DatasetSnapshot(
urn=make_dataset_urn(self.get_underlying_platform(), table_name, self.env),
urn=make_dataset_urn_with_platform_instance(
platform=self.platform,
name=table_name,
env=self.env,
platform_instance=self.source_config.platform_instance,
),
aspects=[],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ def gen_database_containers(

def gen_schema_key(self, db_name: str, schema: str) -> DatabaseKey:
return DatabaseKey(
platform=self.platform, instance=self.config.env, database=schema
platform=self.platform,
instance=self.config.platform_instance,
database=schema,
)

def gen_schema_containers(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,14 +572,14 @@ def gen_schema_key(self, db_name: str, schema: str) -> PlatformKey:
database=db_name,
schema=schema,
platform=self.platform,
instance=self.config.env,
instance=self.config.platform_instance,
treff7es marked this conversation as resolved.
Show resolved Hide resolved
)

def gen_database_key(self, database: str) -> PlatformKey:
return DatabaseKey(
database=database,
platform=self.platform,
instance=self.config.env,
instance=self.config.platform_instance,
)

def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]:
Expand Down
Loading