Skip to content

Commit

Permalink
Merge pull request #33 from amosproj/feature/018_normalize_data
Browse files Browse the repository at this point in the history
018 Normalize data
  • Loading branch information
dh1542 authored Nov 9, 2024
2 parents 6e7c1ce + 8787802 commit a422e72
Show file tree
Hide file tree
Showing 8 changed files with 555 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/data_wranglers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .spark.data_quality.duplicate_detection import *
from .spark.data_quality.normalization.normalization import *
from .spark.data_quality.normalization.normalization_mean import *
from .spark.data_quality.normalization.normalization_minmax import *
from .spark.data_quality.normalization.normalization_zscore import *
from .spark.data_quality.normalization.denormalization import *
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyspark.sql import DataFrame as PySparkDataFrame
from .....data_wranglers.interfaces import WranglerBaseInterface
from ....._pipeline_utils.models import Libraries, SystemType
from .normalization import (
NormalizationBaseClass,
)


class Denormalization(WranglerBaseInterface):
    """
    Applies the appropriate denormalization method to revert values to their original scale.

    The actual reversal is delegated to the ``denormalize`` method of the
    normalization instance that originally transformed the data, so this class
    works with any ``NormalizationBaseClass`` subclass.

    Example
    --------
    ```python
    from src.sdk.python.rtdip_sdk.pipelines.data_wranglers import Denormalization
    from pyspark.sql import SparkSession
    from pyspark.sql.dataframe import DataFrame

    denormalization = Denormalization(normalized_df, normalization)
    denormalized_df = denormalization.filter()
    ```

    Parameters:
        df (DataFrame): PySpark DataFrame to be reverted to its original scale.
        normalization_to_revert (NormalizationBaseClass): An instance of the specific normalization subclass (NormalizationZScore, NormalizationMinMax, NormalizationMean) that was originally used to normalize the data.
    """

    df: PySparkDataFrame
    normalization_to_revert: NormalizationBaseClass

    def __init__(
        self, df: PySparkDataFrame, normalization_to_revert: NormalizationBaseClass
    ) -> None:
        self.df = df
        self.normalization_to_revert = normalization_to_revert

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        # No additional Maven/PyPI libraries are required beyond PySpark itself.
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def filter(self) -> PySparkDataFrame:
        """
        Reverts the normalization previously applied to ``df``.

        Returns:
            DataFrame: A PySpark DataFrame with values restored to their original scale.
        """
        return self.normalization_to_revert.denormalize(self.df)
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import abstractmethod
from pyspark.sql import DataFrame as PySparkDataFrame
from typing import List
from .....data_wranglers.interfaces import WranglerBaseInterface
from ....._pipeline_utils.models import Libraries, SystemType


class NormalizationBaseClass(WranglerBaseInterface):
    """
    A base class for applying normalization techniques to multiple columns in a PySpark DataFrame.
    This class serves as a framework to support various normalization methods (e.g., Z-Score, Min-Max, and Mean),
    with specific implementations in separate subclasses for each normalization type.

    Subclasses should implement specific normalization and denormalization methods by inheriting from this base class.

    Example
    --------
    ```python
    from src.sdk.python.rtdip_sdk.pipelines.data_wranglers import NormalizationZScore
    from pyspark.sql import SparkSession
    from pyspark.sql.dataframe import DataFrame

    normalization = NormalizationZScore(df, column_names=["value_column_1", "value_column_2"], in_place=False)
    normalized_df = normalization.filter()
    ```

    Parameters:
        df (DataFrame): PySpark DataFrame to be normalized.
        column_names (List[str]): List of columns in the DataFrame to be normalized.
        in_place (bool): If true, then result of normalization is stored in the same column.

    Attributes:
        NORMALIZATION_NAME_POSTFIX : str
            Suffix added to the column name if a new column is created for normalized values.
    """

    df: PySparkDataFrame
    column_names: List[str]
    in_place: bool

    # Statistics captured during normalization, needed to reverse it later.
    reversal_value: List[float]

    # Appended to column name if new column is added
    NORMALIZATION_NAME_POSTFIX: str = "normalization"

    def __init__(
        self, df: PySparkDataFrame, column_names: List[str], in_place: bool = False
    ) -> None:
        # Fail fast on unknown columns rather than erroring later inside Spark.
        for column_name in column_names:
            if column_name not in df.columns:
                raise ValueError("{} not found in the DataFrame.".format(column_name))

        self.df = df
        self.column_names = column_names
        self.in_place = in_place

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        # No additional Maven/PyPI libraries are required beyond PySpark itself.
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def filter(self) -> PySparkDataFrame:
        """
        Pipeline entry point; equivalent to calling :meth:`normalize`.
        """
        return self.normalize()

    def normalize(self) -> PySparkDataFrame:
        """
        Applies the specified normalization to each column in column_names.

        Returns:
            DataFrame: A PySpark DataFrame with the normalized values.
        """
        normalized_df = self.df
        for column in self.column_names:
            normalized_df = self._normalize_column(normalized_df, column)
        return normalized_df

    def denormalize(self, input_df) -> PySparkDataFrame:
        """
        Denormalizes the input DataFrame. Intended to be used by the denormalization component.

        When normalization was not done in place, the added normalized columns are
        simply dropped; otherwise each column is reverted via the subclass's
        ``_denormalize_column``.

        Parameters:
            input_df (DataFrame): Dataframe containing the current data.

        Returns:
            DataFrame: A PySpark DataFrame with values restored to their original scale.
        """
        denormalized_df = input_df
        if not self.in_place:
            for column in self.column_names:
                denormalized_df = denormalized_df.drop(
                    self._get_norm_column_name(column)
                )
        else:
            for column in self.column_names:
                denormalized_df = self._denormalize_column(denormalized_df, column)
        return denormalized_df

    @property
    @abstractmethod
    def NORMALIZED_COLUMN_NAME(self): ...

    @abstractmethod
    def _normalize_column(self, df: PySparkDataFrame, column: str) -> PySparkDataFrame:
        """Apply this normalization technique to a single column; implemented by subclasses."""
        pass

    @abstractmethod
    def _denormalize_column(
        self, df: PySparkDataFrame, column: str
    ) -> PySparkDataFrame:
        """Revert this normalization technique on a single column; implemented by subclasses."""
        pass

    def _get_norm_column_name(self, column_name: str) -> str:
        """
        Returns the column name that holds normalized values: the original name when
        normalizing in place, otherwise a suffixed name for the added column.
        """
        if not self.in_place:
            return f"{column_name}_{self.NORMALIZED_COLUMN_NAME}_{self.NORMALIZATION_NAME_POSTFIX}"
        else:
            return column_name
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math

from .normalization import NormalizationBaseClass
from pyspark.sql import DataFrame as PySparkDataFrame
from pyspark.sql import functions as F


class NormalizationMean(NormalizationBaseClass):
    """
    Mean normalization: scales values by (value - mean) / (max - min).

    Stores the statistics used for normalization in ``reversal_value`` so that
    :meth:`_denormalize_column` can reverse the transformation.
    """

    NORMALIZED_COLUMN_NAME = "mean"

    def _normalize_column(self, df: PySparkDataFrame, column: str) -> PySparkDataFrame:
        """
        Private method to apply Mean normalization to the specified column.
        Mean normalization: (value - mean) / (max - min)

        Raises:
            ZeroDivisionError: If max equals min (constant column), the divisor would be zero.
        """
        # Compute all three statistics in a single Spark action instead of
        # three separate collect() calls (one job instead of three).
        mean_val, min_val, max_val = df.select(
            F.mean(F.col(column)), F.min(F.col(column)), F.max(F.col(column))
        ).first()

        divisor = max_val - min_val
        if math.isclose(divisor, 0.0, abs_tol=10e-8) or not math.isfinite(divisor):
            raise ZeroDivisionError("Division by Zero in Mean")

        store_column = self._get_norm_column_name(column)
        # Remember the statistics so the normalization can be reverted later.
        self.reversal_value = [mean_val, min_val, max_val]

        return df.withColumn(
            store_column,
            (F.col(column) - F.lit(mean_val)) / (F.lit(max_val) - F.lit(min_val)),
        )

    def _denormalize_column(
        self, df: PySparkDataFrame, column: str
    ) -> PySparkDataFrame:
        """
        Private method to revert Mean normalization to the specified column.
        Mean denormalization: normalized_value * (max - min) + mean = value
        """
        mean_val = self.reversal_value[0]
        min_val = self.reversal_value[1]
        max_val = self.reversal_value[2]

        store_column = self._get_norm_column_name(column)

        return df.withColumn(
            store_column,
            F.col(column) * (F.lit(max_val) - F.lit(min_val)) + F.lit(mean_val),
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math

from .normalization import NormalizationBaseClass
from pyspark.sql import DataFrame as PySparkDataFrame
from pyspark.sql import functions as F


class NormalizationMinMax(NormalizationBaseClass):
    """
    Min-Max normalization: scales values to [0, 1] via (value - min) / (max - min).

    Stores the statistics used for normalization in ``reversal_value`` so that
    :meth:`_denormalize_column` can reverse the transformation.
    """

    NORMALIZED_COLUMN_NAME = "minmax"

    def _normalize_column(self, df: PySparkDataFrame, column: str) -> PySparkDataFrame:
        """
        Private method to apply Min-Max normalization to the specified column.
        Min-Max normalization: (value - min) / (max - min)

        Raises:
            ZeroDivisionError: If max equals min (constant column), the divisor would be zero.
        """
        # Compute both statistics in a single Spark action instead of two
        # separate collect() calls (one job instead of two).
        min_val, max_val = df.select(
            F.min(F.col(column)), F.max(F.col(column))
        ).first()

        divisor = max_val - min_val
        if math.isclose(divisor, 0.0, abs_tol=10e-8) or not math.isfinite(divisor):
            raise ZeroDivisionError("Division by Zero in MinMax")

        store_column = self._get_norm_column_name(column)
        # Remember the statistics so the normalization can be reverted later.
        self.reversal_value = [min_val, max_val]

        return df.withColumn(
            store_column,
            (F.col(column) - F.lit(min_val)) / (F.lit(max_val) - F.lit(min_val)),
        )

    def _denormalize_column(
        self, df: PySparkDataFrame, column: str
    ) -> PySparkDataFrame:
        """
        Private method to revert Min-Max normalization to the specified column.
        Min-Max denormalization: normalized_value * (max - min) + min = value
        """
        min_val = self.reversal_value[0]
        max_val = self.reversal_value[1]

        store_column = self._get_norm_column_name(column)

        return df.withColumn(
            store_column,
            (F.col(column) * (F.lit(max_val) - F.lit(min_val))) + F.lit(min_val),
        )
Loading

0 comments on commit a422e72

Please sign in to comment.