Refactor tests
MatMoore committed Nov 24, 2023
1 parent 291adfa commit 5b29d71
Showing 1 changed file with 61 additions and 110 deletions.
containers/daap-python-base/tests/unit/versioning_test.py (171 changes: 61 additions & 110 deletions)
@@ -423,170 +423,121 @@ def setup(
         data_product_name,
         data_product_versions,
         data_product_major_versions,
+        create_raw_and_curated_data,
     ):
+        self.s3_client = s3_client
         self.bucket_name = metadata_bucket
+        self.data_product_name = data_product_name
+        self.latest_major_version = "v2"
+        self.new_major_version = "v3"
+        self.number_of_schemas = 3
+
         for version in data_product_versions:
             s3_client.put_object(
                 Body=json.dumps(test_metadata_with_schemas),
                 Bucket=self.bucket_name,
                 Key=f"{data_product_name}/{version}/metadata.json",
             )
-        for i in range(3):
+        for i in range(self.number_of_schemas):
             for version in data_product_versions:
                 s3_client.put_object(
                     Body=json.dumps(test_schema),
                     Bucket=self.bucket_name,
                     Key=f"{data_product_name}/{version}/schema{i}/schema.json",
                 )
 
-        self.latest_major_version = "v2"
         for major_version in data_product_major_versions:
             database_name = f"{data_product_name}_{major_version}"
 
             glue_client.create_database(DatabaseInput={"Name": database_name})
 
-            for i in range(3):
+            for i in range(self.number_of_schemas):
                 glue_client.create_table(
                     DatabaseName=database_name,
                     TableInput={"Name": f"schema{i}"},
                 )
 
-    def test_success(
-        self, s3_client, create_raw_and_curated_data, data_product_name, glue_client
-    ):
-        version_manager = VersionManager(data_product_name, logging.getLogger())
-        schema_list = ["schema0"]
+    @pytest.fixture(autouse=True)
+    def setup_subject(self, glue_client, data_product_name):
         with patch("glue_and_athena_utils.glue_client", glue_client):
-            version_manager.update_metadata_remove_schemas(schema_list=schema_list)
-        schema_prefix = f"{data_product_name}/v2.0/metadata.json"
-        response = s3_client.list_objects_v2(
-            Bucket=self.bucket_name,
-            Prefix=schema_prefix,
-        )
-        assert response.get("KeyCount") == 1
+            self.version_manager = VersionManager(data_product_name, logger)
+            yield
 
-    def test_invalid_schemas(self, data_product_name):
-        version_manager = VersionManager(data_product_name, logging.getLogger())
+    def test_success(self):
+        schema_list = ["schema0"]
+        self.version_manager.update_metadata_remove_schemas(schema_list=schema_list)
+
+        schema_prefix = f"{self.data_product_name}/v2.0/metadata.json"
+        self.assert_object_count(self.bucket_name, schema_prefix, 1)
+
+    def test_invalid_schemas(self):
         schema_list = ["schema3", "schema4"]
 
         with pytest.raises(InvalidUpdate) as exc:
-            version_manager.update_metadata_remove_schemas(schema_list=schema_list)
+            self.version_manager.update_metadata_remove_schemas(schema_list=schema_list)
         assert (
             str(exc.value)
             == "Invalid schemas found in schema_list: ['schema3', 'schema4']"
         )
 
     def test_glue_table_not_found(
         self,
-        s3_client,
-        create_raw_and_curated_data,
-        data_product_name,
-        glue_client,
-        table_name,
     ):
-        version_manager = VersionManager(data_product_name, logging.getLogger())
         schema_list = ["schema0", "banana"]
-        with patch("glue_and_athena_utils.glue_client", glue_client):
-            with pytest.raises(InvalidUpdate):
-                version_manager.update_metadata_remove_schemas(schema_list=schema_list)
+        with pytest.raises(InvalidUpdate):
+            self.version_manager.update_metadata_remove_schemas(schema_list=schema_list)
 
-    def test_schema_glue_table_deleted(
-        self, s3_client, create_raw_and_curated_data, data_product_name, glue_client
-    ):
-        version_manager = VersionManager(data_product_name, logging.getLogger())
+    def test_data_files_deleted(self):
         schema_list = ["schema0"]
-        with patch("glue_and_athena_utils.glue_client", glue_client):
-            # Call the handler
-            version_manager.update_metadata_remove_schemas(schema_list=schema_list)
-
-        assert not table_exists(
-            database_name=self.latest_major_version, table_name=schema_list[0]
-        )
 
-    def test_data_files_deleted(
-        self,
-        s3_client,
-        create_raw_and_curated_data,
-        data_product_name,
-        glue_client,
-        data_product_major_versions,
-    ):
-        version_manager = VersionManager(data_product_name, logging.getLogger())
-        schema_list = ["schema0"]
-        with patch("glue_and_athena_utils.glue_client", glue_client):
-            version_manager.update_metadata_remove_schemas(schema_list=schema_list)
+        self.version_manager.update_metadata_remove_schemas(schema_list=schema_list)
 
-        version = "v3"
-        curated_prefix = f"curated/{data_product_name}/{version}/schema0/"
-        response = s3_client.list_objects_v2(
-            Bucket=os.getenv("CURATED_DATA_BUCKET"),
-            Prefix=curated_prefix,
-        )
-        assert response.get("KeyCount") == 0
+        curated_prefix = (
+            f"curated/{self.data_product_name}/{self.latest_major_version}/schema0/"
+        )
+        raw_prefix = (
+            f"raw/{self.data_product_name}/{self.latest_major_version}/schema0/"
+        )
 
-        raw_prefix = f"raw/{data_product_name}/{version}/schema0/"
-        response = s3_client.list_objects_v2(
-            Bucket=os.getenv("RAW_DATA_BUCKET"),
-            Prefix=raw_prefix,
-        )
-        assert response.get("KeyCount") == 0
+        self.assert_object_count(os.getenv("CURATED_DATA_BUCKET"), curated_prefix, 0)
+        self.assert_object_count(os.getenv("RAW_DATA_BUCKET"), raw_prefix, 0)
 
     def test_deleted_schema_files_removed_from_new_version(
         self,
-        s3_client,
-        create_raw_and_curated_data,
-        data_product_name,
-        glue_client,
-        data_product_versions,
     ):
-        version_manager = VersionManager(data_product_name, logging.getLogger())
         schema_list = ["schema0"]
 
-        with patch("glue_and_athena_utils.glue_client", glue_client):
-            version_manager.update_metadata_remove_schemas(schema_list=schema_list)
-        schema_prefix = f"{data_product_name}/v3.0/{schema_list[0]}/schema.json"
-        response = s3_client.list_objects_v2(
-            Bucket=self.bucket_name,
-            Prefix=schema_prefix,
-        )
-        assert response.get("KeyCount") == 0
+        self.version_manager.update_metadata_remove_schemas(schema_list=schema_list)
+        schema_prefix = f"{self.data_product_name}/v3.0/{schema_list[0]}/schema.json"
+        self.assert_object_count(self.bucket_name, schema_prefix, 0)
 
-    def test_deleted_table_removed_from_new_version(
-        self,
-        data_product_name,
-        glue_client,
-        create_raw_and_curated_data,
-        data_product_versions,
-    ):
-        version_manager = VersionManager(data_product_name, logging.getLogger())
+    def test_deleted_table_removed_from_new_version(self):
         schema_list = ["schema0"]
 
-        with patch("glue_and_athena_utils.glue_client", glue_client):
-            version_manager.update_metadata_remove_schemas(schema_list=schema_list)
+        self.version_manager.update_metadata_remove_schemas(schema_list=schema_list)
 
-        expected_database_name = f"{data_product_name}_v3"
-        assert database_exists(expected_database_name, logger=logger)
-        assert table_exists(expected_database_name, "schema1")
-        assert not table_exists(expected_database_name, "schema0")
+        expected_database_name = f"{self.data_product_name}_{self.new_major_version}"
+        assert database_exists(expected_database_name, logger=logger)
+        assert table_exists(expected_database_name, "schema1")
+        assert not table_exists(expected_database_name, "schema0")
 
-    def test_validate_other_schemas_are_upversioned(
-        self,
-        s3_client,
-        create_raw_and_curated_data,
-        data_product_name,
-        glue_client,
-        data_product_versions,
-    ):
-        version_manager = VersionManager(data_product_name, logging.getLogger())
+    def test_validate_other_schemas_are_upversioned(self):
        schema_list = ["schema0"]
 
-        with patch("glue_and_athena_utils.glue_client", glue_client):
-            # Call the handler
-            version_manager.update_metadata_remove_schemas(schema_list=schema_list)
-        for i in range(1, 3):
-            schema_prefix = f"{data_product_name}/v2.0/schema{i}/schema.json"
-            response = s3_client.list_objects_v2(
-                Bucket=self.bucket_name,
-                Prefix=schema_prefix,
-            )
-            assert response.get("KeyCount") == 1
+        self.version_manager.update_metadata_remove_schemas(schema_list=schema_list)
+        for i in range(1, self.number_of_schemas):
+            self.assert_object_exists(
+                self.bucket_name,
+                f"{self.data_product_name}/v3.0/schema{i}/schema.json",
+            )
+
+    def assert_object_count(self, bucket, prefix, expected_count):
+        response = self.s3_client.list_objects_v2(
+            Bucket=bucket,
+            Prefix=prefix,
+        )
+        actual_count = response.get("KeyCount")
+        assert actual_count == expected_count
+
+    def assert_object_exists(self, bucket, key):
+        return self.assert_object_count(bucket, key, 1)
