Remove CoGBK in MLTransform's TFTProcessHandler #30146
Conversation
Force-pushed from ff0ae49 to 8e4dda7.
Force-pushed from 8e4dda7 to ee770c4.
Assigning reviewers. If you would like to opt out of this review, comment accordingly.
R: @damccorm for label python.
The PR bot will only process comments in the main thread (not review comments).
""" | ||
def process(self, element): | ||
element.update(pickler.loads(element[_TEMP_KEY].item())) | ||
del element[_TEMP_KEY] |
Let's not mutate elements; emit a copy.
```python
    if key in data_to_encode:
      del data_to_encode[key]

    bytes = pickler.dumps(data_to_encode)
```
It might be more efficient to use a type-aware cythonized coder. The performance difference can be tested in a microbenchmark.
We might be able to convert elements to Beam Row, and use RowCoder. Seeing something similar in:
beam/sdks/python/apache_beam/transforms/external.py, lines 127 to 132 in a221f98:

```python
schema_proto = named_fields_to_schema(named_fields)
row = named_tuple_from_schema(schema_proto)(**fields_to_values)
schema = named_tuple_to_schema(type(row))
payload = RowCoder(schema).encode(row)
return (schema_proto, payload)
```
Can we expect the schema of the elements in the dict to be the same? Do we have type information from the elements, or would we need to infer it?
cc: @robertwb who might have a better idea
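To illustrate the reviewer's point about type-aware coding, here is a toy microbenchmark (not Beam's actual coders): a generic pickle round-trip versus a schema-specific encoder. The fixed two-float schema in `typed_encode` is an assumption made purely for the sketch.

```python
import pickle
import struct
import timeit

element = {"a": 1.5, "b": 2.5}

def pickle_encode(e):
    # Schema-agnostic: works for any picklable dict, but carries
    # per-element type metadata in the encoded bytes.
    return pickle.dumps(e)

def typed_encode(e):
    # Toy type-aware coder: assumes a fixed schema of two float64
    # fields, so the payload is exactly 16 bytes with no metadata.
    return struct.pack("<dd", e["a"], e["b"])

t_pickle = timeit.timeit(lambda: pickle_encode(element), number=10_000)
t_typed = timeit.timeit(lambda: typed_encode(element), number=10_000)
```

Beam's `RowCoder` plays the role of `typed_encode` here: once a schema is known, it can avoid pickle's per-element overhead. Whether that wins in practice is exactly what a microbenchmark like this would show.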
For the unused elements, we won't know the schema of the elements.
If we have to use pickling to serialize elements, I would use the native pickler rather than Beam's, perhaps by way of `PickleCoder`.
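A self-contained sketch of the suggestion, using the stdlib `pickle` module directly; in Beam this would roughly correspond to going through `PickleCoder` instead of `apache_beam.internal.pickler`.

```python
import pickle

# Unused-column payload; any picklable values work, since we have no
# schema information for these columns.
data = {"unused_col": [1.0, 2.0], "label": "cat"}

# Native pickling round-trip. HIGHEST_PROTOCOL is the usual choice for
# in-pipeline (non-persistent) serialization.
encoded = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)
decoded = pickle.loads(encoded)
```

The native pickler avoids the extra indirection of Beam's dill-based pickler, which is mainly needed for pickling code objects, not plain data like these column values.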
> For the unused elements, we won't know the schema of the elements.

According to the MLTransform docs, elements end up being Rows:

> To define a data processing transformation by using MLTransform, create instances of data processing transforms with columns as input parameters. The data in the specified columns is transformed and outputted to the beam.Row object.
Do we infer the types later, then?
Only the data for the columns that are provided for transformation. For these columns, we infer the schema using TFT.

I will make a few changes and request a review soon. Thanks.
Have we run existing tests on this change?
```diff
@@ -447,22 +409,22 @@ def expand(
     raw_data_metadata = metadata_io.read_metadata(
         os.path.join(self.artifact_location, RAW_DATA_METADATA_DIR))

-    keyed_raw_data = (raw_data | beam.ParDo(_ComputeAndAttachUniqueID()))
+    keyed_raw_data = (raw_data)  # | beam.ParDo(_ComputeAndAttachUniqueID()))
```
Leftover comment. Also, we no longer add keys, so `keyed_` might not be the best name.
Removed the `keyed_` prefix from the variable names.
```python
    # To maintain consistency by outputting numpy array all the time,
    # whether a scalar value or list or np array is passed as input,
    # we will convert scalar values to list values and TFT will output
    # numpy array all the time.
    raw_data_list = (
```
For my understanding, why is this called `raw_data_list`? It's modified, so not raw, I think; and what is the `_list` about?
Yes, it is modified; I removed `raw` from the variable name.

`_list`: we convert scalar elements to lists (of length 1) to maintain uniformity. Users can pass lists/np arrays to TFT ops, and TFT outputs numpy arrays. When users pass scalars, TFT outputs scalars. To maintain a consistent output format, we convert scalars to lists.
```python
    self.exclude_columns = exclude_columns

  def encode(self, element):
    if not self.exclude_columns:
```
Interesting. Is it possible for `exclude_columns` to be empty? I'd imagine it could rather be the opposite, where all columns are being processed, so there is nothing to encode/decode.
Yes, that is right, but it errors because we are adding the temp id column name to the schema during construction, so TFT errors out if the PCollection doesn't have the temp id column. So when there are no unused columns, we have to encode an empty dict and pass it in the PCollection.
> Have we run existing tests on this change?

Yes, they should be covered in the py38 coverage.
```python
  def decode(self, element):
    clone = copy.copy(element)
    clone.update(self.coder.decode(clone[_TEMP_KEY].item()))
```
What is the function of `.item()` here? What is the type of `clone[_TEMP_KEY]`? Given that we call `.item()` here, will elements in `clone` have a consistent type after decoding?
The type of `clone[_TEMP_KEY]` is a numpy array, and `.item()` returns the underlying element of the numpy array.

> will elements in clone have consistent type after decoding

It should, depending on the Coder.
Co-authored-by: tvalentyn <[email protected]>
This PR attempts to remove CoGBK from MLTransform's TFTProcessHandler.

Instead of using CoGBK, we encode the dict of columns not specified by the user into bytes and pass them in the original dict under a temporary key specified in the schema for TFT. In the TFT `AnalyzeDataset` or `TransformDataset`, this temp column is a no-op. Once TFT is done processing the data, we decode the bytes back into their actual format.

This only affects the data processing operations specified in `apache_beam/ml/transforms/tft.py`. The operations support float and string data; we have tests for them already.

For encoding and decoding purposes, I used `apache_beam/pickler` for now.

Fixes: #29593
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
- Update `CHANGES.md` with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.