Commit
refactor(KDP): splitting into multiple files
1 parent 598e2e7 · commit 10772e9
Showing 3 changed files with 238 additions and 231 deletions.
@@ -0,0 +1,128 @@
import tensorflow as tf


class PreprocessorLayerFactory:
    @staticmethod
    def create_normalization_layer(mean: float, variance: float, name: str) -> tf.keras.layers.Layer:
        """Create a normalization layer.
        Args:
            mean: The mean of the feature.
            variance: The variance of the feature.
            name: The name of the layer.
        """
        return tf.keras.layers.Normalization(
            mean=mean,
            variance=variance,
            name=name,
        )

    @staticmethod
    def create_discretization_layer(boundaries: list, name: str) -> tf.keras.layers.Layer:
        """Create a discretization layer.
        Args:
            boundaries: The boundaries of the buckets.
            name: The name of the layer.
        """
        return tf.keras.layers.Discretization(
            bin_boundaries=boundaries,
            name=name,
        )

    @staticmethod
    def create_embedding_layer(input_dim: int, output_dim: int, name: str) -> tf.keras.layers.Layer:
        """Create an embedding layer.
        Args:
            input_dim: The input dimension.
            output_dim: The output dimension.
            name: The name of the layer.
        """
        return tf.keras.layers.Embedding(
            input_dim=input_dim,
            output_dim=output_dim,
            name=name,
        )

    @staticmethod
    def create_category_encoding_layer(num_tokens: int, output_mode: str, name: str) -> tf.keras.layers.Layer:
        """Create a category encoding layer.
        Args:
            num_tokens: The number of tokens.
            output_mode: The output mode.
            name: The name of the layer.
        """
        return tf.keras.layers.CategoryEncoding(
            num_tokens=num_tokens,
            output_mode=output_mode,
            name=name,
        )

    @staticmethod
    def create_string_lookup_layer(vocabulary: list[str], num_oov_indices: int, name: str) -> tf.keras.layers.Layer:
        """Create a string lookup layer.
        Args:
            vocabulary: The vocabulary.
            num_oov_indices: The number of out-of-vocabulary indices.
            name: The name of the layer.
        """
        return tf.keras.layers.StringLookup(
            vocabulary=vocabulary,
            num_oov_indices=num_oov_indices,
            name=name,
        )

    @staticmethod
    def create_integer_lookup_layer(vocabulary: list[int], num_oov_indices: int, name: str) -> tf.keras.layers.Layer:
        """Create an integer lookup layer.
        Args:
            vocabulary: The vocabulary.
            num_oov_indices: The number of out-of-vocabulary indices.
            name: The name of the layer.
        """
        return tf.keras.layers.IntegerLookup(
            vocabulary=vocabulary,
            num_oov_indices=num_oov_indices,
            name=name,
        )

    @staticmethod
    def create_crossing_layer(nr_bins: int, name: str) -> tf.keras.layers.Layer:
        """Create a hashed-crossing layer.
        Args:
            nr_bins: The number of hash bins for the crossed feature.
            name: The name of the layer.
        """
        return tf.keras.layers.HashedCrossing(
            num_bins=nr_bins,
            output_mode="int",
            sparse=False,
            name=name,
        )

    @staticmethod
    def create_flatten_layer(name: str = "flatten") -> tf.keras.layers.Layer:
        """Create a flatten layer.
        Args:
            name: The name of the layer.
        """
        return tf.keras.layers.Flatten(
            name=name,
        )

    @staticmethod
    def create_concat_layer(name: str = "concat") -> tf.keras.layers.Layer:
        """Create a concatenate layer.
        Args:
            name: The name of the layer.
        """
        return tf.keras.layers.Concatenate(
            name=name,
        )
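
Below is a minimal usage sketch of the new factory, not part of the commit. It assumes the class above is importable as PreprocessorLayerFactory; the statistics, boundaries, and layer names are made-up illustration values.

import tensorflow as tf

# Hypothetical example: build two layers via the factory and apply them to toy data.
# The mean/variance and bucket boundaries are illustrative only.
norm = PreprocessorLayerFactory.create_normalization_layer(mean=3.0, variance=2.0, name="age_norm")
buckets = PreprocessorLayerFactory.create_discretization_layer(boundaries=[18.0, 30.0, 50.0], name="age_buckets")

x = tf.constant([[12.0], [25.0], [61.0]])
print(norm(x))     # standardized values: (x - mean) / sqrt(variance)
print(buckets(x))  # integer bucket ids: [[0], [1], [3]]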
@@ -0,0 +1,107 @@
from collections.abc import Callable

import tensorflow as tf
from loguru import logger


class ProcessingStep:
    def __init__(self, layer_creator: Callable[..., tf.keras.layers.Layer], **layer_kwargs) -> None:
        """Initialize a processing step."""
        self.layer = layer_creator(**layer_kwargs)

    def process(self, input_data) -> tf.keras.layers.Layer:
        """Apply the processing step to the input data.
        Args:
            input_data: The input data to be processed.
        """
        return self.layer(input_data)

    def connect(self, input_layer) -> tf.keras.layers.Layer:
        """Connect this step's layer to an input layer and return the output layer."""
        return self.layer(input_layer)

    @property
    def name(self) -> str:
        """Return the name of the underlying layer."""
        return self.layer.name


class Pipeline:
    def __init__(self, steps: list[ProcessingStep] | None = None, name: str = "") -> None:
        """Initialize a pipeline with a list of processing steps.
        Args:
            steps: A list of processing steps.
            name: The name of the pipeline.
        """
        logger.info(f"🔂 Initializing New Pipeline for: {name}")
        self.steps = steps or []

    def add_step(self, step: ProcessingStep) -> None:
        """Add a processing step to the pipeline.
        Args:
            step: A processing step.
        """
        logger.info(f"Adding new preprocessing layer: {step.name} to the pipeline ➕")
        self.steps.append(step)

    def apply(self, input_data) -> tf.data.Dataset:
        """Apply the pipeline to the input data.
        Args:
            input_data: The input data to be processed.
        """
        for step in self.steps:
            input_data = step.process(input_data=input_data)
        return input_data

    def chain(self, input_layer) -> tf.keras.layers.Layer:
        """Chain the pipeline steps by connecting each step in sequence, starting from the input layer.
        Args:
            input_layer: The input layer to start the chain.
        """
        output_layer = input_layer
        for step in self.steps:
            output_layer = step.connect(output_layer)
        return output_layer


class FeaturePreprocessor:
    def __init__(self, name: str) -> None:
        """Initialize a feature preprocessor.
        Args:
            name: The name of the feature preprocessor.
        """
        self.name = name
        self.pipeline = Pipeline(name=name)

    def add_processing_step(self, layer_creator: Callable[..., tf.keras.layers.Layer], **layer_kwargs) -> None:
        """Add a processing step to the feature preprocessor.
        Args:
            layer_creator: A callable that creates a Keras layer.
            layer_kwargs: Keyword arguments to be passed to the layer creator.
        """
        step = ProcessingStep(layer_creator=layer_creator, **layer_kwargs)
        self.pipeline.add_step(step=step)

    def preprocess(self, input_data) -> tf.data.Dataset:
        """Apply the feature preprocessor to the input data.
        Args:
            input_data: The input data to be processed.
        """
        return self.pipeline.apply(input_data)

    def chain(self, input_layer) -> tf.keras.layers.Layer:
        """Chain the preprocessor's pipeline steps starting from the input layer.
        Args:
            input_layer: The input layer to start the chain.
        """
        return self.pipeline.chain(input_layer)
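
As a follow-up, here is a hedged sketch of how the factory and the FeaturePreprocessor/Pipeline classes might be wired into a Keras functional model. This is not part of the commit; the feature name, vocabulary, and dimensions are hypothetical.

import tensorflow as tf

# Hypothetical end-to-end wiring: StringLookup -> Embedding -> Flatten for one string feature.
pre = FeaturePreprocessor(name="city")
pre.add_processing_step(
    PreprocessorLayerFactory.create_string_lookup_layer,
    vocabulary=["paris", "berlin", "madrid"],
    num_oov_indices=1,
    name="city_lookup",
)
pre.add_processing_step(
    PreprocessorLayerFactory.create_embedding_layer,
    input_dim=4,  # 3 vocabulary entries + 1 OOV index
    output_dim=8,
    name="city_embedding",
)
pre.add_processing_step(PreprocessorLayerFactory.create_flatten_layer, name="city_flatten")

inputs = tf.keras.Input(shape=(1,), dtype=tf.string, name="city")
outputs = pre.chain(inputs)  # connects the three steps in sequence
model = tf.keras.Model(inputs=inputs, outputs=outputs)
model.summary()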