Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): type stubs for boto3 #2975

Merged
merged 56 commits into from
Jul 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
4753f74
Init endpoint models
kevinhu Jul 15, 2021
3c8e7e4
Init lineage extractor
kevinhu Jul 15, 2021
ef2f664
Refactor model endpoint extraction
kevinhu Jul 15, 2021
f2e1d50
Add model groups
kevinhu Jul 15, 2021
057a455
Add model group lineage extraction
kevinhu Jul 15, 2021
393f8ec
Init endpoint stubs
kevinhu Jul 15, 2021
a6e2173
Successful endpoint ingestion
kevinhu Jul 15, 2021
821994a
Endpoint lineage stubs
kevinhu Jul 15, 2021
4612045
Add endpoint field to model schema
kevinhu Jul 16, 2021
18a9001
Stubs for model group lineage
kevinhu Jul 16, 2021
4eda319
Ingest model-group lineage
kevinhu Jul 16, 2021
77640af
Formatting
kevinhu Jul 16, 2021
4ad6638
More comments
kevinhu Jul 16, 2021
0220e7f
Refactor status types
kevinhu Jul 16, 2021
11546c2
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 16, 2021
61db239
Merge
kevinhu Jul 16, 2021
160edab
Fix context typo
kevinhu Jul 16, 2021
0ae0ed8
Init model group entity
kevinhu Jul 16, 2021
698940c
Ingest model groups
kevinhu Jul 16, 2021
d8931db
Reorder models and groups
kevinhu Jul 16, 2021
1fc4461
Successful model group lineage ingestion
kevinhu Jul 16, 2021
7c3628f
Ingest dataowners
kevinhu Jul 16, 2021
7f1bf48
Sort
kevinhu Jul 16, 2021
4875083
Ingest model group description
kevinhu Jul 16, 2021
581dfd9
Init hyperparams and metrics aspects
kevinhu Jul 16, 2021
471f8c5
External links for Glue jobs
kevinhu Jul 17, 2021
4469e37
SageMaker job URLs
kevinhu Jul 17, 2021
4cf997b
Add external URLs to models
kevinhu Jul 17, 2021
2a36fb6
Ingest model URLs
kevinhu Jul 17, 2021
444525e
Hyperparam ingestion
kevinhu Jul 17, 2021
78daddc
PR updates
kevinhu Jul 19, 2021
a6aaef5
Rename endpoint -> deployment
kevinhu Jul 19, 2021
c44bff2
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-mo…
kevinhu Jul 19, 2021
9a72890
Metrics ingestion
kevinhu Jul 19, 2021
5713172
Merge
kevinhu Jul 19, 2021
02b1289
Merge
kevinhu Jul 19, 2021
063e247
Add external URLs for endpoints
kevinhu Jul 19, 2021
cd43b58
Ingest endpoint URLs
kevinhu Jul 19, 2021
54b1496
Remove unnecessary aspects
kevinhu Jul 19, 2021
dea8883
Set hyperparameter type to string
kevinhu Jul 19, 2021
c7e85fb
Strip quotes
kevinhu Jul 19, 2021
fbec85d
Merge branch 'sagemaker-model-metrics' of github.com:kevinhu/datahub …
kevinhu Jul 20, 2021
d937be0
Working types
kevinhu Jul 20, 2021
f963701
Use proper types for models
kevinhu Jul 20, 2021
16bcf44
Use proper types for sagemaker lineage
kevinhu Jul 20, 2021
f185c34
Use proper types for sagemaker feature groups
kevinhu Jul 20, 2021
984f463
Stubs for jobs
kevinhu Jul 21, 2021
0d80da4
Merge branch 'linkedin:master' into boto-stubs
kevinhu Jul 21, 2021
67241ca
Merge branch 'master' of github.com:kevinhu/datahub into boto-stubs
kevinhu Jul 28, 2021
5fb8b54
Merge
kevinhu Jul 28, 2021
84f2b47
Fix browse paths
kevinhu Jul 28, 2021
f11c624
Remove unused
kevinhu Jul 28, 2021
58a1682
Fix literal type import
kevinhu Jul 28, 2021
dab1b0d
Remove print
kevinhu Jul 28, 2021
ab9801a
Fix job class typing
kevinhu Jul 28, 2021
5f76968
Fix final type import
kevinhu Jul 28, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def get_long_description():
"types-cachetools",
# versions 0.1.13 and 0.1.14 seem to have issues
"types-click==0.1.12",
"boto3-stubs[s3,glue,sagemaker]",
}

base_dev_requirements = {
Expand Down
26 changes: 18 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
from typing import List, Optional, Union

import boto3
from boto3.session import Session
from mypy_boto3_glue import GlueClient
from mypy_boto3_s3 import S3Client
from mypy_boto3_sagemaker import SageMakerClient

from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
Expand Down Expand Up @@ -46,22 +50,20 @@ class AwsSourceConfig(ConfigModel):
aws_role: Optional[Union[str, List[str]]] = None
aws_region: str

def get_client(self, service: str) -> boto3.client:
def get_session(self) -> Session:
if (
self.aws_access_key_id
and self.aws_secret_access_key
and self.aws_session_token
):
return boto3.client(
service,
return Session(
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
aws_session_token=self.aws_session_token,
region_name=self.aws_region,
)
elif self.aws_access_key_id and self.aws_secret_access_key:
return boto3.client(
service,
return Session(
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
region_name=self.aws_region,
Expand All @@ -77,15 +79,23 @@ def get_client(self, service: str) -> boto3.client:
self.aws_role,
{},
)
return boto3.client(
service,
return Session(
aws_access_key_id=credentials["AccessKeyId"],
aws_secret_access_key=credentials["SecretAccessKey"],
aws_session_token=credentials["SessionToken"],
region_name=self.aws_region,
)
else:
return boto3.client(service, region_name=self.aws_region)
return Session(region_name=self.aws_region)

def get_s3_client(self) -> S3Client:
    """Return an S3 client created from the session built by get_session()."""
    session = self.get_session()
    return session.client("s3")

def get_glue_client(self) -> GlueClient:
    """Return a Glue client created from the session built by get_session()."""
    session = self.get_session()
    return session.client("glue")

def get_sagemaker_client(self) -> SageMakerClient:
    """Return a SageMaker client created from the session built by get_session()."""
    session = self.get_session()
    return session.client("sagemaker")


def make_s3_urn(s3_uri: str, env: str, suffix: Optional[str] = None) -> str:
Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ class GlueSourceConfig(AwsSourceConfig):

@property
def glue_client(self):
return self.get_client("glue")
return self.get_glue_client()

@property
def s3_client(self):
return self.get_client("s3")
return self.get_s3_client()


@dataclass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class SagemakerSourceConfig(AwsSourceConfig):

@property
def sagemaker_client(self):
return self.get_client("sagemaker")
return self.get_sagemaker_client()


@dataclass
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List
from typing import Iterable, List

from mypy_boto3_sagemaker import SageMakerClient
from mypy_boto3_sagemaker.type_defs import (
DescribeFeatureGroupResponseTypeDef,
FeatureDefinitionTypeDef,
FeatureGroupSummaryTypeDef,
)

import datahub.emitter.mce_builder as builder
from datahub.ingestion.api.workunit import MetadataWorkUnit
Expand All @@ -23,11 +30,11 @@

@dataclass
class FeatureGroupProcessor:
sagemaker_client: Any
sagemaker_client: SageMakerClient
env: str
report: SagemakerSourceReport

def get_all_feature_groups(self) -> List[Dict[str, Any]]:
def get_all_feature_groups(self) -> List[FeatureGroupSummaryTypeDef]:
"""
List all feature groups in SageMaker.
"""
Expand All @@ -41,7 +48,9 @@ def get_all_feature_groups(self) -> List[Dict[str, Any]]:

return feature_groups

def get_feature_group_details(self, feature_group_name: str) -> Dict[str, Any]:
def get_feature_group_details(
self, feature_group_name: str
) -> DescribeFeatureGroupResponseTypeDef:
"""
Get details of a feature group (including list of component features).
"""
Expand All @@ -59,15 +68,13 @@ def get_feature_group_details(self, feature_group_name: str) -> Dict[str, Any]:
next_features = self.sagemaker_client.describe_feature_group(
FeatureGroupName=feature_group_name, NextToken=next_token
)
feature_group["FeatureDefinitions"].append(
next_features["FeatureDefinitions"]
)
feature_group["FeatureDefinitions"] += next_features["FeatureDefinitions"]
next_token = feature_group.get("NextToken", "")

return feature_group

def get_feature_group_wu(
self, feature_group_details: Dict[str, Any]
self, feature_group_details: DescribeFeatureGroupResponseTypeDef
) -> MetadataWorkUnit:
"""
Generate an MLFeatureTable workunit for a SageMaker feature group.
Expand Down Expand Up @@ -138,7 +145,9 @@ def get_feature_type(self, aws_type: str, feature_name: str) -> str:
return mapped_type

def get_feature_wu(
self, feature_group_details: Dict[str, Any], feature: Dict[str, Any]
self,
feature_group_details: DescribeFeatureGroupResponseTypeDef,
feature: FeatureDefinitionTypeDef,
) -> MetadataWorkUnit:
"""
Generate an MLFeature workunit for a SageMaker feature.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
from typing import Dict

from typing_extensions import Final

from datahub.metadata.schema_classes import JobStatusClass


class SageMakerJobInfo:
    """
    Base description of a SageMaker job type.

    Every string-valued field below is exposed as a read-only property that
    raises NotImplementedError until a subclass supplies a concrete value.
    """

    # Note on the property-based typing: the child classes declare these fields
    # with a Final type so that the boto3 stubs interpret them correctly.
    # Typing them here as plain strings instead is reported to fail with a
    # TypeError, because the field is being converted to a non-overwritable type.
    # See https://mypy.readthedocs.io/en/stable/final_attrs.html#details-of-using-final

    # job-specific mapping from boto3 status strings to DataHub-native enum
    status_map: Dict[str, str]

    # name of function for processing job for ingestion
    processor: str

    @property
    def list_command(self) -> str:
        """boto3 command to get list of jobs."""
        raise NotImplementedError()

    @property
    def list_key(self) -> str:
        """Field in job listing response containing actual list."""
        raise NotImplementedError()

    @property
    def list_name_key(self) -> str:
        """Field in job listing response element corresponding to job name."""
        raise NotImplementedError()

    @property
    def list_arn_key(self) -> str:
        """Field in job listing response element corresponding to job ARN."""
        raise NotImplementedError()

    @property
    def describe_command(self) -> str:
        """boto3 command to get job details."""
        raise NotImplementedError()

    @property
    def describe_name_key(self) -> str:
        """Field in job description response corresponding to job name."""
        raise NotImplementedError()

    @property
    def describe_arn_key(self) -> str:
        """Field in job description response corresponding to job ARN."""
        raise NotImplementedError()

    @property
    def describe_status_key(self) -> str:
        """Field in job description response corresponding to job status."""
        raise NotImplementedError()


class AutoMlJobInfo(SageMakerJobInfo):
    """boto3 command names, response keys and status mapping for AutoML jobs."""

    # Listing call and the keys read from its response.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_auto_ml_jobs
    list_command: Final = "list_auto_ml_jobs"
    list_key: Final = "AutoMLJobSummaries"
    list_name_key: Final = "AutoMLJobName"
    list_arn_key: Final = "AutoMLJobArn"

    # Describe call and the keys read from its response.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.describe_auto_ml_job
    describe_command: Final = "describe_auto_ml_job"
    describe_name_key: Final = "AutoMLJobName"
    describe_arn_key: Final = "AutoMLJobArn"
    describe_status_key: Final = "AutoMLJobStatus"

    # name of the function that processes this job type for ingestion
    processor = "process_auto_ml_job"

    # boto3 status string -> DataHub job status
    status_map = {
        "Completed": JobStatusClass.COMPLETED,
        "InProgress": JobStatusClass.IN_PROGRESS,
        "Failed": JobStatusClass.FAILED,
        "Stopped": JobStatusClass.STOPPED,
        "Stopping": JobStatusClass.STOPPING,
    }


class CompilationJobInfo(SageMakerJobInfo):
    """boto3 command names, response keys and status mapping for compilation jobs."""

    # Listing call and the keys read from its response.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_compilation_jobs
    list_command: Final = "list_compilation_jobs"
    list_key: Final = "CompilationJobSummaries"
    list_name_key: Final = "CompilationJobName"
    list_arn_key: Final = "CompilationJobArn"

    # Describe call and the keys read from its response.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.describe_compilation_job
    describe_command: Final = "describe_compilation_job"
    describe_name_key: Final = "CompilationJobName"
    describe_arn_key: Final = "CompilationJobArn"
    describe_status_key: Final = "CompilationJobStatus"

    # name of the function that processes this job type for ingestion
    processor = "process_compilation_job"

    # boto3 status string -> DataHub job status
    status_map = {
        "INPROGRESS": JobStatusClass.IN_PROGRESS,
        "COMPLETED": JobStatusClass.COMPLETED,
        "FAILED": JobStatusClass.FAILED,
        "STARTING": JobStatusClass.STARTING,
        "STOPPING": JobStatusClass.STOPPING,
        "STOPPED": JobStatusClass.STOPPED,
    }


class EdgePackagingJobInfo(SageMakerJobInfo):
    """boto3 command names, response keys and status mapping for edge packaging jobs."""

    # Listing call and the keys read from its response.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_edge_packaging_jobs
    list_command: Final = "list_edge_packaging_jobs"
    list_key: Final = "EdgePackagingJobSummaries"
    list_name_key: Final = "EdgePackagingJobName"
    list_arn_key: Final = "EdgePackagingJobArn"

    # Describe call and the keys read from its response.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.describe_edge_packaging_job
    describe_command: Final = "describe_edge_packaging_job"
    describe_name_key: Final = "EdgePackagingJobName"
    describe_arn_key: Final = "EdgePackagingJobArn"
    describe_status_key: Final = "EdgePackagingJobStatus"

    # name of the function that processes this job type for ingestion
    processor = "process_edge_packaging_job"

    # boto3 status string -> DataHub job status
    status_map = {
        "INPROGRESS": JobStatusClass.IN_PROGRESS,
        "COMPLETED": JobStatusClass.COMPLETED,
        "FAILED": JobStatusClass.FAILED,
        "STARTING": JobStatusClass.STARTING,
        "STOPPING": JobStatusClass.STOPPING,
        "STOPPED": JobStatusClass.STOPPED,
    }


class HyperParameterTuningJobInfo(SageMakerJobInfo):
    """boto3 command names, response keys and status mapping for hyperparameter tuning jobs."""

    # Listing call and the keys read from its response.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_hyper_parameter_tuning_jobs
    list_command: Final = "list_hyper_parameter_tuning_jobs"
    list_key: Final = "HyperParameterTuningJobSummaries"
    list_name_key: Final = "HyperParameterTuningJobName"
    list_arn_key: Final = "HyperParameterTuningJobArn"

    # Describe call and the keys read from its response.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.describe_hyper_parameter_tuning_job
    describe_command: Final = "describe_hyper_parameter_tuning_job"
    describe_name_key: Final = "HyperParameterTuningJobName"
    describe_arn_key: Final = "HyperParameterTuningJobArn"
    describe_status_key: Final = "HyperParameterTuningJobStatus"

    # name of the function that processes this job type for ingestion
    processor = "process_hyper_parameter_tuning_job"

    # boto3 status string -> DataHub job status
    status_map = {
        "InProgress": JobStatusClass.IN_PROGRESS,
        "Completed": JobStatusClass.COMPLETED,
        "Failed": JobStatusClass.FAILED,
        "Stopping": JobStatusClass.STOPPING,
        "Stopped": JobStatusClass.STOPPED,
    }


class LabelingJobInfo(SageMakerJobInfo):
    """boto3 command names, response keys and status mapping for labeling jobs."""

    # Listing call and the keys read from its response.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_labeling_jobs
    list_command: Final = "list_labeling_jobs"
    list_key: Final = "LabelingJobSummaryList"
    list_name_key: Final = "LabelingJobName"
    list_arn_key: Final = "LabelingJobArn"

    # Describe call and the keys read from its response.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.describe_labeling_job
    describe_command: Final = "describe_labeling_job"
    describe_name_key: Final = "LabelingJobName"
    describe_arn_key: Final = "LabelingJobArn"
    describe_status_key: Final = "LabelingJobStatus"

    # name of the function that processes this job type for ingestion
    processor = "process_labeling_job"

    # boto3 status string -> DataHub job status
    status_map = {
        "Initializing": JobStatusClass.STARTING,
        "InProgress": JobStatusClass.IN_PROGRESS,
        "Completed": JobStatusClass.COMPLETED,
        "Failed": JobStatusClass.FAILED,
        "Stopping": JobStatusClass.STOPPING,
        "Stopped": JobStatusClass.STOPPED,
    }


class ProcessingJobInfo(SageMakerJobInfo):
    """boto3 command names, response keys and status mapping for processing jobs."""

    # Listing call and the keys read from its response.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_processing_jobs
    list_command: Final = "list_processing_jobs"
    list_key: Final = "ProcessingJobSummaries"
    list_name_key: Final = "ProcessingJobName"
    list_arn_key: Final = "ProcessingJobArn"

    # Describe call and the keys read from its response.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.describe_processing_job
    describe_command: Final = "describe_processing_job"
    describe_name_key: Final = "ProcessingJobName"
    describe_arn_key: Final = "ProcessingJobArn"
    describe_status_key: Final = "ProcessingJobStatus"

    # name of the function that processes this job type for ingestion
    processor = "process_processing_job"

    # boto3 status string -> DataHub job status
    status_map = {
        "InProgress": JobStatusClass.IN_PROGRESS,
        "Completed": JobStatusClass.COMPLETED,
        "Failed": JobStatusClass.FAILED,
        "Stopping": JobStatusClass.STOPPING,
        "Stopped": JobStatusClass.STOPPED,
    }


class TrainingJobInfo(SageMakerJobInfo):
    """boto3 command names, response keys and status mapping for training jobs."""

    # Listing call and the keys read from its response.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_training_jobs
    list_command: Final = "list_training_jobs"
    list_key: Final = "TrainingJobSummaries"
    list_name_key: Final = "TrainingJobName"
    list_arn_key: Final = "TrainingJobArn"

    # Describe call and the keys read from its response.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.describe_training_job
    describe_command: Final = "describe_training_job"
    describe_name_key: Final = "TrainingJobName"
    describe_arn_key: Final = "TrainingJobArn"
    describe_status_key: Final = "TrainingJobStatus"

    # name of the function that processes this job type for ingestion
    processor = "process_training_job"

    # boto3 status string -> DataHub job status
    status_map = {
        "InProgress": JobStatusClass.IN_PROGRESS,
        "Completed": JobStatusClass.COMPLETED,
        "Failed": JobStatusClass.FAILED,
        "Stopping": JobStatusClass.STOPPING,
        "Stopped": JobStatusClass.STOPPED,
    }


class TransformJobInfo(SageMakerJobInfo):
    """boto3 command names, response keys and status mapping for transform jobs."""

    # Listing call and the keys read from its response.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_transform_jobs
    list_command: Final = "list_transform_jobs"
    list_key: Final = "TransformJobSummaries"
    list_name_key: Final = "TransformJobName"
    list_arn_key: Final = "TransformJobArn"

    # Describe call and the keys read from its response.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.describe_transform_job
    describe_command: Final = "describe_transform_job"
    describe_name_key: Final = "TransformJobName"
    describe_arn_key: Final = "TransformJobArn"
    describe_status_key: Final = "TransformJobStatus"

    # name of the function that processes this job type for ingestion
    processor = "process_transform_job"

    # boto3 status string -> DataHub job status
    status_map = {
        "InProgress": JobStatusClass.IN_PROGRESS,
        "Completed": JobStatusClass.COMPLETED,
        "Failed": JobStatusClass.FAILED,
        "Stopping": JobStatusClass.STOPPING,
        "Stopped": JobStatusClass.STOPPED,
    }
Loading