From 01dd2adb94d78e0f4ebf455776b6a40c669de864 Mon Sep 17 00:00:00 2001 From: Martin Bomio Date: Wed, 23 Mar 2022 17:30:13 -0600 Subject: [PATCH 1/5] Make column names optional for parquet tfxio --- bazel-bin | 1 + bazel-out | 1 + bazel-testlogs | 1 + bazel-tfx-bsl | 1 + tfx_bsl/tfxio/parquet_tfxio.py | 11 ++++++++--- tfx_bsl/tfxio/parquet_tfxio_test.py | 28 ++++++++++++++++++++++++++++ 6 files changed, 40 insertions(+), 3 deletions(-) create mode 120000 bazel-bin create mode 120000 bazel-out create mode 120000 bazel-testlogs create mode 120000 bazel-tfx-bsl diff --git a/bazel-bin b/bazel-bin new file mode 120000 index 00000000..e5a88789 --- /dev/null +++ b/bazel-bin @@ -0,0 +1 @@ +/private/var/tmp/_bazel_martinbomio/17e5cf616981f27d03c506e3e9f0879d/execroot/tfx_bsl/bazel-out/darwin-opt/bin \ No newline at end of file diff --git a/bazel-out b/bazel-out new file mode 120000 index 00000000..d3536019 --- /dev/null +++ b/bazel-out @@ -0,0 +1 @@ +/private/var/tmp/_bazel_martinbomio/17e5cf616981f27d03c506e3e9f0879d/execroot/tfx_bsl/bazel-out \ No newline at end of file diff --git a/bazel-testlogs b/bazel-testlogs new file mode 120000 index 00000000..633b49f2 --- /dev/null +++ b/bazel-testlogs @@ -0,0 +1 @@ +/private/var/tmp/_bazel_martinbomio/17e5cf616981f27d03c506e3e9f0879d/execroot/tfx_bsl/bazel-out/darwin-opt/testlogs \ No newline at end of file diff --git a/bazel-tfx-bsl b/bazel-tfx-bsl new file mode 120000 index 00000000..026f5fe6 --- /dev/null +++ b/bazel-tfx-bsl @@ -0,0 +1 @@ +/private/var/tmp/_bazel_martinbomio/17e5cf616981f27d03c506e3e9f0879d/execroot/tfx_bsl \ No newline at end of file diff --git a/tfx_bsl/tfxio/parquet_tfxio.py b/tfx_bsl/tfxio/parquet_tfxio.py index 6dee4036..ef2239e8 100644 --- a/tfx_bsl/tfxio/parquet_tfxio.py +++ b/tfx_bsl/tfxio/parquet_tfxio.py @@ -39,8 +39,8 @@ class ParquetTFXIO(tfxio.TFXIO): def __init__(self, file_pattern: str, - column_names: List[str], *, + column_names: Optional[List[str]] = None, min_bundle_size: int = 0, schema: Optional[schema_pb2.Schema] = None, validate: bool = True, @@ -51,7 +51,8 @@ def __init__(self, file_pattern: A file glob pattern to read parquet files from. column_names: List of column names to read from the parquet files. min_bundle_size: the minimum size in bytes, to be considered when - splitting the parquet input into bundles. + splitting the parquet input into bundles. If not provided, all columns + in the dataset will be read. schema: An optional TFMD Schema describing the dataset. If schema is provided, it will determine the data type of the parquet columns. Otherwise, the each column's data type will be inferred by the decoder. @@ -106,7 +107,11 @@ def _TableToRecordBatch( def ArrowSchema(self) -> pa.Schema: if self._schema is None: return self._InferArrowSchema() - return csv_decoder.GetArrowSchema(self._column_names, self._schema) + + # if the column names are not passed, we default to all column names in the schema. + columns = self._column_names or [f.name for f in self._schema.feature] + + return csv_decoder.GetArrowSchema(columns, self._schema) def _InferArrowSchema(self): match_result = FileSystems.match([self._file_pattern])[0] diff --git a/tfx_bsl/tfxio/parquet_tfxio_test.py b/tfx_bsl/tfxio/parquet_tfxio_test.py index cd8bf4f2..d61d1659 100644 --- a/tfx_bsl/tfxio/parquet_tfxio_test.py +++ b/tfx_bsl/tfxio/parquet_tfxio_test.py @@ -301,6 +301,34 @@ def _AssertFn(record_batch_list): record_batch_pcoll = (p | tfxio.BeamSource(batch_size=_NUM_ROWS)) beam_testing_util.assert_that(record_batch_pcoll, _AssertFn) + def testOptionalColumnNames(self): + """Tests various valid schemas.""" + tfxio = ParquetTFXIO( + file_pattern=self._example_file, + schema=_SCHEMA) + + def _AssertFn(record_batch_list): + self.assertLen(record_batch_list, 1) + record_batch = record_batch_list[0] + self._ValidateRecordBatch(record_batch, _EXPECTED_ARROW_SCHEMA) + + with beam.Pipeline() as p: + record_batch_pcoll = (p | tfxio.BeamSource(batch_size=_NUM_ROWS)) + beam_testing_util.assert_that(record_batch_pcoll, _AssertFn) + + def testOptionalColumnNamesAndSchema(self): + """Tests various valid schemas.""" + tfxio = ParquetTFXIO(file_pattern=self._example_file) + + def _AssertFn(record_batch_list): + self.assertLen(record_batch_list, 1) + record_batch = record_batch_list[0] + self._ValidateRecordBatch(record_batch, _EXPECTED_ARROW_SCHEMA) + + with beam.Pipeline() as p: + record_batch_pcoll = (p | tfxio.BeamSource(batch_size=_NUM_ROWS)) + beam_testing_util.assert_that(record_batch_pcoll, _AssertFn) + def _ValidateRecordBatch(self, record_batch, expected_arrow_schema): self.assertIsInstance(record_batch, pa.RecordBatch) self.assertEqual(record_batch.num_rows, 2) From 09241b98f572b82112ce5ea9105b7d9a5fe66ad2 Mon Sep 17 00:00:00 2001 From: Martin Bomio Date: Wed, 23 Mar 2022 17:52:48 -0600 Subject: [PATCH 2/5] Add telemetry_descriptors property --- tfx_bsl/tfxio/parquet_tfxio.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tfx_bsl/tfxio/parquet_tfxio.py b/tfx_bsl/tfxio/parquet_tfxio.py index ef2239e8..b2eaba66 100644 --- a/tfx_bsl/tfxio/parquet_tfxio.py +++ b/tfx_bsl/tfxio/parquet_tfxio.py @@ -71,6 +71,10 @@ def __init__(self, self._schema = schema self._telemetry_descriptors = telemetry_descriptors + @property + def telemetry_descriptors(self) -> Optional[List[str]]: + return self._telemetry_descriptors + def BeamSource(self, batch_size: Optional[int] = None) -> beam.PTransform: @beam.typehints.with_input_types(Union[beam.PCollection, beam.Pipeline]) From 8d937c7b0a81be27627fb2141bc91e737c9fbc74 Mon Sep 17 00:00:00 2001 From: Martin Bomio Date: Thu, 24 Mar 2022 12:42:17 -0600 Subject: [PATCH 3/5] Add test case for subset of columns --- tfx_bsl/tfxio/parquet_tfxio_test.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tfx_bsl/tfxio/parquet_tfxio_test.py b/tfx_bsl/tfxio/parquet_tfxio_test.py index d61d1659..65ce0940 100644 --- a/tfx_bsl/tfxio/parquet_tfxio_test.py +++ b/tfx_bsl/tfxio/parquet_tfxio_test.py @@ -329,6 +329,25 @@ def _AssertFn(record_batch_list): record_batch_pcoll = (p | tfxio.BeamSource(batch_size=_NUM_ROWS)) beam_testing_util.assert_that(record_batch_pcoll, _AssertFn) + def testSubsetOfColumnNames(self): + """Tests various valid schemas.""" + tfxio = ParquetTFXIO( + file_pattern=self._example_file, + column_names=['int_feature'], + schema=_SCHEMA) + + def _AssertFn(record_batch_list): + self.assertLen(record_batch_list, 1) + record_batch = record_batch_list[0] + expected_arrow_schema = pa.schema([ + pa.field("int_feature", pa.large_list(pa.int64())), + ]) + self._ValidateRecordBatch(record_batch, expected_arrow_schema) + + with beam.Pipeline() as p: + record_batch_pcoll = (p | tfxio.BeamSource(batch_size=_NUM_ROWS)) + beam_testing_util.assert_that(record_batch_pcoll, _AssertFn) + def _ValidateRecordBatch(self, record_batch, expected_arrow_schema): self.assertIsInstance(record_batch, pa.RecordBatch) self.assertEqual(record_batch.num_rows, 2) From 1edd9eb3336c5ca4ff51723cd08012681199ca32 Mon Sep 17 00:00:00 2001 From: Martin Bomio Date: Thu, 24 Mar 2022 13:27:14 -0600 Subject: [PATCH 4/5] Add test for subset of columns with a projected schema --- tfx_bsl/tfxio/parquet_tfxio_test.py | 33 ++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/tfx_bsl/tfxio/parquet_tfxio_test.py b/tfx_bsl/tfxio/parquet_tfxio_test.py index 65ce0940..3e6799c0 100644 --- a/tfx_bsl/tfxio/parquet_tfxio_test.py +++ b/tfx_bsl/tfxio/parquet_tfxio_test.py @@ -329,7 +329,7 @@ def _AssertFn(record_batch_list): record_batch_pcoll = (p | tfxio.BeamSource(batch_size=_NUM_ROWS)) beam_testing_util.assert_that(record_batch_pcoll, _AssertFn) - def testSubsetOfColumnNames(self): + def testSubsetOfColumnNamesWithCompleteSchema(self): """Tests various valid schemas.""" tfxio = ParquetTFXIO( file_pattern=self._example_file, @@ -348,6 +348,37 @@ def _AssertFn(record_batch_list): record_batch_pcoll = (p | tfxio.BeamSource(batch_size=_NUM_ROWS)) beam_testing_util.assert_that(record_batch_pcoll, _AssertFn) + def testSubsetOfColumnNamesWithSubsetSchema(self): + """Tests various valid schemas.""" + schema = text_format.Parse( + """ + feature { + name: "int_feature" + type: INT + value_count { + min: 0 + max: 2 + } + } + """, schema_pb2.Schema()) + + tfxio = ParquetTFXIO( + file_pattern=self._example_file, + column_names=['int_feature'], + schema=schema) + + def _AssertFn(record_batch_list): + self.assertLen(record_batch_list, 1) + record_batch = record_batch_list[0] + expected_arrow_schema = pa.schema([ + pa.field("int_feature", pa.large_list(pa.int64())), + ]) + self._ValidateRecordBatch(record_batch, expected_arrow_schema) + + with beam.Pipeline() as p: + record_batch_pcoll = (p | tfxio.BeamSource(batch_size=_NUM_ROWS)) + beam_testing_util.assert_that(record_batch_pcoll, _AssertFn) + def _ValidateRecordBatch(self, record_batch, expected_arrow_schema): self.assertIsInstance(record_batch, pa.RecordBatch) self.assertEqual(record_batch.num_rows, 2) From a65d12b8d76ff70d578475867ebccd03f59e02e7 Mon Sep 17 00:00:00 2001 From: Martin Bomio Date: Fri, 25 Mar 2022 13:23:17 -0600 Subject: [PATCH 5/5] Remove bazel files --- bazel-bin | 1 - bazel-out | 1 - bazel-testlogs | 1 - bazel-tfx-bsl | 1 - 4 files changed, 4 deletions(-) delete mode 120000 bazel-bin delete mode 120000 bazel-out delete mode 120000 bazel-testlogs delete mode 120000 bazel-tfx-bsl diff --git a/bazel-bin b/bazel-bin deleted file mode 120000 index e5a88789..00000000 --- a/bazel-bin +++ /dev/null @@ -1 +0,0 @@ -/private/var/tmp/_bazel_martinbomio/17e5cf616981f27d03c506e3e9f0879d/execroot/tfx_bsl/bazel-out/darwin-opt/bin \ No newline at end of file diff --git a/bazel-out b/bazel-out deleted file mode 120000 index d3536019..00000000 --- a/bazel-out +++ /dev/null @@ -1 +0,0 @@ -/private/var/tmp/_bazel_martinbomio/17e5cf616981f27d03c506e3e9f0879d/execroot/tfx_bsl/bazel-out \ No newline at end of file diff --git a/bazel-testlogs b/bazel-testlogs deleted file mode 120000 index 633b49f2..00000000 --- a/bazel-testlogs +++ /dev/null @@ -1 +0,0 @@ -/private/var/tmp/_bazel_martinbomio/17e5cf616981f27d03c506e3e9f0879d/execroot/tfx_bsl/bazel-out/darwin-opt/testlogs \ No newline at end of file diff --git a/bazel-tfx-bsl b/bazel-tfx-bsl deleted file mode 120000 index 026f5fe6..00000000 --- a/bazel-tfx-bsl +++ /dev/null @@ -1 +0,0 @@ -/private/var/tmp/_bazel_martinbomio/17e5cf616981f27d03c506e3e9f0879d/execroot/tfx_bsl \ No newline at end of file