Skip to content

Commit

Permalink
feat(kdp): adding date preprocessing layers
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrlaczkowski committed Aug 19, 2024
1 parent 23eca47 commit 552f26b
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 124 deletions.
23 changes: 22 additions & 1 deletion docs/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,27 @@ Example cross feature between INTEGER_CATEGORICAL and STRING_CATEGORICAL:

![Cross Features Pipeline](imgs/cross_features.png)

## 📆 Date Features

You can even process string encoded date features (format: 'YYYY-MM-DD'):

```python
from kdp.processor import PreprocessingModel

ppr = PreprocessingModel(
path_data="data/data.csv",
features_specs={
"feat1": FeatureType.FLOAT,
"feat2": FeatureType.DATE,
},
)
```

Example date and numeric processing pipeline:

![Date Features Pipeline](imgs/date_features.png)


## 🚀 Custom Preprocessing Steps

If you require even more customization, you can define custom preprocessing steps using the `Feature` class and its `preprocessors` attribute.
Expand Down Expand Up @@ -233,4 +254,4 @@ Here's how the text feature preprocessing pipeline looks:

![Text Feature Pipeline](imgs/custom_feature_pipeline.png)

The full list of availble layers can be found: [Preprocessing Layers Factory](layers_factory.md)
The full list of available layers can be found: [Preprocessing Layers Factory](layers_factory.md)
Binary file added docs/imgs/date_features.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion docs/layers_factory.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# 🏭 Preprocessing Layers Factory

You can find all availble layers in the `PreprocessorLayerFactory` class:
You can find all available layers in the `PreprocessorLayerFactory` class:

::: kdp.layers_factory.PreprocessorLayerFactory
handler: python
Expand Down
6 changes: 3 additions & 3 deletions docs/transformer_blocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ with the following arguments:

- `transfo_placement` (str): The placement of the transformer block with the following options:
- `CATEGORICAL` -> only after categorical and text variables
- `ALL_FEATURES` -> after all concatenaded features).
- `ALL_FEATURES` -> after all concatenated features).


This uses a dedicated TransformerBlockLayer to handle the transformer block logic.
Expand All @@ -25,9 +25,9 @@ This used a dedicated TransformerBlockLayer to handle the transformer block logi
from kdp.processor import PreprocessingModel, OutputModeOptions, TransformerBlockPlacementOptions

ppr = PreprocessingModel(
path_data="data/test_saad.csv",
path_data="data/test_data.csv",
features_specs=features_specs,
features_stats_path="stats_saad.json",
features_stats_path="stats_data.json",
output_mode=OutputModeOptions.CONCAT,
# TRANSFORMERS BLOCK CONTROLL
transfo_nr_blocks=3, # if 0, transformer block is disabled
Expand Down
177 changes: 60 additions & 117 deletions kdp/custom_layers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import math
import re
import string

Expand Down Expand Up @@ -84,56 +85,28 @@ def call(self, inputs: tf.Tensor) -> tf.Tensor:


class DateParsingLayer(tf.keras.layers.Layer):
"""A Keras Layer that parses a date string and extracts features: year, month, and day of the week.
This layer assumes the input tensor contains date strings in the format 'YYYY-MM-DD' or 'DD-MM-YYYY',
and converts these strings into numerical features suitable for further processing.
Required Input Format:
- A tensor of shape [batch_size, 1], where each row contains a date string.
Args:
date_format (str): The format of the date string. Options are 'YYYY-MM-DD' or 'DD-MM-YYYY'.
Methods:
call(inputs): Parses the date strings and extracts features including year, month, day_of_week.
get_config(): Returns the configuration of the layer as a dictionary.
from_config(config): Instantiates a DateParsingLayer from its configuration dictionary.
"""

def __init__(self, date_format: str = "YYYY-MM-DD", **kwargs):
def __init__(self, date_format: str = "YYYY-MM-DD", **kwargs) -> None:
"""Initializing DateParsingLayer.
Args:
date_format (str, optional): Date formats that layer accepts. Defaults to 'YYYY-MM-DD'.
**kwargs (dict): additional parameters.
date_format (str): format of the string encoded date to parse.
kwargs (dict): other params to pass to the class.
"""
super().__init__(**kwargs)
self.date_format = date_format

@tf.function
def call(self, inputs: tf.Tensor) -> tf.Tensor:
"""Parses the date strings and extracts features.
"""Base forward pass definition.
Args:
inputs (tf.Tensor): A tensor of shape [batch_size, 1] where each row contains a date string.
inputs (tf.Tensor): Tensor with input data.
Returns:
tf.Tensor: A tensor of shape [batch_size, 3] with extracted features: year, month, day_of_week.
Raises:
ValueError: If the date_format is not recognized or if the input tensor does not have the correct shape.
tf.Tensor: processed date tensor with all cyclic components.
"""

def parse_date(date_str: str) -> tuple:
"""Parsing date into a stacked tensor.
Args:
date_str (str): date to be parsed.
Returns:
tuple (tf.tensor): representing stacked year, month and day.
"""
def parse_date(date_str: str) -> tf.Tensor:
parts = tf.strings.split(date_str, "-")
year = tf.strings.to_number(parts[0], out_type=tf.int32)
month = tf.strings.to_number(parts[1], out_type=tf.int32)
Expand All @@ -150,105 +123,90 @@ def parse_date(date_str: str) -> tuple:
return tf.stack([year, month, day_of_week])

parsed_dates = tf.map_fn(parse_date, tf.squeeze(inputs), fn_output_signature=tf.int32)
return tf.expand_dims(parsed_dates, axis=-1)

def compute_output_shape(self, input_shape) -> tf.TensorShape:
"""Computing tensor shape.
return parsed_dates

Args:
input_shape (_type_): initial tensor shape.
Returns:
_type_ (tf.TensorShape): return tensor shape.
"""
return tf.TensorShape([input_shape[0], input_shape[1], 3, 1])
def compute_output_shape(self, input_shape: int) -> int:
"""Getting output shape."""
return tf.TensorShape([input_shape[0], 3])

def get_config(self) -> dict:
"""Returns the configuration of the layer as a dictionary.
Returns:
dict: The configuration dictionary.
"""
"""Saving configuration."""
config = super().get_config()
config.update({"date_format": self.date_format})
return config

@classmethod
def from_config(cls, config: dict) -> object:
"""Instantiates a DateParsingLayer from its configuration dictionary.
Args:
config (dict): The configuration dictionary.
Returns:
object: The DateParsingLayer instance.
"""
"""Restoring configuration."""
return cls(**config)


class DateEncodingLayer(tf.keras.layers.Layer):
"""A Keras Layer that performs date feature encoding, including cyclical encoding for month and day of the week.
This layer extracts the year, month, and day of the week from the input tensor, and applies cyclical encoding
to the month and day of the week. The cyclical encoding helps the model learn cyclical patterns in these features.
def __init__(self, **kwargs):
"""Initializing DateEncodingLayer."""
super().__init__(**kwargs)

Required Input Format:
- A tensor of shape [batch_size, 3], where each row contains:
- year (int): Year as a numerical value.
- month (int): Month as an integer from 1 to 12.
- day_of_week (int): Day of the week as an integer from 0 to 6 (where 0=Monday).
@tf.function
def normalize_year(self, year: tf.Tensor) -> tf.Tensor:
"""Normalize the year to a fractional year value (0-1)."""
# Example: year could be something like 2023.5 representing mid-2023
return year % 1.0

Args:
**kwargs: Additional keyword arguments for the Keras Layer.
@tf.function
def cyclic_encoding(self, value: tf.Tensor, period: float) -> tuple[tf.Tensor, tf.Tensor]:
"""Encode a value as a cyclical feature using sine and cosine transformations.
Methods:
call(inputs): Applies date feature encoding to the input tensor.
get_config(): Returns the configuration of the layer as a dictionary.
from_config(config): Instantiates a DateEncodingLayer from its configuration dictionary.
"""
Args:
value: A tensor of floats representing the value to be encoded.
period: The period of the cycle (e.g., 12 for months, 7 for days).
def __init__(self, **kwargs):
"""Initializing DateEncodingLayer."""
super().__init__(**kwargs)
Returns:
A tuple (sin_encoded, cos_encoded) representing the cyclical features.
"""
_pi = tf.constant(math.pi)
normalized_value = value / period
sin_component = tf.math.sin(2 * _pi * normalized_value)
cos_component = tf.math.cos(2 * _pi * normalized_value)
return sin_component, cos_component

@tf.function
def call(self, inputs: tf.Tensor) -> tf.Tensor:
"""Applies date feature encoding to the input tensor.
"""Splits the date into 3 components: year, month and day and
encodes it into sin and cos cyclical projections.
Args:
inputs (tf.Tensor): A tensor of shape [batch_size, 3] where each row contains [year, month, day_of_week].
inputs (tf.Tensor): input data.
Returns:
tf.Tensor: A tensor of shape [batch_size, 5] with encoded features including year, month cyclical encoding,
and day of week cyclical encoding.
Raises:
ValueError: If the input tensor does not have shape [batch_size, 3]
or contains invalid month/day_of_week values.
(tf.Tensor): cyclically encoded data (sin and cos).
"""
# Reshape input if necessary
input_shape = tf.shape(inputs)
if len(input_shape) == 4:
if len(input_shape) == 3:
inputs = tf.squeeze(inputs, axis=-1)

# Extract features
year = inputs[..., 0]
month = inputs[..., 1]
day_of_week = inputs[..., 2]
year = inputs[:, 0]
month = inputs[:, 1]
day_of_week = inputs[:, 2]

# Cyclical encoding
year_float = tf.cast(year, tf.float32)
month_float = tf.cast(month, tf.float32)
day_of_week_float = tf.cast(day_of_week, tf.float32)
_pi = tf.const(3.1415)

month_sin = tf.math.sin(2 * _pi * month_float / 12)
month_cos = tf.math.cos(2 * _pi * month_float / 12)
day_of_week_sin = tf.math.sin(2 * _pi * day_of_week_float / 7)
day_of_week_cos = tf.math.cos(2 * _pi * day_of_week_float / 7)
# Ensure inputs are in the correct range
year_float = self.normalize_year(year_float)

# Encode each feature
year_sin, year_cos = self.cyclic_encoding(year_float, period=1.0)
month_sin, month_cos = self.cyclic_encoding(month_float, period=12.0)
day_of_week_sin, day_of_week_cos = self.cyclic_encoding(day_of_week_float, period=7.0)

encoded = tf.stack(
[
tf.cast(year, tf.float32),
year_sin,
year_cos,
month_sin,
month_cos,
day_of_week_sin,
Expand All @@ -257,34 +215,19 @@ def call(self, inputs: tf.Tensor) -> tf.Tensor:
axis=-1,
)

# Reshape to 2D tensor
encoded_flat = tf.reshape(encoded, [-1, 5])

return encoded_flat
return encoded

def compute_output_shape(self, input_shape: int) -> int:
"""COmputing tensor shape."""
return tf.TensorShape([input_shape[0], 5])
"""Getting output shape."""
return tf.TensorShape([input_shape[0], 6])

def get_config(self) -> dict:
"""Returns the configuration of the layer as a dictionary.
Returns:
dict: The configuration dictionary.
"""
config = super().get_config()
return config
"""Returns the configuration of the layer as a dictionary."""
return super().get_config()

@classmethod
def from_config(cls, config: dict) -> object:
"""Instantiates a DateEncodingLayer from its configuration dictionary.
Args:
config (dict): The configuration dictionary.
Returns:
object: The DateEncodingLayer instance.
"""
"""Reloading current configuration."""
return cls(**config)


Expand Down
4 changes: 2 additions & 2 deletions kdp/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ def _add_pipeline_date(self, feature_name: str, input_layer) -> None:
)

# Optionally, add SeasonLayer
if _feature.kwargs.get("add_season", False):
if _feature.kwargs.get("add_season", True):
logger.debug("Adding Season layer")
preprocessor.add_processing_step(
layer_creator=PreprocessorLayerFactory.date_season_layer,
Expand Down Expand Up @@ -715,7 +715,7 @@ def build_preprocessor(self) -> tf.keras.Model:
self._add_pipeline_date(
feature_name=feat_name,
input_layer=input_layer,
stats=stats,
# stats=stats,
)

# Preparing outputs
Expand Down
35 changes: 35 additions & 0 deletions kdp/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,41 @@ def __init__(
self.text_stats = {col: TextAccumulator() for col in self.text_features}
self.date_stats = {col: DateAccumulator() for col in self.date_features}

def _get_csv_file_pattern(self, path) -> str:
"""Get the csv file pattern that will handle directories and file paths.
Args:
path (str): Path to the csv file (can be a directory or a file)
Returns:
str: File pattern that always has *.csv at the end
"""
file_path = Path(path)
# Check if the path is a directory
if file_path.suffix:
# Get the parent directory if the path is a file
base_path = file_path.parent
csv_pattern = base_path / "*.csv"
else:
csv_pattern = file_path / "*.csv"

return str(csv_pattern)

def _read_data_into_dataset(self) -> tf.data.Dataset:
    """Load the CSV files matched from ``self.path_data`` into a batched dataset.

    The resulting dataset is stored on ``self.ds`` and returned.

    Returns:
        tf.data.Dataset: dataset produced by ``make_csv_dataset`` using
            ``self.batch_size`` as the batch size.
    """
    logger.info(f"Reading CSV data from the corresponding folder: {self.path_data}")

    # Glob pattern covering the csv files next to / inside the configured path.
    csv_glob = self._get_csv_file_pattern(path=self.path_data)

    dataset = tf.data.experimental.make_csv_dataset(
        file_pattern=csv_glob,
        batch_size=self.batch_size,
        num_epochs=1,  # single pass over the files
        shuffle=False,  # keep the file/row order
        ignore_errors=True,  # passed through to make_csv_dataset unchanged
    )
    self.ds = dataset

    logger.info(f"DataSet Ready to be used (batched by: {self.batch_size}) ✅")
    return self.ds

def _process_batch(self, batch: tf.Tensor) -> None:
"""Update statistics accumulators for each batch.
Expand Down

0 comments on commit 552f26b

Please sign in to comment.