Skip to content

Commit

Permalink
feat: Added testcommon.etl function to write 'when' files to storage (#84)
Browse files Browse the repository at this point in the history

Co-authored-by: Jakob Stricker Nielsen <[email protected]>
Co-authored-by: Henrik Tornbjerg Carøe <[email protected]>
  • Loading branch information
3 people authored Jan 27, 2025
1 parent cf9e538 commit 1d72b1e
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 11 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/create-release-tag.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ jobs:
# BE AWARE --> Updating to a new MAJOR version will delete deprecated versions on a nightly schedule.
# See https://github.com/Energinet-DataHub/.github#release-procedure for details
major_version: 3
minor_version: 1
patch_version: 1
minor_version: 2
patch_version: 0
repository_path: Energinet-DataHub/opengeh-python-packages
env:
GH_TOKEN: ${{ steps.generate_token.outputs.token }}
5 changes: 5 additions & 0 deletions source/testcommon/release-notes/release-notes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Testcommon Release Notes

## Version 0.1.0

Modified `get_then_names` to accept a `scenario_path` as a parameter for locating the `/then` files.
Added a `write_to_delta.py` file for writing a list of test CSV files to a delta table.

## Version 0.0.9

Add `__init__.py` in `delta_lake` folder
Expand Down
2 changes: 1 addition & 1 deletion source/testcommon/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

setup(
name="opengeh-testcommon",
version="0.0.6",
version="0.1.0",
description="Shared testing utilities for OpenGEH Python packages",
long_description="",
long_description_content_type="text/markdown",
Expand Down
39 changes: 39 additions & 0 deletions source/testcommon/testcommon/dataframes/write_to_delta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import os

from pyspark.sql import types as T
from pyspark.sql import SparkSession

from testcommon.dataframes import read_csv


def write_when_files_to_delta(
    spark: SparkSession,
    scenario_path: str,
    files: list[tuple[str, T.StructType]],
) -> None:
    """Write a list of scenario 'when' CSV files to Delta tables.

    Each file name (without the ``.csv`` extension) is used as the table name.
    If a Delta table does not exist, it is created; if it already exists, its
    content is overwritten. Files not present in the scenario's ``when``
    folder are skipped silently, so scenarios may declare only the inputs
    they need.

    Args:
        spark (SparkSession): The Spark session.
        scenario_path (str): The path to the scenario folder containing the
            ``when`` subfolder.
        files (list[tuple[str, T.StructType]]): Tuples of file name and the
            schema used to read that CSV file.
    """
    for file_name, schema in files:
        file_path = f"{scenario_path}/when/{file_name}"
        # Scenarios may provide only a subset of input files; skip the rest.
        if not os.path.exists(file_path):
            continue

        df = read_csv(
            spark,
            file_path,
            schema,
        )

        # The table is named after the file without its extension.
        table_name = file_name.removesuffix(".csv")
        # Best-effort overwrite: report a failure but continue with the
        # remaining files rather than aborting the whole scenario setup.
        try:
            df.write.mode("overwrite").saveAsTable(table_name)
        except Exception as e:
            print(f"Error executing overwrite on table {table_name}: {str(e)}")
28 changes: 20 additions & 8 deletions source/testcommon/testcommon/etl/get_then_names.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,33 @@
from pathlib import Path


def get_then_names(scenario_path=None) -> list[str]:
    """Retrieve the paths of the CSV files in the `then` folder of a scenario test.

    If `scenario_path` is provided, it is used to locate the `then` folder.
    Otherwise, the scenario folder is derived from the caller's file location
    by inspecting the call stack.

    Args:
        scenario_path (str, optional): Path to the scenario folder.
            Defaults to None.

    Returns:
        list[str]: File paths relative to the `then` folder, without file
            extensions.

    Raises:
        FileNotFoundError: If the `then` folder does not exist, or if it
            contains no CSV files.
    """
    if scenario_path is not None:
        output_folder_path = Path(scenario_path)
    else:
        # Frame [1] is this function's caller; its file sits in the
        # scenario folder.
        output_folder_path = Path(inspect.stack()[1].filename).parent

    then_output_folder_path = output_folder_path / "then"

    # Check the 'then' folder itself, not just the scenario folder, so a
    # scenario that lacks a 'then' folder fails with an accurate error.
    if not then_output_folder_path.exists():
        raise FileNotFoundError(
            f"Could not find the 'then' folder in {output_folder_path}"
        )
    then_files = list(then_output_folder_path.rglob("*.csv"))
    if not then_files:
        raise FileNotFoundError(
            f"Could not find any CSV files in the 'then' folder in {output_folder_path}"
        )
    return [str(f.relative_to(then_output_folder_path).with_suffix('')) for f in then_files]

0 comments on commit 1d72b1e

Please sign in to comment.