diff --git a/Dockerfile b/Dockerfile index 35b15a0..c607f10 100644 --- a/Dockerfile +++ b/Dockerfile @@ -39,6 +39,6 @@ CMD python ./app.py FROM base as test -ADD tests/test_* tests/floorplan_* tests/requirements.txt ./tests/ +ADD tests/test_* tests/unit/ tests/floorplan_* tests/requirements.txt ./tests/ -RUN pip install --no-cache-dir -r tests/requirements.txt +RUN pip install --no-cache-dir .[test] -r tests/requirements.txt diff --git a/setup.cfg b/setup.cfg index 38c452d..0d66564 100644 --- a/setup.cfg +++ b/setup.cfg @@ -16,3 +16,4 @@ python_requires = >=3.6, <4 [options.extras_require] test = pytest + pytest-mock diff --git a/src/floorist/floorist.py b/src/floorist/floorist.py index 26b71c9..c59dd63 100644 --- a/src/floorist/floorist.py +++ b/src/floorist/floorist.py @@ -10,6 +10,9 @@ import pandas as pd import yaml +from floorist.helpers import generate_name, validate_floorplan_entry + + def _configure_loglevel(): LOGLEVEL = environ.get('LOGLEVEL', 'INFO').upper() @@ -43,15 +46,17 @@ def main(): dump_count += 1 try: - logging.debug(f"Dumping #{dump_count}: {row['query']} to {row['prefix']}") - cursor = pd.read_sql(row['query'], conn, chunksize=row.get('chunksize', 1000)) + query = row['query'] + prefix = row['prefix'] + chunksize = row.get('chunksize', 1000) + + logging.debug(f"Dumping #{dump_count}: {query} to {prefix}") + + validate_floorplan_entry(query, prefix) - target = '/'.join([ - f"s3://{config.bucket_name}", - row['prefix'], - date.today().strftime('year_created=%Y/month_created=%-m/day_created=%-d') - ]) + cursor = pd.read_sql(query, conn, chunksize=chunksize) + target = generate_name(config.bucket_name, prefix) uuids = {} @@ -72,7 +77,7 @@ def main(): mode='append' ) - logging.debug(f"Dumped #{dumped_count}: {row['query']} to {row['prefix']}") + logging.debug(f"Dumped #{dumped_count}: {query} to {prefix}") dumped_count += 1 except Exception as ex: diff --git a/src/floorist/helpers.py b/src/floorist/helpers.py new file mode 100644 index 
# src/floorist/helpers.py
"""Helpers for building S3 dump targets and validating floorplan entries."""

from datetime import date
from typing import Optional


def generate_name(bucket_name: str, prefix: Optional[str] = None, *,
                  day: Optional[date] = None) -> str:
    """Build the S3 target path for one dump.

    The result has the shape
    ``s3://<bucket>[/<prefix>]/year_created=Y/month_created=M/day_created=D``
    where month and day are written without zero padding.

    Args:
        bucket_name: Name of the destination bucket (without the scheme).
        prefix: Optional key prefix placed between the bucket and the date
            partition. ``None`` or an empty string omits the segment.
        day: Date used for the partition. Defaults to ``date.today()``.

    Returns:
        The assembled ``s3://`` path.
    """
    if day is None:
        day = date.today()

    # Formatted from the date fields instead of strftime('%-m' / '%-d'):
    # the "-" flag is a C-library extension and is not accepted on every
    # platform.
    partition = (
        f"year_created={day.year}"
        f"/month_created={day.month}"
        f"/day_created={day.day}"
    )

    parts = [f"s3://{bucket_name}"]
    if prefix:
        parts.append(prefix)
    parts.append(partition)

    return '/'.join(parts)


def validate_floorplan_entry(query, prefix) -> bool:
    """Check that a floorplan entry carries both a query and a prefix.

    Args:
        query: The query of the entry.
        prefix: The prefix of the entry.

    Returns:
        ``True`` when both values are truthy.

    Raises:
        ValueError: If ``query`` is falsy, or if ``query`` is set but
            ``prefix`` is falsy. The query is checked first.
    """
    if not query:
        raise ValueError("Query cannot be empty!")
    if not prefix:
        raise ValueError("Prefix cannot be empty!")
    return True
# Unit tests from tests/unit/test_core.py and tests/unit/test_helpers.py.
import uuid
from datetime import date

import pytest
from pandas import DataFrame

from floorist.config import Config
from floorist.floorist import main
from floorist.helpers import generate_name
from floorist.helpers import validate_floorplan_entry


# --- tests/unit/test_core.py -------------------------------------------------

def test_floorplan_without_prefix_raises_exception_keeps_reading_other_floorplans(mocker):
    """An entry without a prefix is skipped; the following entry is dumped."""
    mocker.patch('floorist.floorist.open')
    mocker.patch('floorist.floorist.logging')

    config_mock = mocker.patch('floorist.floorist.get_config')
    config_mock.return_value = Config(bucket_name='foo')

    awswrangler_mock = mocker.patch('floorist.floorist.wr')

    connection_engine_mock = mocker.patch('floorist.floorist.create_engine')
    connection_mock = connection_engine_mock().connect().execution_options()

    exit_mock = mocker.patch('floorist.floorist.exit')

    # First entry is invalid (no prefix), second one is valid.
    safe_load_mock = mocker.patch('floorist.floorist.yaml.safe_load')
    safe_load_mock.return_value = [
        {'query': "a query", 'prefix': None},
        {'query': 'another-query', 'prefix': 'a prefix'},
    ]

    pandas_mock = mocker.patch('floorist.floorist.pd')
    data_stub = DataFrame({
        'ID': [uuid.uuid4(), uuid.uuid4(), uuid.uuid4()],
        'columnA': ["foo", "bar", "baz"]
    })
    pandas_mock.read_sql.return_value = [data_stub]

    main()

    # Only the valid entry may reach the database.
    pandas_mock.read_sql.assert_called_once_with("another-query", connection_mock, chunksize=1000)

    # The previous check called ``data_stub.equals(call_args[0])`` without
    # asserting the result, and compared the frame with the whole tuple of
    # positional arguments, so it could never fail. Assert that a DataFrame
    # really was handed to ``to_parquet``.
    # NOTE(review): tighten this to ``data_stub.equals(frame)`` once it is
    # confirmed that main() passes the chunk through without rewriting it.
    awswrangler_mock.s3.to_parquet.assert_called()
    positional, keyword = awswrangler_mock.s3.to_parquet.call_args
    written_frames = [
        value for value in (*positional, *keyword.values())
        if isinstance(value, DataFrame)
    ]
    assert written_frames, "to_parquet was not given a DataFrame"

    # The invalid entry must make the run exit with a failure status.
    exit_mock.assert_called_once_with(1)


# --- tests/unit/test_helpers.py ----------------------------------------------

def _date_partition():
    """Return today's ``year_created=/month_created=/day_created=`` suffix.

    Built from the date fields so the expectation does not depend on the
    platform-specific ``%-m`` / ``%-d`` strftime flags.
    """
    today = date.today()
    return (
        f"year_created={today.year}"
        f"/month_created={today.month}"
        f"/day_created={today.day}"
    )


def test_name_without_prefix():
    bucket_name = "my_bucket"
    expected_name = f"s3://{bucket_name}/{_date_partition()}"

    assert generate_name(bucket_name) == expected_name


def test_name_with_prefix():
    bucket_name = "my_bucket"
    prefix = "some-prefix"
    expected_name = f"s3://{bucket_name}/{prefix}/{_date_partition()}"

    assert generate_name(bucket_name, prefix) == expected_name


@pytest.mark.parametrize("query,prefix,message", [
    (None, "prefix", "Query cannot be empty!"),
    # The query is validated first, so it wins when both are missing.
    (None, None, "Query cannot be empty!"),
    ("query", None, "Prefix cannot be empty"),
])
def test_validate_floorplan_entry_captures_invalid_data(query, prefix, message):
    with pytest.raises(ValueError) as excinfo:
        validate_floorplan_entry(query, prefix)

    assert message in str(excinfo.value)


def test_validate_floorplan_entry_checks_valid_data():
    assert validate_floorplan_entry("query", "prefix")