Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Remove get_artifacts in MLTransform since artifacts are stored in artifact location #29016

Merged
merged 11 commits into from
Oct 25, 2023
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@

* Fixed "Desired bundle size 0 bytes must be greater than 0" in Java SDK's BigtableIO.BigtableSource when you have more cores than bytes to read (Java) [#28793](https://github.com/apache/beam/issues/28793).
* The `watch_file_pattern` arg of [RunInference](https://github.com/apache/beam/blob/104c10b3ee536a9a3ea52b4dbf62d86b669da5d9/sdks/python/apache_beam/ml/inference/base.py#L997) had no effect prior to 2.52.0. To use the behavior of arg `watch_file_pattern` prior to 2.52.0, follow the documentation at https://beam.apache.org/documentation/ml/side-input-updates/ and use the `WatchFilePattern` PTransform as a SideInput. ([#28948](https://github.com/apache/beam/pulls/28948))
* `MLTransform` no longer outputs artifacts such as min, max and quantiles. Instead, `MLTransform` will add a feature to output these artifacts in a human-readable format - [#29017](https://github.com/apache/beam/issues/29017). For now, to use artifacts such as min and max that were produced by an earlier `MLTransform`, use `read_artifact_location` of `MLTransform`, which reads artifacts that were produced earlier by a different `MLTransform` ([#29016](https://github.com/apache/beam/pull/29016/))

## Security Fixes
* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import tensorflow_transform as tft # pylint: disable=unused-import
from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_scale_to_0_1
from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary
from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary_with_non_columnar_data
from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary_with_scalar
except ImportError:
raise unittest.SkipTest('tensorflow_transform is not installed.')

Expand All @@ -46,8 +46,8 @@ def check_mltransform_compute_and_apply_vocab():

def check_mltransform_scale_to_0_1():
expected = '''[START mltransform_scale_to_0_1]
Row(x=array([0. , 0.5714286, 0.2857143], dtype=float32), x_max=array([8.], dtype=float32), x_min=array([1.], dtype=float32))
Row(x=array([0.42857143, 0.14285715, 1. ], dtype=float32), x_max=array([8.], dtype=float32), x_min=array([1.], dtype=float32))
Row(x=array([0. , 0.5714286, 0.2857143], dtype=float32))
Row(x=array([0.42857143, 0.14285715, 1. ], dtype=float32))
[END mltransform_scale_to_0_1] '''.splitlines()[1:-1]
return expected

Expand Down Expand Up @@ -80,7 +80,7 @@ def test_mltransform_scale_to_0_1(self, mock_stdout):
self.assertEqual(predicted, expected)

def test_mltransform_compute_and_apply_vocab_scalar(self, mock_stdout):
mltransform_compute_and_apply_vocabulary_with_non_columnar_data()
mltransform_compute_and_apply_vocabulary_with_scalar()
predicted = mock_stdout.getvalue().splitlines()
expected = check_mltransform_compute_and_apply_vocabulary_with_scalar()
self.assertEqual(predicted, expected)
Expand Down
13 changes: 0 additions & 13 deletions sdks/python/apache_beam/ml/transforms/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,26 +67,13 @@ def apply_transform(self, data: OperationInputT,
inputs: input data.
"""

@abc.abstractmethod
def get_artifacts(
self, data: OperationInputT,
output_column_prefix: str) -> Optional[Dict[str, OperationOutputT]]:
"""
If the operation generates any artifacts, they can be returned from this
method.
"""
pass

def __call__(self, data: OperationInputT,
output_column_name: str) -> Dict[str, OperationOutputT]:
"""
This method is called when the instance of the class is called.
This method will invoke the apply() method of the class.
"""
transformed_data = self.apply_transform(data, output_column_name)
artifacts = self.get_artifacts(data, output_column_name)
if artifacts:
transformed_data = {**transformed_data, **artifacts}
return transformed_data

def get_counter(self):
Expand Down
18 changes: 0 additions & 18 deletions sdks/python/apache_beam/ml/transforms/handlers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,6 @@ def apply_transform(self, inputs, output_column_name, **kwargs):
return {output_column_name: inputs * 10}


class _FakeOperationWithArtifacts(TFTOperation):
def apply_transform(self, inputs, output_column_name, **kwargs):
return {output_column_name: inputs}

def get_artifacts(self, data, col_name):
return {'artifact': tf.convert_to_tensor([1])}


class IntType(NamedTuple):
x: int

Expand Down Expand Up @@ -106,16 +98,6 @@ def test_tft_operation_preprocessing_fn(
actual_result = process_handler.process_data_fn(inputs)
self.assertDictEqual(actual_result, expected_result)

def test_preprocessing_fn_with_artifacts(self):
process_handler = handlers.TFTProcessHandler(
transforms=[_FakeOperationWithArtifacts(columns=['x'])],
artifact_location=self.artifact_location)
inputs = {'x': [1, 2, 3]}
preprocessing_fn = process_handler.process_data_fn
actual_result = preprocessing_fn(inputs)
expected_result = {'x': [1, 2, 3], 'artifact': tf.convert_to_tensor([1])}
self.assertDictEqual(actual_result, expected_result)

def test_input_type_from_schema_named_tuple_pcoll(self):
data = [{'x': 1}]
with beam.Pipeline() as p:
Expand Down
82 changes: 12 additions & 70 deletions sdks/python/apache_beam/ml/transforms/tft.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@
import tensorflow as tf
import tensorflow_transform as tft
from apache_beam.ml.transforms.base import BaseOperation
from tensorflow_transform import analyzers
from tensorflow_transform import common_types
from tensorflow_transform import tf_utils

__all__ = [
'ComputeAndApplyVocabulary',
Expand Down Expand Up @@ -77,6 +75,8 @@ def wrapper(fn):
return wrapper


# TODO: https://github.com/apache/beam/pull/29016
# Add support for outputting artifacts to a text file in human readable form.
class TFTOperation(BaseOperation[common_types.TensorType,
common_types.TensorType]):
def __init__(self, columns: List[str]) -> None:
Expand All @@ -95,13 +95,6 @@ def __init__(self, columns: List[str]) -> None:
"Columns are not specified. Please specify the column for the "
" op %s" % self.__class__.__name__)

def get_artifacts(self, data: common_types.TensorType,
col_name: str) -> Dict[str, common_types.TensorType]:
"""
Returns the artifacts generated by the operation.
"""
return {}

@tf.function
def _split_string_with_delimiter(self, data, delimiter):
"""
Expand Down Expand Up @@ -240,15 +233,6 @@ def apply_transform(
}
return output_dict

def get_artifacts(self, data: common_types.TensorType,
col_name: str) -> Dict[str, common_types.TensorType]:
mean_var = tft.analyzers._mean_and_var(data)
shape = [tf.shape(data)[0], 1]
return {
col_name + '_mean': tf.broadcast_to(mean_var[0], shape),
col_name + '_var': tf.broadcast_to(mean_var[1], shape),
}


@register_input_dtype(float)
class ScaleTo01(TFTOperation):
Expand Down Expand Up @@ -280,14 +264,6 @@ def __init__(
self.elementwise = elementwise
self.name = name

def get_artifacts(self, data: common_types.TensorType,
col_name: str) -> Dict[str, common_types.TensorType]:
shape = [tf.shape(data)[0], 1]
return {
col_name + '_min': tf.broadcast_to(tft.min(data), shape),
col_name + '_max': tf.broadcast_to(tft.max(data), shape)
}

AnandInguva marked this conversation as resolved.
Show resolved Hide resolved
def apply_transform(
self, data: common_types.TensorType,
output_column_name: str) -> Dict[str, common_types.TensorType]:
Expand Down Expand Up @@ -368,34 +344,6 @@ def __init__(
self.elementwise = elementwise
self.name = name

def get_artifacts(self, data: common_types.TensorType,
col_name: str) -> Dict[str, common_types.TensorType]:
num_buckets = self.num_buckets
epsilon = self.epsilon
elementwise = self.elementwise

if num_buckets < 1:
raise ValueError('Invalid num_buckets %d' % num_buckets)

if isinstance(data, (tf.SparseTensor, tf.RaggedTensor)) and elementwise:
raise ValueError(
'bucketize requires `x` to be dense if `elementwise=True`')

x_values = tf_utils.get_values(data)

if epsilon is None:
# See explanation in args documentation for epsilon.
epsilon = min(1.0 / num_buckets, 0.01)

quantiles = analyzers.quantiles(
x_values, num_buckets, epsilon, reduce_instance_dims=not elementwise)
shape = [
tf.shape(data)[0], num_buckets - 1 if num_buckets > 1 else num_buckets
]
# These quantiles are used as the bucket boundaries in the later stages.
# Should we change the prefix _quantiles to _bucket_boundaries?
return {col_name + '_quantiles': tf.broadcast_to(quantiles, shape)}

def apply_transform(
self, data: common_types.TensorType,
output_column_name: str) -> Dict[str, common_types.TensorType]:
Expand Down Expand Up @@ -572,6 +520,7 @@ def __init__(
ngram_range: Tuple[int, int] = (1, 1),
ngrams_separator: Optional[str] = None,
compute_word_count: bool = False,
key_vocab_filename: str = 'key_vocab_mapping',
name: Optional[str] = None,
):
"""
Expand All @@ -592,9 +541,9 @@ def __init__(
n-gram sizes.
ngrams_separator: A string that will be inserted between each ngram.
compute_word_count: A boolean that specifies whether to compute
the unique word count and add it as an artifact to the output.
Note that the count will be computed over the entire dataset so
it will be the same value for all inputs.
the unique word count over the entire dataset. Defaults to False.
key_vocab_filename: The file name for the key vocabulary file when
compute_word_count is True.
name: A name for the operation (optional).

Note that original order of the input may not be preserved.
Expand All @@ -605,33 +554,26 @@ def __init__(
self.ngrams_separator = ngrams_separator
self.name = name
self.split_string_by_delimiter = split_string_by_delimiter
self.key_vocab_filename = key_vocab_filename
if compute_word_count:
self.compute_word_count_fn = count_unqiue_words
else:
self.compute_word_count_fn = lambda *args, **kwargs: {}
self.compute_word_count_fn = lambda *args, **kwargs: None

if ngram_range != (1, 1) and not ngrams_separator:
raise ValueError(
'ngrams_separator must be specified when ngram_range is not (1, 1)')

def get_artifacts(self, data: tf.SparseTensor,
col_name: str) -> Dict[str, tf.Tensor]:
return self.compute_word_count_fn(data, col_name)

def apply_transform(self, data: tf.SparseTensor, output_col_name: str):
if self.split_string_by_delimiter:
data = self._split_string_with_delimiter(
data, self.split_string_by_delimiter)
output = tft.bag_of_words(
data, self.ngram_range, self.ngrams_separator, self.name)
# word counts are written to the key_vocab_filename
self.compute_word_count_fn(data, self.key_vocab_filename)
return {output_col_name: output}


def count_unqiue_words(data: tf.SparseTensor,
output_col_name: str) -> Dict[str, tf.Tensor]:
keys, count = tft.count_per_key(data)
shape = [tf.shape(data)[0], tf.shape(keys)[0]]
return {
output_col_name + '_unique_elements': tf.broadcast_to(keys, shape),
output_col_name + '_counts': tf.broadcast_to(count, shape)
}
def count_unqiue_words(data: tf.SparseTensor, output_vocab_name: str) -> None:
tft.count_per_key(data, key_vocabulary_filename=output_vocab_name)
Loading