Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion): adds platform instance capability to glue connector #4130

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ def get_long_description():
"cryptography",
}

# Shared dependency set for the "trino" and "starburst-trino-usage" plugins.
# The sqlalchemy extra pulls in the trino SQLAlchemy dialect needed by sql_common.
trino = {
    "trino>=0.308",
    "trino[sqlalchemy]>=0.308"
}

# Note: for all of these, framework_common will be added.
plugins: Dict[str, Set[str]] = {
# Sink plugins.
Expand Down Expand Up @@ -148,8 +153,8 @@ def get_long_description():
"sqlalchemy": sql_common,
"superset": {"requests", "sqlalchemy", "great_expectations"},
"tableau": {"tableauserverclient>=0.17.0"},
"trino": sql_common | {"trino"},
"starburst-trino-usage": sql_common | {"trino"},
"trino": sql_common | trino,
"starburst-trino-usage": sql_common | trino,
"nifi": {"requests", "packaging"},
}

Expand Down
44 changes: 23 additions & 21 deletions metadata-ingestion/source_docs/glue.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ This plugin extracts the following:

| Capability | Status | Details |
| -----------| ------ | ---- |
| Platform Instance | 🛑 | [link](../../docs/platform-instances.md) |
| Platform Instance | ✅ | [link](../../docs/platform-instances.md) |

## Quickstart recipe

Expand Down Expand Up @@ -73,26 +73,28 @@ plus `s3:GetObject` for the job script locations.

Note that a `.` is used to denote nested fields in the YAML recipe.

| Field | Required | Default | Description |
| ------------------------------- | -------- | ------------ | ---------------------------------------------------------------------------------- |
| `aws_region` | ✅ | | AWS region code. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `aws_access_key_id` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `aws_secret_access_key` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `aws_session_token` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `aws_role` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `extract_transforms` | | `True` | Whether to extract Glue transform jobs. |
| `database_pattern.allow` | | | List of regex patterns for databases to include in ingestion. |
| `database_pattern.deny` | | | List of regex patterns for databases to exclude from ingestion. |
| `database_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `table_pattern.allow` | | | List of regex patterns for tables to include in ingestion. |
| `table_pattern.deny` | | | List of regex patterns for tables to exclude from ingestion. |
| `table_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `underlying_platform` | | `glue` | Override for platform name. Allowed values - `glue`, `athena` |
| `ignore_unsupported_connectors` | | `True` | Whether to ignore unsupported connectors. If disabled, an error will be raised. |
| `emit_s3_lineage` | | `True` | Whether to emit S3-to-Glue lineage. |
| `glue_s3_lineage_direction` | | `upstream` | If `upstream`, S3 is upstream to Glue. If `downstream` S3 is downstream to Glue. |
| `extract_owners` | | `True` | When enabled, extracts ownership from Glue directly and overwrites existing owners. When disabled, ownership is left empty for datasets. |
| Field | Required | Default | Description |
|---------------------------------| -------- | ------------ |------------------------------------------------------------------------------------------------------------------------------------------|
| `aws_region` | ✅ | | AWS region code. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `aws_access_key_id` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `aws_secret_access_key` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `aws_session_token` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `aws_role` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `extract_transforms` | | `True` | Whether to extract Glue transform jobs. |
| `database_pattern.allow` | | | List of regex patterns for databases to include in ingestion. |
| `database_pattern.deny` | | | List of regex patterns for databases to exclude from ingestion. |
| `database_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `table_pattern.allow` | | | List of regex patterns for tables to include in ingestion. |
| `table_pattern.deny` | | | List of regex patterns for tables to exclude from ingestion. |
| `table_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `platform` | | `glue` | Override for platform name. Allowed values - `glue`, `athena` |
| `platform_instance` | | None | The Platform instance to use while constructing URNs. |
| `underlying_platform` | | `glue` | @deprecated(Use `platform`) Override for platform name. Allowed values - `glue`, `athena` |
| `ignore_unsupported_connectors` | | `True` | Whether to ignore unsupported connectors. If disabled, an error will be raised. |
| `emit_s3_lineage` | | `True` | Whether to emit S3-to-Glue lineage. |
| `glue_s3_lineage_direction` | | `upstream` | If `upstream`, S3 is upstream to Glue. If `downstream` S3 is downstream to Glue. |
| `extract_owners` | | `True` | When enabled, extracts ownership from Glue directly and overwrites existing owners. When disabled, ownership is left empty for datasets. |

## Compatibility

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.configuration.source_common import DEFAULT_ENV

if TYPE_CHECKING:

Expand Down
64 changes: 54 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pydantic import validator

from datahub.configuration.common import ConfigurationError
from datahub.configuration.source_common import PlatformSourceConfigBase
from datahub.emitter import mce_builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
Expand Down Expand Up @@ -45,8 +46,11 @@

logger = logging.getLogger(__name__)

# Platforms the Glue source may report datasets under: Glue itself, or Athena
# when tables in the Glue catalog are consumed through Athena.
VALID_PLATFORMS = ["glue", "athena"]
# Fallback used when neither `platform` nor the deprecated
# `underlying_platform` config field is set.
DEFAULT_PLATFORM = "glue"

class GlueSourceConfig(AwsSourceConfig):

class GlueSourceConfig(AwsSourceConfig, PlatformSourceConfigBase):

extract_owners: Optional[bool] = True
extract_transforms: Optional[bool] = True
Expand All @@ -71,6 +75,29 @@ def check_direction(cls, v: str) -> str:
)
return v.lower()

@validator("underlying_platform")
def underlying_platform_validator(cls, v: str) -> str:
    """Reject a non-empty `underlying_platform` outside VALID_PLATFORMS."""
    # Empty/None values pass through untouched; only a set, invalid value
    # is an error.
    if v and v not in VALID_PLATFORMS:
        raise ConfigurationError(
            f"'underlying_platform' can only take following values: {VALID_PLATFORMS}"
        )
    return v

@validator("platform")
def platform_validator(cls, v: str) -> str:
    """Reject a non-empty `platform` outside VALID_PLATFORMS."""
    # Mirrors underlying_platform_validator: unset values are allowed, a
    # set value must be one of the supported platforms.
    if v and v not in VALID_PLATFORMS:
        raise ConfigurationError(
            f"'platform' can only take following values: {VALID_PLATFORMS}"
        )
    return v

@validator("platform_instance")
def platform_instance_validator(cls, v):
    """Accept any `platform_instance` value unchanged.

    The value is used verbatim when constructing dataset/dataflow URNs;
    no normalization or restriction is currently applied.
    """
    # TODO: is there any restriction on platform_instance values?
    return v


@dataclass
class GlueSourceReport(SourceReport):
Expand All @@ -96,18 +123,25 @@ def __init__(self, config: GlueSourceConfig, ctx: PipelineContext):
self.glue_client = config.glue_client
self.s3_client = config.s3_client
self.extract_transforms = config.extract_transforms
self.underlying_platform = config.underlying_platform
self.env = config.env

@classmethod
def create(cls, config_dict, ctx):
    """Build a source instance from a raw recipe dict and pipeline context."""
    return cls(GlueSourceConfig.parse_obj(config_dict), ctx)

def get_underlying_platform(self):
if self.underlying_platform in ["athena"]:
return self.underlying_platform
return "glue"
@property
def platform(self) -> str:
    """Effective platform name used when building URNs.

    This deprecates the "underlying_platform" field in favour of the
    standard "platform" one, which takes priority when both are defined;
    if neither is set, DEFAULT_PLATFORM is used.
    """
    if self.source_config.platform:
        return self.source_config.platform
    if self.source_config.underlying_platform:
        return self.source_config.underlying_platform
    return DEFAULT_PLATFORM

def get_all_jobs(self):
"""
Expand Down Expand Up @@ -231,7 +265,12 @@ def process_dataflow_node(
full_table_name = f"{node_args['database']}.{node_args['table_name']}"

# we know that the table will already be covered when ingesting Glue tables
node_urn = f"urn:li:dataset:(urn:li:dataPlatform:{self.get_underlying_platform()},{full_table_name},{self.env})"
node_urn = mce_builder.make_dataset_urn_with_platform_instance(
platform=self.platform,
name=full_table_name,
env=self.env,
platform_instance=self.source_config.platform_instance,
)

# if data object is S3 bucket
elif node_args.get("connection_type") == "s3":
Expand Down Expand Up @@ -558,7 +597,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
for job in self.get_all_jobs():

flow_urn = mce_builder.make_data_flow_urn(
self.get_underlying_platform(), job["Name"], self.env
self.platform, job["Name"], self.env
)

flow_wu = self.get_dataflow_wu(flow_urn, job)
Expand Down Expand Up @@ -670,13 +709,18 @@ def get_schema_metadata(glue_source: GlueSource) -> SchemaMetadata:
schemaName=table_name,
version=0,
fields=fields,
platform=f"urn:li:dataPlatform:{self.get_underlying_platform()}",
platform=f"urn:li:dataPlatform:{self.platform}",
hash="",
platformSchema=MySqlDDL(tableSchema=""),
)

dataset_snapshot = DatasetSnapshot(
urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.get_underlying_platform()},{table_name},{self.env})",
urn=mce_builder.make_dataset_urn_with_platform_instance(
platform=self.platform,
name=table_name,
env=self.env,
platform_instance=self.source_config.platform_instance,
),
aspects=[],
)

Expand Down
Loading