-
Notifications
You must be signed in to change notification settings - Fork 59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make column names optional for parquet tfxio #54
Changes from 4 commits
01dd2ad
09241b9
8d937c7
1edd9eb
a65d12b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
/private/var/tmp/_bazel_martinbomio/17e5cf616981f27d03c506e3e9f0879d/execroot/tfx_bsl/bazel-out/darwin-opt/bin | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
/private/var/tmp/_bazel_martinbomio/17e5cf616981f27d03c506e3e9f0879d/execroot/tfx_bsl/bazel-out |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
/private/var/tmp/_bazel_martinbomio/17e5cf616981f27d03c506e3e9f0879d/execroot/tfx_bsl/bazel-out/darwin-opt/testlogs |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
/private/var/tmp/_bazel_martinbomio/17e5cf616981f27d03c506e3e9f0879d/execroot/tfx_bsl |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -301,6 +301,84 @@ def _AssertFn(record_batch_list): | |
record_batch_pcoll = (p | tfxio.BeamSource(batch_size=_NUM_ROWS)) | ||
beam_testing_util.assert_that(record_batch_pcoll, _AssertFn) | ||
|
||
def testOptionalColumnNames(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. thanks, could you please also add a test for when only a subset of columns is read? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @iindyk happy to add one, but isn't the projected test case covering this case? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. it does, but through the project path, which shouldn't be the default when one just wants to read a subset of the data (e.g. it should not be necessary to have the whole schema). Users often take examples from tests, so seeing how it can be done here would be useful imo. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. makes sense, I'll add it |
||
"""Tests various valid schemas.""" | ||
tfxio = ParquetTFXIO( | ||
file_pattern=self._example_file, | ||
schema=_SCHEMA) | ||
|
||
def _AssertFn(record_batch_list): | ||
self.assertLen(record_batch_list, 1) | ||
record_batch = record_batch_list[0] | ||
self._ValidateRecordBatch(record_batch, _EXPECTED_ARROW_SCHEMA) | ||
|
||
with beam.Pipeline() as p: | ||
record_batch_pcoll = (p | tfxio.BeamSource(batch_size=_NUM_ROWS)) | ||
beam_testing_util.assert_that(record_batch_pcoll, _AssertFn) | ||
|
||
def testOptionalColumnNamesAndSchema(self):
    """Reads the example file when neither column names nor a schema is given."""
    source = ParquetTFXIO(file_pattern=self._example_file)

    def _CheckBatches(batches):
        # Exactly one record batch is expected when batching by _NUM_ROWS.
        self.assertLen(batches, 1)
        self._ValidateRecordBatch(batches[0], _EXPECTED_ARROW_SCHEMA)

    with beam.Pipeline() as pipeline:
        batches_pcoll = pipeline | source.BeamSource(batch_size=_NUM_ROWS)
        beam_testing_util.assert_that(batches_pcoll, _CheckBatches)
|
||
def testSubsetOfColumnNamesWithCompleteSchema(self): | ||
"""Tests various valid schemas.""" | ||
tfxio = ParquetTFXIO( | ||
file_pattern=self._example_file, | ||
column_names=['int_feature'], | ||
schema=_SCHEMA) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. can this just contain the int_feature? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. sure, I was trying to test that when the schema has all fields but the requested columns are only a subset, the resulting schema and records contain just that subset. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I added another test case for this |
||
|
||
def _AssertFn(record_batch_list): | ||
self.assertLen(record_batch_list, 1) | ||
record_batch = record_batch_list[0] | ||
expected_arrow_schema = pa.schema([ | ||
pa.field("int_feature", pa.large_list(pa.int64())), | ||
]) | ||
self._ValidateRecordBatch(record_batch, expected_arrow_schema) | ||
|
||
with beam.Pipeline() as p: | ||
record_batch_pcoll = (p | tfxio.BeamSource(batch_size=_NUM_ROWS)) | ||
beam_testing_util.assert_that(record_batch_pcoll, _AssertFn) | ||
|
||
def testSubsetOfColumnNamesWithSubsetSchema(self):
    """Reads only `int_feature` with a schema that declares only that feature."""
    schema_text = """
    feature {
    name: "int_feature"
    type: INT
    value_count {
    min: 0
    max: 2
    }
    }
    """
    int_only_schema = text_format.Parse(schema_text, schema_pb2.Schema())

    source = ParquetTFXIO(
        file_pattern=self._example_file,
        column_names=["int_feature"],
        schema=int_only_schema,
    )

    def _CheckBatches(batches):
        # Exactly one record batch is expected when batching by _NUM_ROWS.
        self.assertLen(batches, 1)
        int_field = pa.field("int_feature", pa.large_list(pa.int64()))
        # The batch must expose the requested column and nothing else.
        self._ValidateRecordBatch(batches[0], pa.schema([int_field]))

    with beam.Pipeline() as pipeline:
        batches_pcoll = pipeline | source.BeamSource(batch_size=_NUM_ROWS)
        beam_testing_util.assert_that(batches_pcoll, _CheckBatches)
|
||
def _ValidateRecordBatch(self, record_batch, expected_arrow_schema): | ||
self.assertIsInstance(record_batch, pa.RecordBatch) | ||
self.assertEqual(record_batch.num_rows, 2) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you please revert files outside of the project
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops, removed them in a65d12b
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
imported internally, under review