Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make column names optional for parquet tfxio #54

Closed
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bazel-bin
1 change: 1 addition & 0 deletions bazel-out
1 change: 1 addition & 0 deletions bazel-testlogs
1 change: 1 addition & 0 deletions bazel-tfx-bsl
15 changes: 12 additions & 3 deletions tfx_bsl/tfxio/parquet_tfxio.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class ParquetTFXIO(tfxio.TFXIO):

def __init__(self,
file_pattern: str,
column_names: List[str],
*,
column_names: Optional[List[str]] = None,
min_bundle_size: int = 0,
schema: Optional[schema_pb2.Schema] = None,
validate: bool = True,
Expand All @@ -51,7 +51,8 @@ def __init__(self,
file_pattern: A file glob pattern to read parquet files from.
column_names: List of column names to read from the parquet files.
min_bundle_size: the minimum size in bytes, to be considered when
splitting the parquet input into bundles.
splitting the parquet input into bundles. If not provided, all columns
in the dataset will be read.
schema: An optional TFMD Schema describing the dataset. If schema is
provided, it will determine the data type of the parquet columns.
Otherwise, the each column's data type will be inferred by the decoder.
Expand All @@ -70,6 +71,10 @@ def __init__(self,
self._schema = schema
self._telemetry_descriptors = telemetry_descriptors

@property
def telemetry_descriptors(self) -> Optional[List[str]]:
return self._telemetry_descriptors

def BeamSource(self, batch_size: Optional[int] = None) -> beam.PTransform:

@beam.typehints.with_input_types(Union[beam.PCollection, beam.Pipeline])
Expand Down Expand Up @@ -106,7 +111,11 @@ def _TableToRecordBatch(
def ArrowSchema(self) -> pa.Schema:
if self._schema is None:
return self._InferArrowSchema()
return csv_decoder.GetArrowSchema(self._column_names, self._schema)

# if the column names are not passed, we default to all column names in the schema.
columns = self._column_names or [f.name for f in self._schema.feature]

return csv_decoder.GetArrowSchema(columns, self._schema)

def _InferArrowSchema(self):
match_result = FileSystems.match([self._file_pattern])[0]
Expand Down
28 changes: 28 additions & 0 deletions tfx_bsl/tfxio/parquet_tfxio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,34 @@ def _AssertFn(record_batch_list):
record_batch_pcoll = (p | tfxio.BeamSource(batch_size=_NUM_ROWS))
beam_testing_util.assert_that(record_batch_pcoll, _AssertFn)

def testOptionalColumnNames(self):
Copy link
Collaborator

@iindyk iindyk Mar 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, could you please also add a test when only a subset of columns is read

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iindyk happy to add one, but isn't the projected test case covering this case?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it does, but through project path, which shouldn't be the default when one just wants to read subset of data (e.g. it's should not be necessary to have the whole schema). Users often take examples from tests, so seeing how it can be done here would be useful imo

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense, I'll add it

"""Tests various valid schemas."""
tfxio = ParquetTFXIO(
file_pattern=self._example_file,
schema=_SCHEMA)

def _AssertFn(record_batch_list):
self.assertLen(record_batch_list, 1)
record_batch = record_batch_list[0]
self._ValidateRecordBatch(record_batch, _EXPECTED_ARROW_SCHEMA)

with beam.Pipeline() as p:
record_batch_pcoll = (p | tfxio.BeamSource(batch_size=_NUM_ROWS))
beam_testing_util.assert_that(record_batch_pcoll, _AssertFn)

def testOptionalColumnNamesAndSchema(self):
"""Tests various valid schemas."""
tfxio = ParquetTFXIO(file_pattern=self._example_file)

def _AssertFn(record_batch_list):
self.assertLen(record_batch_list, 1)
record_batch = record_batch_list[0]
self._ValidateRecordBatch(record_batch, _EXPECTED_ARROW_SCHEMA)

with beam.Pipeline() as p:
record_batch_pcoll = (p | tfxio.BeamSource(batch_size=_NUM_ROWS))
beam_testing_util.assert_that(record_batch_pcoll, _AssertFn)

def _ValidateRecordBatch(self, record_batch, expected_arrow_schema):
self.assertIsInstance(record_batch, pa.RecordBatch)
self.assertEqual(record_batch.num_rows, 2)
Expand Down