Release 0.4.4 #139

Merged 2 commits on Dec 13, 2024
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.4.3
current_version = 0.4.4
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)
17 changes: 17 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,21 @@ and this project adheres to [semantic versioning](https://semver.org/spec/v2.0.0

### Removed

## [0.4.4] - 2024-12-13

### Added

### Changed
- Modified `insert_df_to_hive_table` function in `cdp/io/output.py`. Added support
for creating non-existent Hive tables, repartitioning by column or partition count,
and handling missing columns with explicit type casting.
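
  A minimal usage sketch of the new behaviour (the Spark session, table name, and
  column names below are illustrative assumptions; the calls mirror the updated
  docstring examples in `cdp/io/output.py`):

  ```python
  from pyspark.sql import SparkSession

  from rdsa_utils.cdp.io.output import insert_df_to_hive_table

  spark = SparkSession.builder.enableHiveSupport().getOrCreate()
  df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "category"])

  # Creates my_database.my_table if it does not already exist, fills any
  # columns present in the table but missing from the DataFrame with typed
  # NULLs, and repartitions by a column before writing to limit small files.
  insert_df_to_hive_table(
      spark,
      df,
      "my_database.my_table",
      fill_missing_cols=True,
      repartition_data_by="category",
  )
  ```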

### Deprecated

### Fixed

### Removed

## [0.4.3] - 2024-12-05

### Added
@@ -501,6 +516,8 @@ and this project adheres to [semantic versioning](https://semver.org/spec/v2.0.0
> due to bugs in the GitHub Action `deploy_pypi.yaml`, which deploys to PyPI
> and GitHub Releases.

- rdsa-utils v0.4.4: [GitHub Release](https://github.com/ONSdigital/rdsa-utils/releases/tag/v0.4.4) |
[PyPI](https://pypi.org/project/rdsa-utils/0.4.4/)
- rdsa-utils v0.4.3: [GitHub Release](https://github.com/ONSdigital/rdsa-utils/releases/tag/v0.4.3) |
[PyPI](https://pypi.org/project/rdsa-utils/0.4.3/)
- rdsa-utils v0.4.2: [GitHub Release](https://github.com/ONSdigital/rdsa-utils/releases/tag/v0.4.2) |
2 changes: 1 addition & 1 deletion rdsa_utils/__init__.py
@@ -1 +1 @@
__version__ = "0.4.3"
__version__ = "0.4.4"
4 changes: 2 additions & 2 deletions rdsa_utils/cdp/helpers/s3_utils.py
@@ -1141,12 +1141,12 @@ def write_csv(

Examples
--------
>>> s3_client = boto3.client('s3')
>>> client = boto3.client('s3')
>>> data = pd.DataFrame({
>>> 'column1': [1, 2, 3],
>>> 'column2': ['a', 'b', 'c']
>>> })
>>> write_csv(s3_client, 'my_bucket', data, 'path/to/file.csv')
>>> write_csv(client, 'my_bucket', data, 'path/to/file.csv')
True
"""
try:
152 changes: 125 additions & 27 deletions rdsa_utils/cdp/io/output.py
@@ -36,12 +36,14 @@ def insert_df_to_hive_table(
table_name: str,
overwrite: bool = False,
fill_missing_cols: bool = False,
repartition_data_by: Union[int, str, None] = None,
) -> None:
"""Write the SparkDF contents to a Hive table.
"""Write SparkDF to Hive table with optional configuration.

This function writes data from a SparkDF into a Hive table, allowing
optional handling of missing columns. The table's column order is ensured to
match that of the DataFrame.
This function writes data from a SparkDF into a Hive table, handling missing
columns and optional repartitioning. It ensures the table's column order matches
the DataFrame and manages different overwrite behaviors for partitioned and
non-partitioned data.

Parameters
----------
@@ -52,10 +54,39 @@
table_name
Name of the Hive table to write data into.
overwrite
If True, existing data in the table will be overwritten,
by default False.
Controls how existing data is handled, default is False:

For non-partitioned data:
- True: Replaces entire table with DataFrame data.
- False: Appends DataFrame data to existing table.

For partitioned data:
- True: Replaces data only in partitions present in DataFrame.
- False: Appends data to existing partitions or creates new ones.
fill_missing_cols
If True, missing columns will be filled with nulls, by default False.
If True, adds missing columns as NULL values. If False, raises an error
on schema mismatch, default is False.

- Explicitly casts DataFrame columns to match the Hive table schema to
avoid type mismatch errors.
- Adds missing columns as NULL values when `fill_missing_cols` is True,
regardless of their data type (e.g., String, Integer, Double, Boolean, etc.).
repartition_data_by
Controls data repartitioning, default is None:
- int: Sets target number of partitions.
- str: Specifies column to repartition by.
- None: No repartitioning performed.

Notes
-----
When using repartition with a number:
- Affects physical file structure but preserves Hive partitioning scheme.
- Controls number of output files per write operation per Hive partition.
- Maintains partition-based query optimization.

When repartitioning by column:
- Helps balance file sizes across Hive partitions.
- Reduces creation of small files.

Raises
------
@@ -65,36 +96,81 @@
ValueError
If the SparkDF schema does not match the Hive table schema and
'fill_missing_cols' is set to False.
DataframeEmptyError
If input DataFrame is empty.
Exception
For other general exceptions when writing data to the table.
"""
logger.info(f"Preparing to write data to {table_name}.")

# Validate SparkDF before writing
if is_df_empty(df):
msg = f"Cannot write an empty SparkDF to {table_name}"
raise DataframeEmptyError(
msg,
)
Examples
--------
Write a DataFrame to a Hive table without overwriting:
>>> insert_df_to_hive_table(
... spark=spark,
... df=df,
... table_name="my_database.my_table"
... )

Overwrite an existing table with a DataFrame:
>>> insert_df_to_hive_table(
... spark=spark,
... df=df,
... table_name="my_database.my_table",
... overwrite=True
... )

Write a DataFrame to a Hive table with missing columns filled:
>>> insert_df_to_hive_table(
... spark=spark,
... df=df,
... table_name="my_database.my_table",
... fill_missing_cols=True
... )

Repartition by column before writing to Hive:
>>> insert_df_to_hive_table(
... spark=spark,
... df=df,
... table_name="my_database.my_table",
... repartition_data_by="partition_column"
... )

Repartition into a fixed number of partitions before writing:
>>> insert_df_to_hive_table(
... spark=spark,
... df=df,
... table_name="my_database.my_table",
... repartition_data_by=10
... )
"""
logger.info(f"Preparing to write data to {table_name} with overwrite={overwrite}.")

# Check if the table exists; if not, set flag for later creation
table_exists = True
try:
table_schema = spark.read.table(table_name).schema
table_columns = spark.read.table(table_name).columns
except AnalysisException:
logger.error(
(
f"Error reading table {table_name}. "
f"Make sure the table exists and you have access to it."
),
logger.info(
f"Table {table_name} does not exist and will be "
"created after transformations.",
)
table_exists = False
table_columns = df.columns # Use DataFrame columns as initial schema

raise
# Validate SparkDF before writing
if is_df_empty(df):
msg = f"Cannot write an empty SparkDF to {table_name}"
raise DataframeEmptyError(msg)

if fill_missing_cols:
# Handle missing columns if specified
if fill_missing_cols and table_exists:
missing_columns = list(set(table_columns) - set(df.columns))

for col in missing_columns:
df = df.withColumn(col, F.lit(None))
else:
column_type = [
field.dataType for field in table_schema if field.name == col
][0]
df = df.withColumn(col, F.lit(None).cast(column_type))
elif not fill_missing_cols and table_exists:
# Validate schema before writing
if set(table_columns) != set(df.columns):
msg = (
@@ -103,10 +179,32 @@
)
raise ValueError(msg)

df = df.select(table_columns)
# Ensure column order
df = df.select(table_columns) if table_exists else df

# Apply repartitioning if specified
if repartition_data_by is not None:
if isinstance(repartition_data_by, int):
logger.info(f"Repartitioning data into {repartition_data_by} partitions.")
df = df.repartition(repartition_data_by)
elif isinstance(repartition_data_by, str):
logger.info(f"Repartitioning data by column {repartition_data_by}.")
df = df.repartition(repartition_data_by)

# Write DataFrame to Hive table based on existence and overwrite parameter
try:
df.write.insertInto(table_name, overwrite)
if table_exists:
if overwrite:
logger.info(f"Overwriting existing table {table_name}.")
df.write.mode("overwrite").saveAsTable(table_name)
else:
logger.info(
f"Inserting into existing table {table_name} without overwrite.",
)
df.write.insertInto(table_name)
else:
df.write.saveAsTable(table_name)
logger.info(f"Table {table_name} created successfully.")
logger.info(f"Successfully wrote data to {table_name}.")
except Exception:
logger.error(f"Error writing data to {table_name}.")