Skip to content

Commit

Permalink
refactor(ingest/athena): Replace s3_staging_dir parameter in Athena…
Browse files Browse the repository at this point in the history
… source with `query_result_location` (datahub-project#7044)

Co-authored-by: John Joyce <[email protected]>
  • Loading branch information
2 people authored and Eric Yomi committed Jan 18, 2023
1 parent 8dd0b31 commit d110ce1
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 7 deletions.
29 changes: 24 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pyathena.model import AthenaTableMetadata
from sqlalchemy.engine.reflection import Inspector

from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.emitter.mcp_builder import DatabaseKey, gen_containers
from datahub.ingestion.api.decorators import (
SourceCapability,
Expand Down Expand Up @@ -50,17 +51,34 @@ class AthenaConfig(SQLAlchemyConfig):
default=3600,
description="Duration to assume the AWS Role for. Maximum of 43200 (12 hours)",
)
s3_staging_dir: str = pydantic.Field(
description="Staging s3 location where the Athena query results will be stored"
s3_staging_dir: Optional[str] = pydantic.Field(
default=None,
deprecated=True,
description="[deprecated in favor of `query_result_location`] S3 query location",
)
work_group: str = pydantic.Field(
description="The name of your Amazon Athena Workgroups"
)
catalog_name: str = pydantic.Field(
default="awsdatacatalog", description="Athena Catalog Name"
default="awsdatacatalog",
description="Athena Catalog Name",
)

query_result_location: str = pydantic.Field(
description="S3 path to the [query result bucket](https://docs.aws.amazon.com/athena/latest/ug/querying.html#query-results-specify-location) which should be used by AWS Athena to store results of the"
"queries executed by DataHub."
)

include_views = False # not supported for Athena
# overwrite default behavior of SQLAlchemyConfing
include_views: Optional[bool] = pydantic.Field(
default=False, description="Whether views should be ingested."
)

_s3_staging_dir_population = pydantic_renamed_field(
old_name="s3_staging_dir",
new_name="query_result_location",
print_warning=True,
)

def get_sql_alchemy_url(self):
return make_sqlalchemy_uri(
Expand All @@ -70,7 +88,8 @@ def get_sql_alchemy_url(self):
f"athena.{self.aws_region}.amazonaws.com:443",
self.database,
uri_opts={
"s3_staging_dir": self.s3_staging_dir,
# as an URI option `s3_staging_dir` is still used due to PyAthena
"s3_staging_dir": self.query_result_location,
"work_group": self.work_group,
"catalog_name": self.catalog_name,
"role_arn": self.aws_role_arn,
Expand Down
42 changes: 40 additions & 2 deletions metadata-ingestion/tests/unit/test_athena_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,22 @@


@pytest.mark.integration
def test_athena_uri():
def test_athena_config_query_location_old_plus_new_value_not_allowed():
from datahub.ingestion.source.sql.athena import AthenaConfig

with pytest.raises(ValueError):
AthenaConfig.parse_obj(
{
"aws_region": "us-west-1",
"s3_staging_dir": "s3://sample-staging-dir/",
"query_result_location": "s3://query_result_location",
"work_group": "test-workgroup",
}
)


@pytest.mark.integration
def test_athena_config_staging_dir_is_set_as_query_result():
from datahub.ingestion.source.sql.athena import AthenaConfig

config = AthenaConfig.parse_obj(
Expand All @@ -21,9 +36,32 @@ def test_athena_uri():
"work_group": "test-workgroup",
}
)

expected_config = AthenaConfig.parse_obj(
{
"aws_region": "us-west-1",
"query_result_location": "s3://sample-staging-dir/",
"work_group": "test-workgroup",
}
)

assert config.json() == expected_config.json()


@pytest.mark.integration
def test_athena_uri():
from datahub.ingestion.source.sql.athena import AthenaConfig

config = AthenaConfig.parse_obj(
{
"aws_region": "us-west-1",
"query_result_location": "s3://query-result-location/",
"work_group": "test-workgroup",
}
)
assert (
config.get_sql_alchemy_url()
== "awsathena+rest://@athena.us-west-1.amazonaws.com:443/?s3_staging_dir=s3%3A%2F%2Fsample-staging-dir%2F&work_group=test-workgroup&catalog_name=awsdatacatalog&duration_seconds=3600"
== "awsathena+rest://@athena.us-west-1.amazonaws.com:443/?s3_staging_dir=s3%3A%2F%2Fquery-result-location%2F&work_group=test-workgroup&catalog_name=awsdatacatalog&duration_seconds=3600"
)


Expand Down

0 comments on commit d110ce1

Please sign in to comment.