From d9cbc136f0dc282567dea9807d8993953269f44f Mon Sep 17 00:00:00 2001
From: piotrlaczkowski
Date: Fri, 19 Apr 2024 15:24:48 +0200
Subject: [PATCH] fix(KDP): adding missing text features vocab attribution

---
 kdp/custom_layers.py |  1 -
 kdp/processor.py     | 12 ++++++--
 kdp/stats.py         | 70 +++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 79 insertions(+), 4 deletions(-)

diff --git a/kdp/custom_layers.py b/kdp/custom_layers.py
index 05c0d69..2e2a0f1 100644
--- a/kdp/custom_layers.py
+++ b/kdp/custom_layers.py
@@ -29,7 +29,6 @@ def call(self, x: tf.Tensor) -> tf.Tensor:
         """
         x = tf.strings.lower(x)
         x = tf.strings.regex_replace(x, f"[{self.punctuation_pattern}]", " ")
-        x = tf.strings.regex_replace(x, r"[\d+]", " ")
         stop_words_regex = rf"\b({self.stop_words_pattern})\b\s?"
         x = tf.strings.regex_replace(x, stop_words_regex, " ")
         x = tf.strings.regex_replace(x, r"\s+", " ")
diff --git a/kdp/processor.py b/kdp/processor.py
index 338776c..b38e970 100644
--- a/kdp/processor.py
+++ b/kdp/processor.py
@@ -181,6 +181,7 @@ def _init_stats(self) -> None:
             features_specs=self.features_specs,
             numeric_features=self.numeric_features,
             categorical_features=self.categorical_features,
+            text_features=self.text_features,
         )
         self.features_stats = self.stats_instance._load_stats()
 
@@ -404,16 +405,21 @@ def _add_pipeline_categorical(self, feature_name: str, input_layer, stats: dict)
         # adding outputs
         self.outputs[feature_name] = preprocessor.chain(input_layer=input_layer)
 
-    def _add_pipeline_text(self, feature_name: str, input_layer) -> None:
+    def _add_pipeline_text(self, feature_name: str, input_layer, stats: dict) -> None:
         """Add a text preprocessing step to the pipeline.
 
         Args:
            feature_name (str): The name of the feature to be preprocessed.
            input_layer: The input layer for the feature.
+           stats (dict): A dictionary containing the metadata of the feature, including its vocabulary.
         """
         # getting feature object
         _feature = self.features_specs[feature_name]
 
+        # getting stats
+        _vocab = stats["vocab"]
+        logger.debug(f"TEXT: {_vocab = }")
+
         # initializing preprocessor
         preprocessor = FeaturePreprocessor(name=feature_name)
 
@@ -440,6 +446,7 @@ def _add_pipeline_text(self, feature_name: str, input_layer) -> None:
         preprocessor.add_processing_step(
             layer_creator=PreprocessorLayerFactory.text_vectorization_layer,
             name=f"text_vactorizer_{feature_name}",
+            vocabulary=_vocab,
             **_feature.kwargs,
         )
         # for concatenation we need the same format
@@ -535,7 +542,7 @@ def build_preprocessor(self) -> tf.keras.Model:
                     stats=stats,
                 )
             # CATEGORICAL FEATURES
-            elif "vocab" in stats:
+            elif "vocab" in stats and feature_name not in self.text_features:
                 self._add_pipeline_categorical(
                     feature_name=feature_name,
                     input_layer=input_layer,
@@ -555,6 +562,7 @@ def build_preprocessor(self) -> tf.keras.Model:
                 self._add_pipeline_text(
                     feature_name=feature_name,
                     input_layer=input_layer,
+                    stats=stats,
                 )
 
         # Preparing outputs
diff --git a/kdp/stats.py b/kdp/stats.py
index 1c5501e..87c0af5 100644
--- a/kdp/stats.py
+++ b/kdp/stats.py
@@ -101,6 +101,54 @@ def get_unique_values(self) -> list:
         return tf.unique(all_values)[0].numpy().tolist()
 
 
+class TextAccumulator:
+    def __init__(self) -> None:
+        """Initializes the accumulator for text values, where each entry is a list of words separated by spaces.
+
+        Attributes:
+            words (tf.Variable): TensorFlow variable to store unique words as strings.
+        """
+        self.words = tf.Variable(
+            [],
+            dtype=tf.string,
+            shape=tf.TensorShape(None),
+            trainable=False,
+        )
+        logger.info("TextAccumulator initialized.")
+
+    @tf.function
+    def update(self, new_texts: tf.Tensor) -> None:
+        """Updates the accumulator with new text values, extracting words and accumulating unique ones.
+
+        Args:
+            new_texts: A batch of text values (tf.Tensor of dtype tf.string),
+                each entry containing words separated by spaces.
+
+        Raises:
+            ValueError: If the input tensor is not of dtype tf.string.
+        """
+        if new_texts.dtype != tf.string:
+            raise ValueError(f"Unsupported data type for text features: {new_texts.dtype}")
+
+        # Split each string into words and flatten the list
+        new_texts = tf.strings.regex_replace(new_texts, r"\s+", " ")
+        split_words = tf.strings.split(new_texts).flat_values
+        split_words = tf.strings.lower(split_words)
+
+        # Concatenate new words with existing words and update unique words
+        updated_words = tf.unique(tf.concat([self.words, split_words], axis=0))[0]
+        self.words.assign(updated_words)
+
+    def get_unique_words(self) -> list:
+        """Returns the unique words accumulated so far as a list of strings.
+
+        Returns:
+            list of str: Unique words accumulated.
+        """
+        unique_words = self.words.value().numpy().tolist()
+        return unique_words
+
+
 class DatasetStatistics:
     def __init__(
         self,
@@ -108,6 +156,7 @@ def __init__(
         features_specs: dict[str, FeatureType | str] = None,
         numeric_features: list[NumericalFeature] = None,
         categorical_features: list[CategoricalFeature] = None,
+        text_features: list[CategoricalFeature] = None,
         features_stats_path: Path = None,
         overwrite_stats: bool = False,
         batch_size: int = 50_000,
@@ -124,10 +173,12 @@ def __init__(
-                Easier alternative to proviginh numerical and categorical lists.
+                Easier alternative to providing numerical and categorical lists.
             numeric_features: A list of numerical features to calculate statistics for (defaults to None).
             categorical_features: A list of categorical features to calculate statistics for (defaults to None).
+            text_features: A list of text features to calculate statistics for (defaults to None).
         """
         self.path_data = path_data
         self.numeric_features = numeric_features or []
         self.categorical_features = categorical_features or []
+        self.text_features = text_features or []
         self.features_specs = features_specs or {}
         self.features_stats_path = features_stats_path or "features_stats.json"
         self.overwrite_stats = overwrite_stats
@@ -139,6 +190,7 @@ def __init__(
         # Initializing placeholders for statistics
         self.numeric_stats = {col: WelfordAccumulator() for col in self.numeric_features}
         self.categorical_stats = {col: CategoricalAccumulator() for col in self.categorical_features}
+        self.text_stats = {col: TextAccumulator() for col in self.text_features}
 
     def _get_csv_file_pattern(self, path) -> str:
         """Get the csv file pattern that will handle directories and file paths.
@@ -187,10 +239,17 @@ def _process_batch(self, batch: tf.Tensor) -> None:
         for feature in self.categorical_features:
             self.categorical_stats[feature].update(batch[feature])
 
+        for feature in self.text_features:
+            self.text_stats[feature].update(batch[feature])
+
     def _compute_final_statistics(self) -> dict[str, dict]:
-        """Compute final statistics for numeric and categorical features."""
-        logger.info("Computing final statistics for numeric and categorical features 📊")
-        final_stats = {"numeric_stats": {}, "categorical_stats": {}}
+        """Compute final statistics for numeric, categorical and text features."""
+        logger.info("Computing final statistics for numeric, categorical and text features 📊")
+        final_stats = {
+            "numeric_stats": {},
+            "categorical_stats": {},
+            "text_stats": {},
+        }
         for feature in self.numeric_features:
             logger.debug(f"numeric {feature =}")
             final_stats["numeric_stats"][feature] = {
@@ -219,6 +278,15 @@ def _compute_final_statistics(self) -> dict[str, dict]:
                 "dtype": _dtype,
             }
 
+        for feature in self.text_features:
+            logger.debug(f"text {feature = }, {self.text_stats = }")
+            unique_words = self.text_stats[feature].get_unique_words()
+            final_stats["text_stats"][feature] = {
+                "size": len(unique_words),
+                "vocab": unique_words,
+                "dtype": self.features_specs[feature].dtype,
+            }
+
         return final_stats
 
     def calculate_dataset_statistics(self, dataset: tf.data.Dataset) -> dict[str, dict]:
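
A minimal usage sketch (not part of the patch) of how the new TextAccumulator
builds a vocabulary across batches, assuming the patched module is importable
as kdp.stats. Note that get_unique_words() returns bytes objects, not str:

    import tensorflow as tf

    from kdp.stats import TextAccumulator

    acc = TextAccumulator()

    # Each tensor element is one raw text entry; words are space-separated.
    acc.update(tf.constant(["Hello world", "hello   again"]))
    acc.update(tf.constant(["WORLD peace"]))

    # Words are lower-cased and de-duplicated across all batches;
    # .numpy().tolist() on a tf.string tensor yields bytes, e.g. b"hello".
    print(sorted(acc.get_unique_words()))
    # [b'again', b'hello', b'peace', b'world']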
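Continuing the sketch above: this accumulated vocabulary is what
_add_pipeline_text now hands to the text vectorization layer via
vocabulary=_vocab, so the vectorizer starts from a fixed word index instead of
requiring an adapt() pass. Assuming the factory's text_vectorization_layer
wraps tf.keras.layers.TextVectorization:

    # Keras vocabularies expect str tokens, while the accumulator yields bytes.
    vocab = [w.decode("utf-8") for w in acc.get_unique_words()]

    # Indices 0 and 1 are reserved for padding and OOV ("[UNK]"); known
    # words map straight to their position in the supplied vocabulary.
    vectorizer = tf.keras.layers.TextVectorization(vocabulary=vocab)
    print(vectorizer(tf.constant(["hello peace"])))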