Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion): adds platform instance capability to glue connector #4130

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ def get_long_description():
"cryptography",
}

trino = {
"trino>=0.308",
"trino[sqlalchemy]>=0.308",
}

microsoft_common = {"msal==1.16.0"}

data_lake_base = {
Expand Down Expand Up @@ -184,8 +189,8 @@ def get_long_description():
"sqlalchemy": sql_common,
"superset": {"requests", "sqlalchemy", "great_expectations", "greenlet"},
"tableau": {"tableauserverclient>=0.17.0"},
"trino": sql_common | {"trino"},
"starburst-trino-usage": sql_common | {"trino"},
"trino": sql_common | trino,
"starburst-trino-usage": sql_common | trino,
"nifi": {"requests", "packaging"},
"powerbi": {"orderedset"} | microsoft_common,
}
Expand Down
6 changes: 4 additions & 2 deletions metadata-ingestion/source_docs/glue.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ This plugin extracts the following:

| Capability | Status | Details |
| -----------| ------ | ---- |
| Platform Instance | 🛑 | [link](../../docs/platform-instances.md) |
| Platform Instance | ✔️ | [link](../../docs/platform-instances.md) |
| Data Containers | ✔️ | |
| Data Domains | ✔️ | [link](../../docs/domains.md) |

Expand Down Expand Up @@ -91,7 +91,9 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| `table_pattern.allow` | | | List of regex patterns for tables to include in ingestion. |
| `table_pattern.deny` | | | List of regex patterns for tables to exclude from ingestion. |
| `table_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `underlying_platform` | | `glue` | Override for platform name. Allowed values - `glue`, `athena` |
| `platform` | | `glue` | Override for platform name. Allowed values - `glue`, `athena` |
| `platform_instance` | | None | The Platform instance to use while constructing URNs. |
| `underlying_platform` | | `glue` | Deprecated — use `platform` instead. Override for platform name. Allowed values - `glue`, `athena` |
| `ignore_unsupported_connectors` | | `True` | Whether to ignore unsupported connectors. If disabled, an error will be raised. |
| `emit_s3_lineage` | | `True` | Whether to emit S3-to-Glue lineage. |
| `glue_s3_lineage_direction` | | `upstream` | If `upstream`, S3 is upstream to Glue. If `downstream` S3 is downstream to Glue. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.configuration.source_common import DEFAULT_ENV

if TYPE_CHECKING:

Expand Down
93 changes: 78 additions & 15 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@
from pydantic import validator

from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.configuration.source_common import PlatformSourceConfigBase
from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import make_dataset_urn, make_domain_urn
from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
make_domain_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import (
DatabaseKey,
Expand Down Expand Up @@ -39,6 +45,7 @@
DataJobInfoClass,
DataJobInputOutputClass,
DataJobSnapshotClass,
DataPlatformInstanceClass,
DatasetLineageTypeClass,
DatasetPropertiesClass,
MetadataChangeEventClass,
Expand All @@ -53,7 +60,11 @@
logger = logging.getLogger(__name__)


class GlueSourceConfig(AwsSourceConfig):
DEFAULT_PLATFORM = "glue"
sgomezvillamor marked this conversation as resolved.
Show resolved Hide resolved
VALID_PLATFORMS = [DEFAULT_PLATFORM, "athena"]


class GlueSourceConfig(AwsSourceConfig, PlatformSourceConfigBase):

extract_owners: Optional[bool] = True
extract_transforms: Optional[bool] = True
Expand All @@ -79,6 +90,24 @@ def check_direction(cls, v: str) -> str:
)
return v.lower()

@validator("underlying_platform")
def underlying_platform_validator(cls, v: str) -> str:
if not v or v in VALID_PLATFORMS:
return v
else:
raise ConfigurationError(
f"'underlying_platform' can only take following values: {VALID_PLATFORMS}"
)

@validator("platform")
def platform_validator(cls, v: str) -> str:
if not v or v in VALID_PLATFORMS:
return v
else:
raise ConfigurationError(
f"'platform' can only take following values: {VALID_PLATFORMS}"
)


@dataclass
class GlueSourceReport(SourceReport):
Expand All @@ -104,18 +133,25 @@ def __init__(self, config: GlueSourceConfig, ctx: PipelineContext):
self.glue_client = config.glue_client
self.s3_client = config.s3_client
self.extract_transforms = config.extract_transforms
self.underlying_platform = config.underlying_platform
self.env = config.env

@classmethod
def create(cls, config_dict, ctx):
config = GlueSourceConfig.parse_obj(config_dict)
return cls(config, ctx)

def get_underlying_platform(self):
if self.underlying_platform in ["athena"]:
return self.underlying_platform
return "glue"
@property
def platform(self) -> str:
"""
This deprecates "underlying_platform" field in favour of the standard "platform" one, which has
more priority when both are defined.
:return: platform, otherwise underlying_platform, otherwise "glue"
"""
return (
self.source_config.platform
or self.source_config.underlying_platform
or DEFAULT_PLATFORM
)

def get_all_jobs(self):
"""
Expand Down Expand Up @@ -239,7 +275,12 @@ def process_dataflow_node(
full_table_name = f"{node_args['database']}.{node_args['table_name']}"

# we know that the table will already be covered when ingesting Glue tables
node_urn = f"urn:li:dataset:(urn:li:dataPlatform:{self.get_underlying_platform()},{full_table_name},{self.env})"
node_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=full_table_name,
env=self.env,
platform_instance=self.source_config.platform_instance,
)

# if data object is S3 bucket
elif node_args.get("connection_type") == "s3":
Expand Down Expand Up @@ -533,8 +574,11 @@ def get_lineage_if_enabled(
def gen_database_key(self, database: str) -> DatabaseKey:
return DatabaseKey(
database=database,
platform=self.get_underlying_platform(),
instance=self.env,
platform=self.platform,
instance=self.source_config.platform_instance
# keeps backward compatibility when platform instance is missed
if self.source_config.platform_instance is not None
else self.source_config.env,
)

def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]:
Expand Down Expand Up @@ -608,8 +652,11 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
self.report.report_workunit(workunit)
yield workunit

dataset_urn: str = make_dataset_urn(
self.get_underlying_platform(), full_table_name, self.env
dataset_urn: str = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=full_table_name,
env=self.env,
platform_instance=self.source_config.platform_instance,
)
yield from self._get_domain_wu(
dataset_name=full_table_name,
Expand All @@ -636,7 +683,7 @@ def _transform_extraction(self) -> Iterable[MetadataWorkUnit]:
for job in self.get_all_jobs():

flow_urn = mce_builder.make_data_flow_urn(
self.get_underlying_platform(), job["Name"], self.env
self.platform, job["Name"], self.env
)

flow_wu = self.get_dataflow_wu(flow_urn, job)
Expand Down Expand Up @@ -742,13 +789,28 @@ def get_schema_metadata(glue_source: GlueSource) -> SchemaMetadata:
schemaName=table_name,
version=0,
fields=fields,
platform=f"urn:li:dataPlatform:{self.get_underlying_platform()}",
platform=f"urn:li:dataPlatform:{self.platform}",
hash="",
platformSchema=MySqlDDL(tableSchema=""),
)

def get_data_platform_instance() -> DataPlatformInstanceClass:
return DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=make_dataplatform_instance_urn(
self.platform, self.source_config.platform_instance
)
if self.source_config.platform_instance
else None,
)

dataset_snapshot = DatasetSnapshot(
urn=make_dataset_urn(self.get_underlying_platform(), table_name, self.env),
urn=make_dataset_urn_with_platform_instance(
platform=self.platform,
name=table_name,
env=self.env,
platform_instance=self.source_config.platform_instance,
),
aspects=[],
)

Expand All @@ -761,6 +823,7 @@ def get_schema_metadata(glue_source: GlueSource) -> SchemaMetadata:

dataset_snapshot.aspects.append(get_dataset_properties())
dataset_snapshot.aspects.append(get_schema_metadata(self))
dataset_snapshot.aspects.append(get_data_platform_instance())

metadata_record = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
return metadata_record
Expand Down
Loading