-
Notifications
You must be signed in to change notification settings - Fork 715
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extend tfxio factory to use parquet-tfxio #4761
Extend tfxio factory to use parquet-tfxio #4761
Conversation
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). For more information, open the CLA check for this pull request. |
c222433
to
470fdb3
Compare
@iindyk PTAL |
lgtm, I'll loop in ppl with approval access |
tfx/components/util/tfxio_utils.py
Outdated
from tfx_bsl.tfxio import raw_tf_record | ||
from tfx_bsl.tfxio import record_to_tensor_tfxio | ||
from tfx_bsl.tfxio import tf_example_record | ||
from tfx_bsl.tfxio import tf_sequence_example_record | ||
from tfx_bsl.tfxio import tfxio | ||
from tensorflow_metadata.proto.v0 import schema_pb2 | ||
|
||
|
||
_SUPPORTED_PAYLOAD_FORMATS = ['parquet', 'tfrecords_gzip'] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: set or tuple
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider to use this
FORMAT_TFRECORDS_GZIP is currently supported, and FORMAT_PARQUET will be added.
ExampleGen will attach the file format in its output example artifact, which can be used by downstream to figure out the example artifact's file format
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah nice, I missed that FileFormat proto message!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@1025KB wondering if you also wanted me to change the type of file_format
param in make_tfxio
to example_gen_pb2.FileFormat
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, I didn't notice it's Payload format, then using this payload format is fine too
what I mean here is instead of hardcoded 'parquet', 'tfrecords_gzip' str, can we use enum in PayloadFormat proto here? otherwise I feel there will be a PayloadFormat.FORMAT_PARQUET to 'parquet' conversion somewhere?
iindyk@, any thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done in 465ddff. The variable was actually badly named, it is indeed the supported file formats
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general I think it makes sense to have enum for such thing instead of string, but if we want to update this we'd also need to update all callsites: components and tests, which is a bit trickier. If this proves to be messy (I think it may be) I'd keep it as is for now and add a todo to refactor this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, then a todo sound good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@iindyk should i revert the last commit and add a TODO then? I did check the calls to make_tfxio within this project and non of them were passing the file format
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should i revert the last commit and add a TODO then?
sgtm
I did check the calls to make_tfxio within this project and non of them were passing the file format
Even tests? that's unfortunate this path is not covered. Aside from the upstream code we'd also need to update tfxios that are produced by the factory, bc they take strings (or make a bridge proto enum ->string).
For payload_format there are places that actually pass int (and I'm not yet sure if there's a good reason for that, this seems fragile).
tfx/proto/example_gen.proto
Outdated
@@ -111,7 +111,10 @@ enum PayloadFormat { | |||
// Serialized any protocol buffer. | |||
FORMAT_PROTO = 11; | |||
|
|||
reserved 1 to 5, 8 to 10, 12 to max; | |||
// Serialized parquet messages. | |||
FORMAT_PARQUET = 12; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
File format is also needed?
e.g., FORMAT_TF_EXAMPLE (payload format) is stored in FORMAT_TFRECORDS_GZIP (file format)
for Parquet, what's the file format?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess PARQUET is also the file format?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@1025KB these are the file formats supported by pyarrow https://arrow.apache.org/docs/python/parquet.html#compression-encoding-and-file-compatibility. Should I add one for each? or should we agree on a set of ones we are planning to support?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the default is SNAPPY
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not necessary, one splittable file format (parquet) in addition to tfrecord (non-splittable file format) should be enough for now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that we will use beam.io.writeToParquet to generate output files, so for reading, only need to support the format used by beam.io.writeToParquet
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While we're in this code, to maximize compatibility I think it would make sense to include gzip, brotli, and none, as well as snappy - unless that creates headaches, in which case snappy and none (and maybe gzip?) would seem sufficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think supporting only the format that beam source and sink support should be sufficient for now, other formats could be considered in a follow up if needed
This PR is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 5 days |
@iindyk any update on the release? |
The release is currently blocked on a tf-related breakage, but will likely happen this week |
@iindyk any update on the release? |
I think all the release blockers are resolved at this point, so our release engineers are working on it, should be coming out soon |
hey @iindyk should we think about wrapping this one up? Are there any outstanding comments in the PR that we need to address to be able to merge it? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left couple comments, but generally LGTM. I don't have owner permission here, so @1025KB PTAL
tfx/components/util/tfxio_utils.py
Outdated
from tfx_bsl.tfxio import raw_tf_record | ||
from tfx_bsl.tfxio import record_to_tensor_tfxio | ||
from tfx_bsl.tfxio import tf_example_record | ||
from tfx_bsl.tfxio import tf_sequence_example_record | ||
from tfx_bsl.tfxio import tfxio | ||
from tensorflow_metadata.proto.v0 import schema_pb2 | ||
|
||
|
||
_SUPPORTED_FILE_FORMATS = {example_gen_pb2.FileFormat.FORMAT_PARQUET, example_gen_pb2.FileFormat.FORMAT_TFRECORDS_GZIP} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: use immutable tuple for constants
tfx/proto/example_gen.proto
Outdated
@@ -111,7 +111,10 @@ enum PayloadFormat { | |||
// Serialized any protocol buffer. | |||
FORMAT_PROTO = 11; | |||
|
|||
reserved 1 to 5, 8 to 10, 12 to max; | |||
// Serialized parquet messages. | |||
FORMAT_PARQUET = 12; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think supporting only the format that beam source and sink support should be sufficient for now, other formats could be considered in a follow up if needed
tfx/components/util/tfxio_utils.py
Outdated
@@ -291,10 +294,10 @@ def make_tfxio( | |||
f'The length of file_pattern and file_formats should be the same.' | |||
f'Given: file_pattern={file_pattern}, file_format={file_format}') | |||
else: | |||
if any(item != 'tfrecords_gzip' for item in file_format): | |||
if any(item in _SUPPORTED_FILE_FORMATS for item in file_format): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not in?
tfx/components/util/tfxio_utils.py
Outdated
raise NotImplementedError(f'{file_format} is not supported yet.') | ||
else: # file_format is str type. | ||
if file_format != 'tfrecords_gzip': | ||
if file_format in _SUPPORTED_FILE_FORMATS: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
Hey @1025KB woyld you mind taking a look. The PR has been open for a very long time and I would love for us to wrap up this work |
tfx/proto/example_gen.proto
Outdated
// Serialized parquet messages. | ||
FORMAT_PARQUET = 12; | ||
|
||
reserved 1 to 5, 8 to 10, 13 to max; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use 15
reserved 1 to 5, 8 to 10, 12 to 14, 16 to max;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
made the change, but wondering why we don't continuous numbering?
tfx/proto/example_gen.proto
Outdated
// https://arrow.apache.org/docs/python/parquet.html#compression-encoding-and-file-compatibility | ||
FORMAT_PARQUET = 6; | ||
|
||
reserved 1 to 4, 7 to max; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's use 16
reserved 1 to 4, 7 to 15, 17 to max;
btw, is parquet's file format and payload format both named "Parquet"? I wonder if there are better names to distinguish them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that the changes suggested by @1025KB probably make sense, but otherwise LGTM, and thanks!
@iindyk @rcrowe-google @1025KB addressed the latest suggestions. Should be ready to merge. I need one of you to do the merging :) |
I don't have access to merge here |
@1025KB build seems to be failing with what I think is an unrelated error. Would you mind taking a look? |
Hi, Venkat, could you help on merging this PR? |
@martinbomio The build failure seems like related to 517441d. Could you rebase the PR and retry? |
814c46b
to
30bec96
Compare
@jiyongjung0 done! |
@martinbomio It seems like the current failure is a real problem. Could you take a look? |
tfx/proto/example_gen.proto
Outdated
reserved 1 to 4, 6 to max; | ||
// Indicates parquet format files in any of the supported compressions. | ||
// https://arrow.apache.org/docs/python/parquet.html#compression-encoding-and-file-compatibility | ||
FORMAT_PARQUET = 16; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jiyongjung0 fixed the issue, can we trigger another build? |
@@ -12,6 +12,7 @@ | |||
# See the License for the specific language governing permissions and | |||
# limitations under the License. | |||
"""TFXIO (standardized TFX inputs) related utilities.""" | |||
from __future__ import annotations |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
had to add this to allow for proto enums to be used as types in signatures. Not sure if this is something desirable from the tfx side or not @jiyongjung0 @1025KB
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can go back to string in the make_tfxio
function signature instead of the proto enum type.
@@ -306,7 +307,7 @@ def test_get_tf_dataset_factory_from_artifact(self): | |||
dataset_factory = tfxio_utils.get_tf_dataset_factory_from_artifact( | |||
[examples], _TELEMETRY_DESCRIPTORS) | |||
self.assertIsInstance(dataset_factory, Callable) | |||
self.assertEqual(tf.data.Dataset, | |||
self.assertEqual('tf.data.Dataset', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this stringify of the signature is a result of adding: from __future__ import annotations
@1025KB @jiyongjung0 seems like the build is failing with |
I created an internal pending change and I will make sure it submit (your PR will automatically close once I submit it) |
Hi @rcrowe-google @iindyk @1025KB any update on this PR |
the reviewer is just back from vacation, change should be in this week |
PiperOrigin-RevId: 452081871
This PR depends on tensorflow/tfx-bsl#53 and tensorflow/tfx-bsl#54 being released.
Extend the
make_tfxio
factory to useParquetTFXIO
if thepayload_format
isFORMAT_PARQUET
.