From d37b2c4b7b117929b09383e38489cddaa7dead88 Mon Sep 17 00:00:00 2001
From: Anand Inguva
Date: Mon, 16 Oct 2023 15:35:28 -0400
Subject: [PATCH 01/11] Remove get_artifacts for MLTransform

---
 sdks/python/apache_beam/ml/transforms/tft.py | 53 +-----------------
 .../apache_beam/ml/transforms/tft_test.py    | 54 ++++++++-----------
 2 files changed, 24 insertions(+), 83 deletions(-)

diff --git a/sdks/python/apache_beam/ml/transforms/tft.py b/sdks/python/apache_beam/ml/transforms/tft.py
index 1d492642cd60..f924d70af78f 100644
--- a/sdks/python/apache_beam/ml/transforms/tft.py
+++ b/sdks/python/apache_beam/ml/transforms/tft.py
@@ -45,9 +45,7 @@
 import tensorflow as tf
 import tensorflow_transform as tft
 from apache_beam.ml.transforms.base import BaseOperation
-from tensorflow_transform import analyzers
 from tensorflow_transform import common_types
-from tensorflow_transform import tf_utils
 
 __all__ = [
     'ComputeAndApplyVocabulary',
@@ -77,6 +75,8 @@ def wrapper(fn):
   return wrapper
 
 
+# TODO: https://github.com/apache/beam/pull/29016
+# Add support for outputting artifacts to a text file in human readable form.
 class TFTOperation(BaseOperation[common_types.TensorType,
                                  common_types.TensorType]):
   def __init__(self, columns: List[str]) -> None:
@@ -240,15 +240,6 @@ def apply_transform(
     }
     return output_dict
 
-  def get_artifacts(self, data: common_types.TensorType,
-                    col_name: str) -> Dict[str, common_types.TensorType]:
-    mean_var = tft.analyzers._mean_and_var(data)
-    shape = [tf.shape(data)[0], 1]
-    return {
-        col_name + '_mean': tf.broadcast_to(mean_var[0], shape),
-        col_name + '_var': tf.broadcast_to(mean_var[1], shape),
-    }
-
 
 @register_input_dtype(float)
 class ScaleTo01(TFTOperation):
@@ -280,14 +271,6 @@ def __init__(
     self.elementwise = elementwise
     self.name = name
 
-  def get_artifacts(self, data: common_types.TensorType,
-                    col_name: str) -> Dict[str, common_types.TensorType]:
-    shape = [tf.shape(data)[0], 1]
-    return {
-        col_name + '_min': tf.broadcast_to(tft.min(data), shape),
-        col_name + '_max': tf.broadcast_to(tft.max(data), shape)
-    }
-
   def apply_transform(
       self, data: common_types.TensorType,
       output_column_name: str) -> Dict[str, common_types.TensorType]:
@@ -368,34 +351,6 @@ def __init__(
     self.elementwise = elementwise
     self.name = name
 
-  def get_artifacts(self, data: common_types.TensorType,
-                    col_name: str) -> Dict[str, common_types.TensorType]:
-    num_buckets = self.num_buckets
-    epsilon = self.epsilon
-    elementwise = self.elementwise
-
-    if num_buckets < 1:
-      raise ValueError('Invalid num_buckets %d' % num_buckets)
-
-    if isinstance(data, (tf.SparseTensor, tf.RaggedTensor)) and elementwise:
-      raise ValueError(
-          'bucketize requires `x` to be dense if `elementwise=True`')
-
-    x_values = tf_utils.get_values(data)
-
-    if epsilon is None:
-      # See explanation in args documentation for epsilon.
-      epsilon = min(1.0 / num_buckets, 0.01)
-
-    quantiles = analyzers.quantiles(
-        x_values, num_buckets, epsilon, reduce_instance_dims=not elementwise)
-    shape = [
-        tf.shape(data)[0], num_buckets - 1 if num_buckets > 1 else num_buckets
-    ]
-    # These quantiles are used as the bucket boundaries in the later stages.
-    # Should we change the prefix _quantiles to _bucket_boundaries?
- return {col_name + '_quantiles': tf.broadcast_to(quantiles, shape)} - def apply_transform( self, data: common_types.TensorType, output_column_name: str) -> Dict[str, common_types.TensorType]: @@ -614,10 +569,6 @@ def __init__( raise ValueError( 'ngrams_separator must be specified when ngram_range is not (1, 1)') - def get_artifacts(self, data: tf.SparseTensor, - col_name: str) -> Dict[str, tf.Tensor]: - return self.compute_word_count_fn(data, col_name) - def apply_transform(self, data: tf.SparseTensor, output_col_name: str): if self.split_string_by_delimiter: data = self._split_string_with_delimiter( diff --git a/sdks/python/apache_beam/ml/transforms/tft_test.py b/sdks/python/apache_beam/ml/transforms/tft_test.py index 41f59c868c3b..dd08811058d3 100644 --- a/sdks/python/apache_beam/ml/transforms/tft_test.py +++ b/sdks/python/apache_beam/ml/transforms/tft_test.py @@ -41,29 +41,6 @@ z_score_expected = {'x_mean': 3.5, 'x_var': 2.9166666666666665} -def assert_z_score_artifacts(element): - element = element.as_dict() - assert 'x_mean' in element - assert 'x_var' in element - assert element['x_mean'] == z_score_expected['x_mean'] - assert element['x_var'] == z_score_expected['x_var'] - - -def assert_ScaleTo01_artifacts(element): - element = element.as_dict() - assert 'x_min' in element - assert 'x_max' in element - assert element['x_min'] == 1 - assert element['x_max'] == 6 - - -def assert_bucketize_artifacts(element): - element = element.as_dict() - assert 'x_quantiles' in element - assert np.array_equal( - element['x_quantiles'], np.array([3, 5], dtype=np.float32)) - - class ScaleZScoreTest(unittest.TestCase): def setUp(self) -> None: self.artifact_location = tempfile.mkdtemp() @@ -99,8 +76,20 @@ def test_z_score(self): | "Create" >> beam.Create(data) | "MLTransform" >> base.MLTransform( write_artifact_location=self.artifact_location).with_transform( - tft.ScaleToZScore(columns=['x']))) - _ = (result | beam.Map(assert_z_score_artifacts)) + tft.ScaleToZScore(columns=['x'])) + | beam.Map(print)) + expected_data = [ + np.array([-1.4638501], dtype=np.float32), + np.array([-0.8783101], dtype=np.float32), + np.array([-0.2927701], dtype=np.float32), + np.array([0.2927701], dtype=np.float32), + np.array([0.8783101], dtype=np.float32), + np.array([1.4638501], dtype=np.float32), + ] + + actual_data = (result | beam.Map(lambda x: x.x)) + assert_that( + actual_data, equal_to(expected_data, equals_fn=np.array_equal)) def test_z_score_list_data(self): list_data = [{'x': [1, 2, 3]}, {'x': [4, 5, 6]}] @@ -111,7 +100,14 @@ def test_z_score_list_data(self): | "listMLTransform" >> base.MLTransform( write_artifact_location=self.artifact_location).with_transform( tft.ScaleToZScore(columns=['x']))) - _ = (list_result | beam.Map(assert_z_score_artifacts)) + + expected_data = [ + np.array([-1.4638501, -0.8783101, -0.2927701], dtype=np.float32), + np.array([0.2927701, 0.8783101, 1.4638501], dtype=np.float32) + ] + actual_data = (list_result | beam.Map(lambda x: x.x)) + assert_that( + actual_data, equal_to(expected_data, equals_fn=np.array_equal)) class ScaleTo01Test(unittest.TestCase): @@ -130,7 +126,6 @@ def test_ScaleTo01_list(self): | "MLTransform" >> base.MLTransform( write_artifact_location=self.artifact_location).with_transform( tft.ScaleTo01(columns=['x']))) - _ = (list_result | beam.Map(assert_ScaleTo01_artifacts)) expected_output = [ np.array([0, 0.2, 0.4], dtype=np.float32), @@ -150,7 +145,6 @@ def test_ScaleTo01(self): write_artifact_location=self.artifact_location).with_transform( 
tft.ScaleTo01(columns=['x']))) - _ = (result | beam.Map(assert_ScaleTo01_artifacts)) expected_output = ( np.array([0], dtype=np.float32), np.array([0.2], dtype=np.float32), @@ -179,7 +173,6 @@ def test_bucketize(self): | "MLTransform" >> base.MLTransform( write_artifact_location=self.artifact_location).with_transform( tft.Bucketize(columns=['x'], num_buckets=3))) - _ = (result | beam.Map(assert_bucketize_artifacts)) transformed_data = (result | beam.Map(lambda x: x.x)) expected_data = [ @@ -202,8 +195,6 @@ def test_bucketize_list(self): | "MLTransform" >> base.MLTransform( write_artifact_location=self.artifact_location).with_transform( tft.Bucketize(columns=['x'], num_buckets=3))) - _ = (list_result | beam.Map(assert_bucketize_artifacts)) - transformed_data = ( list_result | "TransformedColumnX" >> beam.Map(lambda ele: ele.x)) @@ -751,7 +742,6 @@ def map_element_to_count(elements, counts): transforms=[ tft.BagOfWords(columns=['x'], compute_word_count=True) ])) - # the unique elements and counts are artifacts and will be # stored in the result and same for all the elements in the # PCollection. From e75060736f9892daa0830df2146aabea5098dbbb Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Mon, 16 Oct 2023 18:47:56 +0000 Subject: [PATCH 02/11] Doc changes --- .../snippets/transforms/elementwise/mltransform_test.py | 8 ++++---- .../site/content/en/documentation/ml/preprocess-data.md | 5 ++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py index 1d2197e35e4e..0db10718295b 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py @@ -31,7 +31,7 @@ import tensorflow_transform as tft # pylint: disable=unused-import from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_scale_to_0_1 from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary - from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary_with_non_columnar_data + from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary_with_scalar except ImportError: raise unittest.SkipTest('tensorflow_transform is not installed.') @@ -46,8 +46,8 @@ def check_mltransform_compute_and_apply_vocab(): def check_mltransform_scale_to_0_1(): expected = '''[START mltransform_scale_to_0_1] -Row(x=array([0. , 0.5714286, 0.2857143], dtype=float32), x_max=array([8.], dtype=float32), x_min=array([1.], dtype=float32)) -Row(x=array([0.42857143, 0.14285715, 1. ], dtype=float32), x_max=array([8.], dtype=float32), x_min=array([1.], dtype=float32)) +Row(x=array([0. , 0.5714286, 0.2857143], dtype=float32)) +Row(x=array([0.42857143, 0.14285715, 1. 
], dtype=float32)) [END mltransform_scale_to_0_1] '''.splitlines()[1:-1] return expected @@ -80,7 +80,7 @@ def test_mltransform_scale_to_0_1(self, mock_stdout): self.assertEqual(predicted, expected) def test_mltransform_compute_and_apply_vocab_scalar(self, mock_stdout): - mltransform_compute_and_apply_vocabulary_with_non_columnar_data() + mltransform_compute_and_apply_vocabulary_with_scalar() predicted = mock_stdout.getvalue().splitlines() expected = check_mltransform_compute_and_apply_vocabulary_with_scalar() self.assertEqual(predicted, expected) diff --git a/website/www/site/content/en/documentation/ml/preprocess-data.md b/website/www/site/content/en/documentation/ml/preprocess-data.md index cb79afff6036..2b291b9c75a5 100644 --- a/website/www/site/content/en/documentation/ml/preprocess-data.md +++ b/website/www/site/content/en/documentation/ml/preprocess-data.md @@ -105,7 +105,7 @@ artifacts. When you use the `write_artifact_location` parameter, the `MLTransform` class runs the specified transformations on the dataset and then creates artifacts from these transformations. The artifacts are stored in the location that you specify in -the `write_artifact_location` parameter or in the `MLTransform` output. +the `write_artifact_location` parameter. Write mode is useful when you want to store the results of your transformations for future use. For example, if you apply the same transformations on a @@ -120,8 +120,7 @@ The following examples demonstrate how write mode works. The `ComputeAndApplyVocabulary` transform outputs the indices of the vocabulary to the vocabulary file. - The `ScaleToZScore` transform calculates the mean and variance over the entire dataset - and then normalizes the entire dataset using the mean and variance. The - mean and variance are outputted by the `MLTransform` operation. + and then normalizes the entire dataset using the mean and variance. When you use the `write_artifact_location` parameter, these values are stored as a `tensorflow` graph in the location specified by the `write_artifact_location` parameter value. 
You can reuse the values in read mode From 0fab242375a501324dcb1494ed79115ca709b7ba Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Mon, 16 Oct 2023 19:38:05 +0000 Subject: [PATCH 03/11] Add snippet tests to MLTransform test suite --- sdks/python/tox.ini | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 1e797d96074f..e4cf09cacba4 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -162,7 +162,7 @@ deps = holdup==1.8.0 extras = gcp -allowlist_externals = +allowlist_externals = bash echo sleep @@ -194,7 +194,7 @@ deps = extras = azure passenv = REQUESTS_CA_BUNDLE -allowlist_externals = +allowlist_externals = wget az bash @@ -311,11 +311,12 @@ commands = # Run all DataFrame API unit tests bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/dataframe' -[testenv:py{38,39}-tft-113] +[testenv:py{38,39}-tft-{113,114}] deps = 113: tensorflow_transform>=1.13.0,<1.14.0 + 114: tensorflow_transform>=1.14.0,<1.15.0 commands = - bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/ml/transforms' + bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/ml/transforms apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py' [testenv:py{38,39,310,311}-pytorch-{19,110,111,112,113}] deps = From a5e04464faf01e3904e79524edef1143b32c5927 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Tue, 17 Oct 2023 15:36:16 -0400 Subject: [PATCH 04/11] Fix tests --- sdks/python/apache_beam/ml/transforms/tft.py | 4 ++ .../apache_beam/ml/transforms/tft_test.py | 37 ++----------------- 2 files changed, 7 insertions(+), 34 deletions(-) diff --git a/sdks/python/apache_beam/ml/transforms/tft.py b/sdks/python/apache_beam/ml/transforms/tft.py index f924d70af78f..d8ac86367886 100644 --- a/sdks/python/apache_beam/ml/transforms/tft.py +++ b/sdks/python/apache_beam/ml/transforms/tft.py @@ -577,6 +577,10 @@ def apply_transform(self, data: tf.SparseTensor, output_col_name: str): data, self.ngram_range, self.ngrams_separator, self.name) return {output_col_name: output} + def get_artifacts(self, data: tf.SparseTensor, + col_name: str) -> Dict[str, tf.Tensor]: + return self.compute_word_count_fn(data, col_name) + def count_unqiue_words(data: tf.SparseTensor, output_col_name: str) -> Dict[str, tf.Tensor]: diff --git a/sdks/python/apache_beam/ml/transforms/tft_test.py b/sdks/python/apache_beam/ml/transforms/tft_test.py index dd08811058d3..57878b817052 100644 --- a/sdks/python/apache_beam/ml/transforms/tft_test.py +++ b/sdks/python/apache_beam/ml/transforms/tft_test.py @@ -76,8 +76,7 @@ def test_z_score(self): | "Create" >> beam.Create(data) | "MLTransform" >> base.MLTransform( write_artifact_location=self.artifact_location).with_transform( - tft.ScaleToZScore(columns=['x'])) - | beam.Map(print)) + tft.ScaleToZScore(columns=['x']))) expected_data = [ np.array([-1.4638501], dtype=np.float32), np.array([-0.8783101], dtype=np.float32), @@ -102,8 +101,8 @@ def test_z_score_list_data(self): tft.ScaleToZScore(columns=['x']))) expected_data = [ - np.array([-1.4638501, -0.8783101, -0.2927701], dtype=np.float32), - np.array([0.2927701, 0.8783101, 1.4638501], dtype=np.float32) + np.array([-1.46385, -0.87831, -0.29277], dtype=np.float32), + np.array([0.29277, 0.87831, 1.46385], dtype=np.float32) ] actual_data = (list_result | beam.Map(lambda x: x.x)) assert_that( @@ -205,36 +204,6 @@ def test_bucketize_list(self): assert_that( transformed_data, equal_to(expected_data, equals_fn=np.array_equal)) - @parameterized.expand([ - 
(range(1, 10), [4, 7]),
-      (range(9, 0, -1), [4, 7]),
-      (range(19, 0, -1), [10]),
-      (range(1, 100), [25, 50, 75]),
-      # similar to the above but with odd number of elements
-      (range(1, 100, 2), [25, 51, 75]),
-      (range(99, 0, -1), range(10, 100, 10))
-  ])
-  def test_bucketize_boundaries(self, test_input, expected_boundaries):
-    # boundaries are outputted as artifacts for the Bucketize transform.
-    data = [{'x': [i]} for i in test_input]
-    num_buckets = len(expected_boundaries) + 1
-    with beam.Pipeline() as p:
-      result = (
-          p
-          | "Create" >> beam.Create(data)
-          | "MLTransform" >> base.MLTransform(
-              write_artifact_location=self.artifact_location).with_transform(
-                  tft.Bucketize(columns=['x'], num_buckets=num_buckets)))
-      actual_boundaries = (
-          result
-          | beam.Map(lambda x: x.as_dict())
-          | beam.Map(lambda x: x['x_quantiles']))
-
-      def assert_boundaries(actual_boundaries):
-        assert np.array_equal(actual_boundaries, expected_boundaries)
-
-      _ = (actual_boundaries | beam.Map(assert_boundaries))
-
 
 class ApplyBucketsTest(unittest.TestCase):
   def setUp(self) -> None:

From f3415b425fc99774ecb3682c903b4c78344050b6 Mon Sep 17 00:00:00 2001
From: Anand Inguva
Date: Tue, 17 Oct 2023 19:59:18 +0000
Subject: [PATCH 05/11] fix test values

---
 sdks/python/apache_beam/ml/transforms/tft_test.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/ml/transforms/tft_test.py b/sdks/python/apache_beam/ml/transforms/tft_test.py
index 57878b817052..ad7aa647a984 100644
--- a/sdks/python/apache_beam/ml/transforms/tft_test.py
+++ b/sdks/python/apache_beam/ml/transforms/tft_test.py
@@ -78,12 +78,12 @@ def test_z_score(self):
             write_artifact_location=self.artifact_location).with_transform(
                 tft.ScaleToZScore(columns=['x'])))
     expected_data = [
-        np.array([-1.4638501], dtype=np.float32),
-        np.array([-0.8783101], dtype=np.float32),
-        np.array([-0.2927701], dtype=np.float32),
-        np.array([0.2927701], dtype=np.float32),
-        np.array([0.8783101], dtype=np.float32),
-        np.array([1.4638501], dtype=np.float32),
+        np.array([-1.46385], dtype=np.float32),
+        np.array([-0.87831], dtype=np.float32),
+        np.array([-0.29277], dtype=np.float32),
+        np.array([0.29277], dtype=np.float32),
+        np.array([0.87831], dtype=np.float32),
+        np.array([1.46385], dtype=np.float32),
     ]
 
     actual_data = (result | beam.Map(lambda x: x.x))

From 5532db82b35c38b41cdb47b1575539f1ebb877ad Mon Sep 17 00:00:00 2001
From: Anand Inguva <34158215+AnandInguva@users.noreply.github.com>
Date: Wed, 18 Oct 2023 13:44:10 -0400
Subject: [PATCH 06/11] Update CHANGES.md

---
 CHANGES.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGES.md b/CHANGES.md
index 5db1b11c595a..ef9cde03d474 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -80,6 +80,7 @@
 * Fixed "Desired bundle size 0 bytes must be greater than 0" in Java SDK's BigtableIO.BigtableSource when you have more cores than bytes to read (Java) [#28793](https://github.com/apache/beam/issues/28793).
 * `watch_file_pattern` arg of the [RunInference](https://github.com/apache/beam/blob/104c10b3ee536a9a3ea52b4dbf62d86b669da5d9/sdks/python/apache_beam/ml/inference/base.py#L997) arg had no effect prior to 2.52.0. To use the behavior of arg `watch_file_pattern` prior to 2.52.0, follow the documentation at https://beam.apache.org/documentation/ml/side-input-updates/ and use `WatchFilePattern` PTransform as a SideInput. ([#28948](https://github.com/apache/beam/pulls/28948))
+* `MLTransform` no longer outputs artifacts such as min, max, and quantiles. Instead, `MLTransform` will add a feature to output these artifacts in a human-readable format - [#29017](https://github.com/apache/beam/issues/29017). For now, to use artifacts such as min and max that were produced by an earlier `MLTransform`, use `read_artifact_location` of `MLTransform`, which reads artifacts that were produced earlier in a different `MLTransform` ([#29016](https://github.com/apache/beam/pull/29016/))
 
 ## Security Fixes
 * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).

From 49fd346ea0135e740eb6f9160ec5fee13cf5f22b Mon Sep 17 00:00:00 2001
From: Anand Inguva
Date: Mon, 23 Oct 2023 15:11:27 +0000
Subject: [PATCH 07/11] add word count to file instead of dict

---
 sdks/python/apache_beam/ml/transforms/tft.py | 23 ++++------
 .../apache_beam/ml/transforms/tft_test.py    | 44 ++++++++++++-------
 2 files changed, 36 insertions(+), 31 deletions(-)

diff --git a/sdks/python/apache_beam/ml/transforms/tft.py b/sdks/python/apache_beam/ml/transforms/tft.py
index d8ac86367886..8c5c86ca5e22 100644
--- a/sdks/python/apache_beam/ml/transforms/tft.py
+++ b/sdks/python/apache_beam/ml/transforms/tft.py
@@ -527,6 +527,7 @@ def __init__(
       ngram_range: Tuple[int, int] = (1, 1),
       ngrams_separator: Optional[str] = None,
       compute_word_count: bool = False,
+      key_vocab_filename: str = 'key_vocab_mapping',
       name: Optional[str] = None,
   ):
     """
@@ -547,9 +548,9 @@
         n-gram sizes.
       ngrams_separator: A string that will be inserted between each ngram.
       compute_word_count: A boolean that specifies whether to compute
-        the unique word count and add it as an artifact to the output.
-        Note that the count will be computed over the entire dataset so
-        it will be the same value for all inputs.
+        the unique word count over the entire dataset. Defaults to False.
+      key_vocab_filename: The file name for the key vocabulary file when
+        compute_word_count is True.
       name: A name for the operation (optional).
 
     Note that original order of the input may not be preserved.
@@ -560,6 +561,7 @@ def __init__( self.ngrams_separator = ngrams_separator self.name = name self.split_string_by_delimiter = split_string_by_delimiter + self.key_vocab_filename = key_vocab_filename if compute_word_count: self.compute_word_count_fn = count_unqiue_words else: @@ -575,18 +577,11 @@ def apply_transform(self, data: tf.SparseTensor, output_col_name: str): data, self.split_string_by_delimiter) output = tft.bag_of_words( data, self.ngram_range, self.ngrams_separator, self.name) + # word counts are written to the key_vocab_filename + self.compute_word_count_fn(data, self.key_vocab_filename) return {output_col_name: output} - def get_artifacts(self, data: tf.SparseTensor, - col_name: str) -> Dict[str, tf.Tensor]: - return self.compute_word_count_fn(data, col_name) - def count_unqiue_words(data: tf.SparseTensor, - output_col_name: str) -> Dict[str, tf.Tensor]: - keys, count = tft.count_per_key(data) - shape = [tf.shape(data)[0], tf.shape(keys)[0]] - return { - output_col_name + '_unique_elements': tf.broadcast_to(keys, shape), - output_col_name + '_counts': tf.broadcast_to(count, shape) - } + output_vocab_name: str) -> Dict[str, tf.Tensor]: + tft.count_per_key(data, key_vocabulary_filename=output_vocab_name) diff --git a/sdks/python/apache_beam/ml/transforms/tft_test.py b/sdks/python/apache_beam/ml/transforms/tft_test.py index ad7aa647a984..a7b6cc733adc 100644 --- a/sdks/python/apache_beam/ml/transforms/tft_test.py +++ b/sdks/python/apache_beam/ml/transforms/tft_test.py @@ -38,8 +38,6 @@ if not tft: raise unittest.SkipTest('tensorflow_transform is not installed.') -z_score_expected = {'x_mean': 3.5, 'x_var': 2.9166666666666665} - class ScaleZScoreTest(unittest.TestCase): def setUp(self) -> None: @@ -576,6 +574,17 @@ def setUp(self) -> None: def tearDown(self): shutil.rmtree(self.artifact_location) + def validate_count_per_key(self): + import os + key_vocab_location = os.path.join( + self.artifact_location, 'transform_fn/assets/key_vocab') + with open(key_vocab_location, 'r'): + key_vocab_list = [line.strip() for line in f] + + expected_data = ['2 yum', '4 Apple', '1 like', '1 I', '4 pie', '2 Banana'] + actual_data = key_vocab_list + self.assertEqual(expected_data, actual_data) + def test_bag_of_words_on_list_seperated_words_default_ngrams(self): data = [{ 'x': ['I', 'like', 'pie', 'pie', 'pie'], @@ -691,10 +700,6 @@ def test_bag_of_words_on_by_splitting_input_text(self): assert_that(result, equal_to(expected_data, equals_fn=np.array_equal)) def test_count_per_key_on_list(self): - def map_element_to_count(elements, counts): - d = {elements[i]: counts[i] for i in range(len(elements))} - return d - data = [{ 'x': ['I', 'like', 'pie', 'pie', 'pie'], }, { @@ -703,24 +708,29 @@ def map_element_to_count(elements, counts): 'x': ['Banana', 'Banana', 'Apple', 'Apple', 'Apple', 'Apple'] }] with beam.Pipeline() as p: - result = ( + _ = ( p | "Create" >> beam.Create(data) | "MLTransform" >> base.MLTransform( write_artifact_location=self.artifact_location, transforms=[ - tft.BagOfWords(columns=['x'], compute_word_count=True) + tft.BagOfWords( + columns=['x'], + compute_word_count=True, + key_vocab_filename='my_vocab') ])) - # the unique elements and counts are artifacts and will be - # stored in the result and same for all the elements in the - # PCollection. - result = result | beam.Map( - lambda x: map_element_to_count(x.x_unique_elements, x.x_counts)) - expected_data = [{ - b'Apple': 4, b'Banana': 2, b'I': 1, b'like': 1, b'pie': 4, b'yum': 2 - }] * 3 # since there are 3 elements in input. 
- assert_that(result, equal_to(expected_data)) + def validate_count_per_key(key_vocab_filename): + import os + key_vocab_location = os.path.join( + self.artifact_location, 'transform_fn/assets', key_vocab_filename) + with open(key_vocab_location, 'r') as f: + key_vocab_list = [line.strip() for line in f] + return key_vocab_list + + expected_data = ['2 yum', '4 Apple', '1 like', '1 I', '4 pie', '2 Banana'] + actual_data = validate_count_per_key('my_vocab') + self.assertEqual(expected_data, actual_data) if __name__ == '__main__': From 6d78d6c1506ae68021f30e44f1245160ffde678c Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Mon, 23 Oct 2023 15:13:43 +0000 Subject: [PATCH 08/11] remove get_artifacts method --- sdks/python/apache_beam/ml/transforms/base.py | 13 ------------- .../apache_beam/ml/transforms/handlers_test.py | 18 ------------------ sdks/python/apache_beam/ml/transforms/tft.py | 7 ------- 3 files changed, 38 deletions(-) diff --git a/sdks/python/apache_beam/ml/transforms/base.py b/sdks/python/apache_beam/ml/transforms/base.py index 49ce6e9ec1e0..dc70aebf93fd 100644 --- a/sdks/python/apache_beam/ml/transforms/base.py +++ b/sdks/python/apache_beam/ml/transforms/base.py @@ -67,16 +67,6 @@ def apply_transform(self, data: OperationInputT, inputs: input data. """ - @abc.abstractmethod - def get_artifacts( - self, data: OperationInputT, - output_column_prefix: str) -> Optional[Dict[str, OperationOutputT]]: - """ - If the operation generates any artifacts, they can be returned from this - method. - """ - pass - def __call__(self, data: OperationInputT, output_column_name: str) -> Dict[str, OperationOutputT]: """ @@ -84,9 +74,6 @@ def __call__(self, data: OperationInputT, This method will invoke the apply() method of the class. """ transformed_data = self.apply_transform(data, output_column_name) - artifacts = self.get_artifacts(data, output_column_name) - if artifacts: - transformed_data = {**transformed_data, **artifacts} return transformed_data def get_counter(self): diff --git a/sdks/python/apache_beam/ml/transforms/handlers_test.py b/sdks/python/apache_beam/ml/transforms/handlers_test.py index 3342ec76cae5..327c8c76c0e9 100644 --- a/sdks/python/apache_beam/ml/transforms/handlers_test.py +++ b/sdks/python/apache_beam/ml/transforms/handlers_test.py @@ -58,14 +58,6 @@ def apply_transform(self, inputs, output_column_name, **kwargs): return {output_column_name: inputs * 10} -class _FakeOperationWithArtifacts(TFTOperation): - def apply_transform(self, inputs, output_column_name, **kwargs): - return {output_column_name: inputs} - - def get_artifacts(self, data, col_name): - return {'artifact': tf.convert_to_tensor([1])} - - class IntType(NamedTuple): x: int @@ -106,16 +98,6 @@ def test_tft_operation_preprocessing_fn( actual_result = process_handler.process_data_fn(inputs) self.assertDictEqual(actual_result, expected_result) - def test_preprocessing_fn_with_artifacts(self): - process_handler = handlers.TFTProcessHandler( - transforms=[_FakeOperationWithArtifacts(columns=['x'])], - artifact_location=self.artifact_location) - inputs = {'x': [1, 2, 3]} - preprocessing_fn = process_handler.process_data_fn - actual_result = preprocessing_fn(inputs) - expected_result = {'x': [1, 2, 3], 'artifact': tf.convert_to_tensor([1])} - self.assertDictEqual(actual_result, expected_result) - def test_input_type_from_schema_named_tuple_pcoll(self): data = [{'x': 1}] with beam.Pipeline() as p: diff --git a/sdks/python/apache_beam/ml/transforms/tft.py b/sdks/python/apache_beam/ml/transforms/tft.py index 
8c5c86ca5e22..857f40e5fd98 100644 --- a/sdks/python/apache_beam/ml/transforms/tft.py +++ b/sdks/python/apache_beam/ml/transforms/tft.py @@ -95,13 +95,6 @@ def __init__(self, columns: List[str]) -> None: "Columns are not specified. Please specify the column for the " " op %s" % self.__class__.__name__) - def get_artifacts(self, data: common_types.TensorType, - col_name: str) -> Dict[str, common_types.TensorType]: - """ - Returns the artifacts generated by the operation. - """ - return {} - @tf.function def _split_string_with_delimiter(self, data, delimiter): """ From 4262dd0f58504171dea6beecc388108c096e2742 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Tue, 24 Oct 2023 09:53:47 -0400 Subject: [PATCH 09/11] Remove redundant code --- sdks/python/apache_beam/ml/transforms/tft_test.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/ml/transforms/tft_test.py b/sdks/python/apache_beam/ml/transforms/tft_test.py index a7b6cc733adc..38ded6a809af 100644 --- a/sdks/python/apache_beam/ml/transforms/tft_test.py +++ b/sdks/python/apache_beam/ml/transforms/tft_test.py @@ -17,6 +17,7 @@ # pytype: skip-file +import os import shutil import tempfile import unittest @@ -574,17 +575,6 @@ def setUp(self) -> None: def tearDown(self): shutil.rmtree(self.artifact_location) - def validate_count_per_key(self): - import os - key_vocab_location = os.path.join( - self.artifact_location, 'transform_fn/assets/key_vocab') - with open(key_vocab_location, 'r'): - key_vocab_list = [line.strip() for line in f] - - expected_data = ['2 yum', '4 Apple', '1 like', '1 I', '4 pie', '2 Banana'] - actual_data = key_vocab_list - self.assertEqual(expected_data, actual_data) - def test_bag_of_words_on_list_seperated_words_default_ngrams(self): data = [{ 'x': ['I', 'like', 'pie', 'pie', 'pie'], @@ -721,7 +711,6 @@ def test_count_per_key_on_list(self): ])) def validate_count_per_key(key_vocab_filename): - import os key_vocab_location = os.path.join( self.artifact_location, 'transform_fn/assets', key_vocab_filename) with open(key_vocab_location, 'r') as f: From dd0e7ea66ec028edf1e36ac802b922bfc64babd4 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Tue, 24 Oct 2023 10:05:31 -0400 Subject: [PATCH 10/11] Change return type --- sdks/python/apache_beam/ml/transforms/tft.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/ml/transforms/tft.py b/sdks/python/apache_beam/ml/transforms/tft.py index 857f40e5fd98..a2bad65deb47 100644 --- a/sdks/python/apache_beam/ml/transforms/tft.py +++ b/sdks/python/apache_beam/ml/transforms/tft.py @@ -575,6 +575,5 @@ def apply_transform(self, data: tf.SparseTensor, output_col_name: str): return {output_col_name: output} -def count_unqiue_words(data: tf.SparseTensor, - output_vocab_name: str) -> Dict[str, tf.Tensor]: +def count_unqiue_words(data: tf.SparseTensor, output_vocab_name: str) -> None: tft.count_per_key(data, key_vocabulary_filename=output_vocab_name) From 6472ba7b3968427c50003bfc21ec425af4f608ae Mon Sep 17 00:00:00 2001 From: Anand Inguva <34158215+AnandInguva@users.noreply.github.com> Date: Tue, 24 Oct 2023 15:38:27 -0400 Subject: [PATCH 11/11] Update tft.py --- sdks/python/apache_beam/ml/transforms/tft.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/transforms/tft.py b/sdks/python/apache_beam/ml/transforms/tft.py index a2bad65deb47..c7b8ff015324 100644 --- a/sdks/python/apache_beam/ml/transforms/tft.py +++ b/sdks/python/apache_beam/ml/transforms/tft.py 
@@ -558,7 +558,7 @@ def __init__( if compute_word_count: self.compute_word_count_fn = count_unqiue_words else: - self.compute_word_count_fn = lambda *args, **kwargs: {} + self.compute_word_count_fn = lambda *args, **kwargs: None if ngram_range != (1, 1) and not ngrams_separator: raise ValueError(
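
The end state of this series is easiest to see outside the diffs. With the get_artifacts hooks gone, BagOfWords(compute_word_count=True) no longer attaches counts to every output row; it writes them once, as a text file of '<count> <token>' lines under the artifact location. Below is a minimal sketch of that flow, modeled on test_count_per_key_on_list above; the 'transform_fn/assets' subpath, the 'my_vocab' filename, and the sample data are taken from that test, and the snippet assumes tensorflow_transform is installed and the pipeline runs on the local DirectRunner.

import os
import tempfile

import apache_beam as beam
from apache_beam.ml.transforms import base
from apache_beam.ml.transforms import tft

# Word counts are written under the artifact location, not emitted per row.
artifact_location = tempfile.mkdtemp()
data = [
    {'x': ['I', 'like', 'pie', 'pie', 'pie']},
    {'x': ['yum', 'yum', 'pie']},
]

with beam.Pipeline() as p:
  _ = (
      p
      | "Create" >> beam.Create(data)
      | "MLTransform" >> base.MLTransform(
          write_artifact_location=artifact_location).with_transform(
              tft.BagOfWords(
                  columns=['x'],
                  compute_word_count=True,
                  key_vocab_filename='my_vocab')))

# The pipeline has completed, so the key vocabulary asset now exists on disk
# as one '<count> <token>' line per unique token (e.g. '4 pie', '2 yum').
vocab_path = os.path.join(artifact_location, 'transform_fn/assets', 'my_vocab')
with open(vocab_path) as f:
  print(f.read())

Reading the file only after the with block exits matters: count_per_key is an analyzer, so the vocabulary asset is materialized only once the pipeline has run to completion.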