From d110ce1f976c1be66d9a4ba7062ac99ba4cdc68a Mon Sep 17 00:00:00 2001 From: Tim <50115603+bossenti@users.noreply.github.com> Date: Wed, 18 Jan 2023 18:25:37 +0100 Subject: [PATCH] refactor(ingest/athena): Replace `s3_staging_dir` parameter in Athena source with `query_result_location` (#7044) Co-authored-by: John Joyce --- .../datahub/ingestion/source/sql/athena.py | 29 ++++++++++--- .../tests/unit/test_athena_source.py | 42 ++++++++++++++++++- 2 files changed, 64 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py index fc6c821bf90db..a3b66ebb91c87 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py @@ -8,6 +8,7 @@ from pyathena.model import AthenaTableMetadata from sqlalchemy.engine.reflection import Inspector +from datahub.configuration.validate_field_rename import pydantic_renamed_field from datahub.emitter.mcp_builder import DatabaseKey, gen_containers from datahub.ingestion.api.decorators import ( SourceCapability, @@ -50,17 +51,34 @@ class AthenaConfig(SQLAlchemyConfig): default=3600, description="Duration to assume the AWS Role for. 
Maximum of 43200 (12 hours)", ) - s3_staging_dir: str = pydantic.Field( - description="Staging s3 location where the Athena query results will be stored" + s3_staging_dir: Optional[str] = pydantic.Field( + default=None, + deprecated=True, + description="[deprecated in favor of `query_result_location`] S3 query location", ) work_group: str = pydantic.Field( description="The name of your Amazon Athena Workgroups" ) catalog_name: str = pydantic.Field( - default="awsdatacatalog", description="Athena Catalog Name" + default="awsdatacatalog", + description="Athena Catalog Name", + ) + + query_result_location: str = pydantic.Field( + description="S3 path to the [query result bucket](https://docs.aws.amazon.com/athena/latest/ug/querying.html#query-results-specify-location) which should be used by AWS Athena to store results of the " + "queries executed by DataHub." ) - include_views = False # not supported for Athena + # overwrite default behavior of SQLAlchemyConfig + include_views: Optional[bool] = pydantic.Field( + default=False, description="Whether views should be ingested." 
+ ) + + _s3_staging_dir_population = pydantic_renamed_field( + old_name="s3_staging_dir", + new_name="query_result_location", + print_warning=True, + ) def get_sql_alchemy_url(self): @@ -70,7 +88,8 @@ def get_sql_alchemy_url(self): f"athena.{self.aws_region}.amazonaws.com:443", self.database, uri_opts={ - "s3_staging_dir": self.s3_staging_dir, + # the URI option is still named `s3_staging_dir` because PyAthena expects that name + "s3_staging_dir": self.query_result_location, "work_group": self.work_group, "catalog_name": self.catalog_name, "role_arn": self.aws_role_arn, diff --git a/metadata-ingestion/tests/unit/test_athena_source.py b/metadata-ingestion/tests/unit/test_athena_source.py index 60083a961d3b6..2558f6a46715e 100644 --- a/metadata-ingestion/tests/unit/test_athena_source.py +++ b/metadata-ingestion/tests/unit/test_athena_source.py @@ -11,7 +11,22 @@ @pytest.mark.integration -def test_athena_uri(): +def test_athena_config_query_location_old_plus_new_value_not_allowed(): + from datahub.ingestion.source.sql.athena import AthenaConfig + + with pytest.raises(ValueError): + AthenaConfig.parse_obj( + { + "aws_region": "us-west-1", + "s3_staging_dir": "s3://sample-staging-dir/", + "query_result_location": "s3://query_result_location", + "work_group": "test-workgroup", + } + ) + + +@pytest.mark.integration +def test_athena_config_staging_dir_is_set_as_query_result(): from datahub.ingestion.source.sql.athena import AthenaConfig config = AthenaConfig.parse_obj( @@ -21,9 +36,32 @@ def test_athena_uri(): "work_group": "test-workgroup", } ) + + expected_config = AthenaConfig.parse_obj( + { + "aws_region": "us-west-1", + "query_result_location": "s3://sample-staging-dir/", + "work_group": "test-workgroup", + } + ) + + assert config.json() == expected_config.json() + + +@pytest.mark.integration +def test_athena_uri(): + from datahub.ingestion.source.sql.athena import AthenaConfig + + config = AthenaConfig.parse_obj( + { + "aws_region": "us-west-1", + 
"query_result_location": "s3://query-result-location/", + "work_group": "test-workgroup", + } + ) assert ( config.get_sql_alchemy_url() - == "awsathena+rest://@athena.us-west-1.amazonaws.com:443/?s3_staging_dir=s3%3A%2F%2Fsample-staging-dir%2F&work_group=test-workgroup&catalog_name=awsdatacatalog&duration_seconds=3600" + == "awsathena+rest://@athena.us-west-1.amazonaws.com:443/?s3_staging_dir=s3%3A%2F%2Fquery-result-location%2F&work_group=test-workgroup&catalog_name=awsdatacatalog&duration_seconds=3600" )