From 017c17b3db269e9693f5e234069873e6e748b0dc Mon Sep 17 00:00:00 2001
From: tfx-bsl-team
Date: Wed, 16 Mar 2022 08:19:35 -0700
Subject: [PATCH] Adding test coverage for telemetry functionality without
 descriptors specified.

PiperOrigin-RevId: 435054937
---
 tfx_bsl/tfxio/parquet_tfxio.py      | 148 --------------
 tfx_bsl/tfxio/parquet_tfxio_test.py | 288 ----------------------------
 tfx_bsl/tfxio/telemetry_test.py     |  55 ++++--
 3 files changed, 36 insertions(+), 455 deletions(-)
 delete mode 100644 tfx_bsl/tfxio/parquet_tfxio.py
 delete mode 100644 tfx_bsl/tfxio/parquet_tfxio_test.py

diff --git a/tfx_bsl/tfxio/parquet_tfxio.py b/tfx_bsl/tfxio/parquet_tfxio.py
deleted file mode 100644
index d4004585..00000000
--- a/tfx_bsl/tfxio/parquet_tfxio.py
+++ /dev/null
@@ -1,148 +0,0 @@
-# Copyright 2020 Google LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""TFXIO implementation for Parquet."""
-
-import copy
-from typing import Optional, List, Text, Any
-
-import apache_beam as beam
-import pyarrow as pa
-import pyarrow.parquet as pq
-import tensorflow as tf
-from apache_beam.io.filesystems import FileSystems
-from tensorflow_metadata.proto.v0 import schema_pb2
-
-from tfx_bsl.coders import csv_decoder
-
-from tfx_bsl.tfxio import dataset_options, tensor_adapter, tensor_representation_util, telemetry
-from tfx_bsl.tfxio.tfxio import TFXIO
-
-_PARQUET_FORMAT = "parquet"
-
-
-class ParquetTFXIO(TFXIO):
-  """TFXIO implementation for Parquet."""
-
-  def __init__(self,
-               file_pattern: Text,
-               column_names: List[Text],
-               *,
-               min_bundle_size: int = 0,
-               schema: Optional[schema_pb2.Schema] = None,
-               validate: Optional[bool] = True,
-               telemetry_descriptors: Optional[List[Text]] = None):
-    """Initializes a Parquet TFXIO.
-
-    Args:
-      file_pattern: A file glob pattern to read Parquet files from.
-      column_names: List of column names to read from the Parquet files.
-      min_bundle_size: The minimum size in bytes to be considered when
-        splitting the Parquet input into bundles.
-      schema: An optional TFMD Schema describing the dataset. If a schema is
-        provided, it will determine the data type of the Parquet columns.
-        Otherwise, each column's data type will be inferred by the decoder.
-      validate: Boolean flag to verify that the files exist during pipeline
-        creation time.
-      telemetry_descriptors: A set of descriptors that identify the component
-        that is instantiating this TFXIO. These will be used to construct the
-        namespace to contain metrics for profiling and are therefore expected
-        to be identifiers of the component itself and not individual instances
-        of source use.
- """ - self._file_pattern = file_pattern - self._column_names = column_names - self._min_bundle_size = min_bundle_size - self._validate = validate - self._schema = schema - self._telemetry_descriptors = telemetry_descriptors - - def BeamSource(self, batch_size: Optional[int] = None) -> beam.PTransform: - - @beam.typehints.with_input_types(Any) - @beam.typehints.with_output_types(pa.RecordBatch) - def _PTransformFn(pcoll_or_pipeline: Any): - """Reads Parquet tables and converts to RecordBatches.""" - return ( - pcoll_or_pipeline | "ParquetBeamSource" >> - beam.io.ReadFromParquetBatched(file_pattern=self._file_pattern, - min_bundle_size=self._min_bundle_size, - validate=self._validate, - columns=self._column_names) | - "ToRecordBatch" >> beam.FlatMap(self._TableToRecordBatch, batch_size) - | "CollectRecordBatchTelemetry" >> telemetry.ProfileRecordBatches( - self._telemetry_descriptors, _PARQUET_FORMAT, _PARQUET_FORMAT)) - - return beam.ptransform_fn(_PTransformFn)() - - def RecordBatches(self, options: dataset_options.RecordBatchesOptions): - raise NotImplementedError - - def TensorFlowDataset( - self, - options: dataset_options.TensorFlowDatasetOptions) -> tf.data.Dataset: - raise NotImplementedError - - def _TableToRecordBatch( - self, - table: pa.Table, - batch_size: Optional[int] = None) -> List[pa.RecordBatch]: - return table.to_batches(max_chunksize=batch_size) - - def ArrowSchema(self) -> pa.Schema: - if self._schema is None: - return self._InferArrowSchema() - return csv_decoder.GetArrowSchema(self._column_names, self._schema) - - def _InferArrowSchema(self): - match_result = FileSystems.match([self._file_pattern])[0] - files_metadata = match_result.metadata_list[0] - with FileSystems.open(files_metadata.path) as f: - return pq.read_schema(f) - - def TensorRepresentations(self) -> tensor_adapter.TensorRepresentations: - result = (tensor_representation_util.GetTensorRepresentationsFromSchema( - self._schema)) - if result is None: - result = (tensor_representation_util.InferTensorRepresentationsFromSchema( - self._schema)) - return result - - def _ProjectTfmdSchema(self, column_names: List[Text]) -> schema_pb2.Schema: - """Creates a tensorflow Schema from the current one with only the given columns""" - - result = schema_pb2.Schema() - result.CopyFrom(self._schema) - - for feature in self._schema.feature: - if feature.name not in column_names: - result.feature.remove(feature) - - return result - - def _ProjectImpl(self, tensor_names: List[Text]) -> "TFXIO": - """Returns a projected TFXIO. - - Projection is pushed down to the Parquet Beam source. - - The Projected TFXIO will project the record batches, arrow schema, - and the tfmd schema. - - Args: - tensor_names: The columns to project. 
- """ - projected_schema = self._ProjectTfmdSchema(tensor_names) - result = copy.copy(self) - result._column_names = tensor_names # pylint: disable=protected-access - result._schema = projected_schema # pylint: disable=protected-access - return result diff --git a/tfx_bsl/tfxio/parquet_tfxio_test.py b/tfx_bsl/tfxio/parquet_tfxio_test.py deleted file mode 100644 index d19045c3..00000000 --- a/tfx_bsl/tfxio/parquet_tfxio_test.py +++ /dev/null @@ -1,288 +0,0 @@ -import os - -import apache_beam as beam -import pandas as pd -import pyarrow as pa -import pyarrow.parquet as pq -import tensorflow as tf -from absl import flags -from absl.testing import absltest -from apache_beam.testing import util as beam_testing_util -from google.protobuf import text_format -from tensorflow_metadata.proto.v0 import schema_pb2 -from tfx_bsl.tfxio import telemetry_test_util - -from tfx_bsl.tfxio.parquet_tfxio import ParquetTFXIO - -FLAGS = flags.FLAGS -_COLUMN_NAMES = ["int_feature", "float_feature", "string_feature"] -_TELEMETRY_DESCRIPTORS = ["Some", "Component"] -_ROWS = { - "int_feature": [[1], [2]], - "float_feature": [[2.0], [3.0]], - "string_feature": [["abc"], ["xyz"]] -} -_NUM_ROWS = len(next(iter(_ROWS))) - -_SCHEMA = text_format.Parse( - """ - feature { - name: "int_feature" - type: INT - value_count { - min: 0 - max: 2 - } - } - feature { - name: "float_feature" - type: FLOAT - value_count { - min: 0 - max: 2 - } - } - feature { - name: "string_feature" - type: BYTES - value_count { - min: 0 - max: 2 - } - } - """, schema_pb2.Schema()) - -_UNORDERED_SCHEMA = text_format.Parse( - """ - feature { - name: "string_feature" - type: BYTES - value_count { - min: 0 - max: 2 - } - } - feature { - name: "int_feature" - type: INT - value_count { - min: 0 - max: 2 - } - } - feature { - name: "float_feature" - type: FLOAT - value_count { - min: 0 - max: 2 - } - } - """, schema_pb2.Schema()) - -_EXPECTED_ARROW_SCHEMA = pa.schema([ - pa.field("int_feature", pa.large_list(pa.int64())), - pa.field("float_feature", pa.large_list(pa.float32())), - pa.field("string_feature", pa.large_list(pa.large_binary())) -]) - -_EXPECTED_PROJECTED_ARROW_SCHEMA = pa.schema([ - pa.field("int_feature", pa.large_list(pa.int64())), -]) - -_EXPECTED_COLUMN_VALUES = { - "int_feature": - pa.array([[1], [2]], type=pa.large_list(pa.int64())), - "float_feature": - pa.array([[2.0], [3.0]], type=pa.large_list(pa.float32())), - "string_feature": - pa.array([[b"abc"], [b"xyz"]], type=pa.large_list(pa.large_binary())), -} - - -def _WriteInputs(filename): - df = pd.DataFrame(_ROWS) - table = pa.Table.from_pandas(df, schema=_EXPECTED_ARROW_SCHEMA) - pq.write_table(table, filename) - - -class ParquetRecordTest(absltest.TestCase): - - @classmethod - def setUpClass(cls): - super().setUpClass() - cls._example_file = os.path.join(FLAGS.test_tmpdir, "parquettest", - "examples.parquet") - tf.io.gfile.makedirs(os.path.dirname(cls._example_file)) - _WriteInputs(cls._example_file) - - def testImplicitTensorRepresentations(self): - """Tests inferring of tensor representation.""" - tfxio = ParquetTFXIO(file_pattern=self._example_file, - column_names=_COLUMN_NAMES, - schema=_UNORDERED_SCHEMA, - telemetry_descriptors=_TELEMETRY_DESCRIPTORS) - self.assertEqual( - { - "int_feature": - text_format.Parse( - """varlen_sparse_tensor { column_name: "int_feature"}""", - schema_pb2.TensorRepresentation()), - "float_feature": - text_format.Parse( - """varlen_sparse_tensor { column_name: "float_feature"}""", - schema_pb2.TensorRepresentation()), - "string_feature": - 
-                text_format.Parse(
-                    """varlen_sparse_tensor { column_name: "string_feature" }""",
-                    schema_pb2.TensorRepresentation()),
-        }, tfxio.TensorRepresentations())
-
-    def _AssertFn(record_batch_list):
-      self.assertLen(record_batch_list, 1)
-      record_batch = record_batch_list[0]
-      self._ValidateRecordBatch(record_batch, _EXPECTED_ARROW_SCHEMA)
-      self.assertTrue(record_batch.schema.equals(tfxio.ArrowSchema()))
-      tensor_adapter = tfxio.TensorAdapter()
-      dict_of_tensors = tensor_adapter.ToBatchTensors(record_batch)
-      self.assertLen(dict_of_tensors, 3)
-      self.assertIn("int_feature", dict_of_tensors)
-      self.assertIn("float_feature", dict_of_tensors)
-      self.assertIn("string_feature", dict_of_tensors)
-
-    pipeline = beam.Pipeline()
-    record_batch_pcoll = (pipeline | tfxio.BeamSource(batch_size=_NUM_ROWS))
-    beam_testing_util.assert_that(record_batch_pcoll, _AssertFn)
-    pipeline_result = pipeline.run()
-    pipeline_result.wait_until_finish()
-    telemetry_test_util.ValidateMetrics(self, pipeline_result,
-                                        _TELEMETRY_DESCRIPTORS, 'parquet',
-                                        'parquet')
-
-  def testExplicitTensorRepresentations(self):
-    """Tests when the tensor representation is explicitly provided in the schema."""
-    schema = schema_pb2.Schema()
-    schema.CopyFrom(_SCHEMA)
-    tensor_representations = {
-        "my_feature":
-            text_format.Parse(
-                """
-                dense_tensor {
-                  column_name: "string_feature"
-                  shape { dim { size: 1 } }
-                  default_value { bytes_value: "abc" }
-                }""", schema_pb2.TensorRepresentation())
-    }
-    schema.tensor_representation_group[""].CopyFrom(
-        schema_pb2.TensorRepresentationGroup(
-            tensor_representation=tensor_representations))
-
-    tfxio = ParquetTFXIO(file_pattern=self._example_file,
-                         column_names=_COLUMN_NAMES,
-                         schema=schema,
-                         telemetry_descriptors=_TELEMETRY_DESCRIPTORS)
-    self.assertEqual(tensor_representations, tfxio.TensorRepresentations())
-
-  def testProjection(self):
-    """Tests projecting a TFXIO."""
-    tfxio = ParquetTFXIO(file_pattern=self._example_file,
-                         column_names=_COLUMN_NAMES,
-                         schema=_UNORDERED_SCHEMA,
-                         telemetry_descriptors=_TELEMETRY_DESCRIPTORS)
-
-    projected_tfxio = tfxio.Project(["int_feature"])
-
-    # The projected TFXIO has the projected schema.
-    self.assertTrue(
-        projected_tfxio.ArrowSchema().equals(_EXPECTED_PROJECTED_ARROW_SCHEMA))
-
-    def _AssertFn(record_batch_list):
-      self.assertLen(record_batch_list, 1)
-      record_batch = record_batch_list[0]
-      self._ValidateRecordBatch(record_batch, _EXPECTED_PROJECTED_ARROW_SCHEMA)
-      expected_schema = projected_tfxio.ArrowSchema()
-      self.assertListEqual(
-          record_batch.schema.names, expected_schema.names,
-          "actual: {}; expected: {}".format(record_batch.schema.names,
-                                            expected_schema.names))
-      self.assertListEqual(
-          record_batch.schema.types, expected_schema.types,
-          "actual: {}; expected: {}".format(record_batch.schema.types,
-                                            expected_schema.types))
-      tensor_adapter = projected_tfxio.TensorAdapter()
-      dict_of_tensors = tensor_adapter.ToBatchTensors(record_batch)
-      self.assertLen(dict_of_tensors, 1)
-      self.assertIn("int_feature", dict_of_tensors)
-
-    pipeline = beam.Pipeline()
-    record_batch_pcoll = (pipeline |
-                          projected_tfxio.BeamSource(batch_size=_NUM_ROWS))
-    beam_testing_util.assert_that(record_batch_pcoll, _AssertFn)
-    pipeline_result = pipeline.run()
-    pipeline_result.wait_until_finish()
-    telemetry_test_util.ValidateMetrics(self, pipeline_result,
-                                        _TELEMETRY_DESCRIPTORS, 'parquet',
-                                        'parquet')
-
-  def testOptionalSchema(self):
-    """Tests when the schema is not provided."""
-    tfxio = ParquetTFXIO(file_pattern=self._example_file,
-                         column_names=_COLUMN_NAMES,
-                         telemetry_descriptors=_TELEMETRY_DESCRIPTORS)
-
-    self.assertEqual(tfxio.ArrowSchema(), _EXPECTED_ARROW_SCHEMA)
-
-    def _AssertFn(record_batch_list):
-      self.assertLen(record_batch_list, 1)
-      record_batch = record_batch_list[0]
-      self._ValidateRecordBatch(record_batch, _EXPECTED_ARROW_SCHEMA)
-
-    pipeline = beam.Pipeline()
-    record_batch_pcoll = (pipeline | tfxio.BeamSource(batch_size=_NUM_ROWS))
-    beam_testing_util.assert_that(record_batch_pcoll, _AssertFn)
-    pipeline_result = pipeline.run()
-    pipeline_result.wait_until_finish()
-    telemetry_test_util.ValidateMetrics(self, pipeline_result,
-                                        _TELEMETRY_DESCRIPTORS, 'parquet',
-                                        'parquet')
-
-  def testUnorderedSchema(self):
-    """Tests a schema whose feature order differs from the column order."""
-    tfxio = ParquetTFXIO(file_pattern=self._example_file,
-                         column_names=_COLUMN_NAMES,
-                         schema=_UNORDERED_SCHEMA)
-
-    def _AssertFn(record_batch_list):
-      self.assertLen(record_batch_list, 1)
-      record_batch = record_batch_list[0]
-      self._ValidateRecordBatch(record_batch, _EXPECTED_ARROW_SCHEMA)
-
-    with beam.Pipeline() as p:
-      record_batch_pcoll = (p | tfxio.BeamSource(batch_size=_NUM_ROWS))
-      beam_testing_util.assert_that(record_batch_pcoll, _AssertFn)
-
-  def _ValidateRecordBatch(self, record_batch, expected_arrow_schema):
-    self.assertIsInstance(record_batch, pa.RecordBatch)
-    self.assertEqual(record_batch.num_rows, 2)
-    # When reading the Parquet files and converting them to RecordBatches,
-    # metadata (specifically the pandas metadata) is populated. We do not
-    # assert that metadata.
-    self.assertListEqual(
-        record_batch.schema.names, expected_arrow_schema.names,
-        "Expected: {} ; got {}".format(expected_arrow_schema.names,
-                                       record_batch.schema.names))
-    self.assertListEqual(
-        record_batch.schema.types, expected_arrow_schema.types,
-        "Expected: {} ; got {}".format(expected_arrow_schema.types,
-                                       record_batch.schema.types))
-    for i, field in enumerate(record_batch.schema):
-      self.assertTrue(
-          record_batch.column(i).equals(_EXPECTED_COLUMN_VALUES[field.name]),
-          "Column {} did not match ({} vs {}).".format(
-              field.name, record_batch.column(i),
-              _EXPECTED_COLUMN_VALUES[field.name]))
-
-
-if __name__ == "__main__":
-  absltest.main()
diff --git a/tfx_bsl/tfxio/telemetry_test.py b/tfx_bsl/tfxio/telemetry_test.py
index 6ca4ea43..46f4bbf5 100644
--- a/tfx_bsl/tfxio/telemetry_test.py
+++ b/tfx_bsl/tfxio/telemetry_test.py
@@ -73,7 +73,10 @@ def _GetMetricName(name):
             _GetMetricName("num_cells[STRING]"): 3,
             _GetMetricName("num_cells[INT]"): 4,
             _GetMetricName("num_cells[FLOAT]"): 4,
-        }),
+        },
+        telemetry_descriptors=["test", "component"],
+        expected_namespace="tfx.test.component.io",
+    ),
     dict(
         testcase_name="deeply_nested_list",
         record_batch=pa.RecordBatch.from_arrays(
@@ -147,22 +150,27 @@ def _AssertDistributionEqual(
         (msg + ": ") if msg else "", expected, beam_distribution_result))
 
   @parameterized.named_parameters(*_PROFILE_RECORD_BATCHES_TEST_CASES)
-  def testProfileRecordBatches(self, record_batch, expected_distributions,
-                               expected_counters):
+  def testProfileRecordBatches(self,
+                               record_batch,
+                               expected_distributions,
+                               expected_counters,
+                               telemetry_descriptors=None,
+                               expected_namespace="tfx.UNKNOWN_COMPONENT.io"):
     p = beam.Pipeline()
-    _ = (p
-         # Slice the input into two pieces to make sure profiling can handle
-         # sliced RecordBatches.
- | "CreateTestData" >> beam.Create([record_batch.slice(0, 1), - record_batch.slice(1)]) - | "Profile" >> telemetry.ProfileRecordBatches( - ["test", "component"], _LOGICAL_FORMAT, _PHYSICAL_FORMAT, 1.0)) + _ = ( + p + # Slice the input into two pieces to make sure profiling can handle + # sliced RecordBatches. + | "CreateTestData" >> beam.Create( + [record_batch.slice(0, 1), + record_batch.slice(1)]) + | "Profile" >> telemetry.ProfileRecordBatches( + telemetry_descriptors, _LOGICAL_FORMAT, _PHYSICAL_FORMAT, 1.0)) runner = p.run() runner.wait_until_finish() all_metrics = runner.metrics() maintained_metrics = all_metrics.query( - beam.metrics.metric.MetricsFilter().with_namespace( - "tfx.test.component.io")) + beam.metrics.metric.MetricsFilter().with_namespace(expected_namespace)) counters = maintained_metrics[beam.metrics.metric.MetricResults.COUNTERS] self.assertLen(counters, len(expected_counters)) @@ -179,18 +187,27 @@ def testProfileRecordBatches(self, record_batch, expected_distributions, dist.result, expected_distributions[dist.key.metric.name], dist.key.metric.name) - def testProfileRawRecords(self): + @parameterized.named_parameters( + dict( + testcase_name="no_descriptors", + telemetry_descriptors=None, + expected_namespace="tfx.UNKNOWN_COMPONENT.io"), + dict( + testcase_name="with_descriptors", + telemetry_descriptors=["test", "component"], + expected_namespace="tfx.test.component.io")) + def testProfileRawRecords(self, telemetry_descriptors, expected_namespace): p = beam.Pipeline() - _ = (p - | "CreateTestData" >> beam.Create([b"aaa", b"bbbb"]) - | "Profile" >> telemetry.ProfileRawRecords( - ["test", "component"], _LOGICAL_FORMAT, _PHYSICAL_FORMAT)) + _ = ( + p + | "CreateTestData" >> beam.Create([b"aaa", b"bbbb"]) + | "Profile" >> telemetry.ProfileRawRecords( + telemetry_descriptors, _LOGICAL_FORMAT, _PHYSICAL_FORMAT)) runner = p.run() runner.wait_until_finish() all_metrics = runner.metrics() maintained_metrics = all_metrics.query( - beam.metrics.metric.MetricsFilter().with_namespace( - "tfx.test.component.io")) + beam.metrics.metric.MetricsFilter().with_namespace(expected_namespace)) counters = maintained_metrics[beam.metrics.metric.MetricResults.COUNTERS] self.assertLen(counters, 1) num_records_counter = counters[0]