-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MLTransform #26795
MLTransform #26795
Conversation
Codecov Report
@@ Coverage Diff @@
## master #26795 +/- ##
==========================================
- Coverage 71.50% 71.05% -0.45%
==========================================
Files 858 858
Lines 104809 104076 -733
==========================================
- Hits 74944 73955 -989
- Misses 28317 28573 +256
Partials 1548 1548
Flags with carried forward coverage won't be shown. Click here to find out more.
... and 24 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't do a thorough review, but the approach here looks pretty good to me
fields = row_type._inner_types() | ||
return Dict[str, Union[tuple(fields)]] | ||
|
||
def _get_artifact_location(self, pipeline: beam.Pipeline): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of dumping this in the staging directory or a temp directory, should we require users to provide an output directory? Presumably, users will want a well defined location for retrieving their artifacts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added it as optional. If user doesn't provide it, I am falling back to this approach
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think the staging/temp directory makes sense as a default--if there are artifacts to be produced/consumed, this should be a required argument.
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
Assigning reviewers. If you would like to opt out of this review, comment R: @jrmccluskey for label python. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
R: @tvalentyn |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
pass types Support Pyarrow schema Artifact WIP
WIP on inferring types Remove pyarrow implementation Add MLTransformOutput Refactor files
Fix artifacts code Add more tests fix lint erors Change namespaces from ml_transform to transforms Add doc strings Add tests and refactor
Sort imports Add metrics namespaces Refactor
Make VarLenFeatureSpec as default Refactoring
Some more refactoring
…d address PR comments Add skip conditions for tests Add test suite for tft tests
Try except in __init__.py Remove imports from __init__ Add docstrings, refactor
Mock tensorflow_transform in pydocs fix tft pypi name Skip a test Add step name Update supported versions of TFT
@@ -0,0 +1,340 @@ | |||
# |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a better name for this file would transforms/tft.py
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we rename (here or in a follow on PR)?
|
||
import apache_beam as beam | ||
|
||
# TODO: Abstract methods are not getting pickled with dill. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this TODO still apply? What are the consequences?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not relevant anymore. I tried today and I wasn't able to reproduce it now
@@ -326,6 +326,12 @@ commands = | |||
# Run all DataFrame API unit tests | |||
bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/dataframe' | |||
|
|||
[testenv:py{38,39}-tft-113] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason to limit this to 3.8/9 (and not 3.10/11)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And should we trigger any non-3.8 versions in a precommit? Maybe 3.11 to get lowest/highest?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tfx_bsl only available in 3.8 and 3.9 python - https://pypi.org/project/tfx-bsl/#files.
I have asked for update on other python versions,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, I forgot this was known.
# we will convert scalar values to list values and TFT will ouput | ||
# numpy array all the time. | ||
if not self.is_input_record_batches: | ||
raw_data |= beam.ParDo(ConvertScalarValuesToListValues()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than having the is_input_record_batches
param here, could we:
- do a type check in
ConvertScalarValuesToListValues
and no-op if its a record batch - introspect
schema
to determine if it needs theTensorAdapter
Not totally sure about the second piece, but if we can do something like that it would be very helpful. I don't like the idea of having is_input_record_batches
as a top level config, especially once we extend this to other frameworks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main advantage is that if we can kill this, it defers the problem of having framework specific inputs. With that said, I think we need an idea of how we'll solve that regardless
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I guess we'll need to solve this now for the output_record_batches param anyways
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to know before pipeline runtime whether it is a record batch or dict to determine the right schema.
So during pipeline construction, I use this flag to construct respective schema. I don't think this can be inferable later
is_input_record_batches: bool = False, | ||
output_record_batches: bool = False, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we need something like this in MLTransform
itself, I'd significantly prefer a public TFTProcessHandler
to take in that config. These options will be meaningless for most other frameworks we might support.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One option would be to make these properties of the operations instead of the top level transform. Chained TFT operations could then just use the values from the first/last operation.
I also expect that we're going to run into this problem with other frameworks in the future, so I think we need a way for adding additional framework or transform specific parameters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that would be my preferred experience. So users could do something like:
MLTransform(
artifact_location=args.artifact_location,
artifact_mode=ArtifactMode.PRODUCE,
).with_transform(ComputeAndApplyVocabulary(
columns=['x'], is_input_record_batches=True)).with_transform(TFIDF(columns=['x'], output_input_record_batches=True))
For TFT, we'd then fuse together any consecutive TFT transforms into a single TFTProcessHandler, resolve any conflicting arguments (e.g. throw if one transform says output_record_batches and the next doesn't take recordBatches or something), and construct the graph
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
info about record_batch lives within the context of TFTProcessHandler. I don't like the idea of passing this arg via operations since in the operations, we don't use this arg anywhere in the operation. It would be just a different way of inferring this arg in TFTProcessHandler.
Alternative would be MLTransform would take a config: Dict[str, Any]
, in which we will instruct users to pass certain kind of args such as {'out_record_batch' : True}
, we could pass this config
arg to the respective process handler and discard irrelevant args to that ProcessHandler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't use this arg anywhere in the operation
I think from a user's point of view we use it in the first one; is_input_record_batches=True
just means "I'm expecting that its ok to pass record batches into the first transform, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now, the columns are specified at operation level instead of transform level. The entry point for column x could be at the beginning but the entry point at column y could be in the middle of the list. User might pass input_record_batch=True
to the y column if they doesn't understand exactly what we were instructing.
If we ask the user to provide like this, I feel like it could get a little complicated
transforms = [
op1(columns=['x']),
op2(columns=['y', input_record_batch=True],
op3(columns=['z']
]
We can also iterate on this in the v2 since I guess this needs another discussion and remove the option for Record batches for now. We would support Dict[str, Any] in V1. what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm comfortable iterating on it in v2
if not os.path.exists(os.path.join( | ||
self.artifact_location, RAW_DATA_METADATA_DIR, SCHEMA_FILE)): | ||
raise FileNotFoundError( | ||
"Raw data metadata not found at %s" % |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should be more descriptive in these errors (something along the lines of "you're running in consume mode" what that means, and "have you run this in produce mode?")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed wording
should be wrapped inside a TFTProcessHandler object before being passed | ||
to the beam.ml.MLTransform class. The TFTProcessHandler will let MLTransform | ||
know which type of input is expected and infers the relevant schema required | ||
for the TFT library. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated
@damccorm removed pyarrow record batch. Addressed other comments as well. PTAL ref: #26640 (comment) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this LGTM for our initial pass and we can follow up on everything else in future PRs. Known work that remains that I'm aware of:
- Figure out how to support record batch types (and more generally, ProcessHandler specific parameters)
- Follow up with tfx_bsl team to get 3.10+ support
- (minor) rename tft_transforms.py to tft.py
- Support remaining operations
- Future work from https://docs.google.com/document/d/1rQkSm_8tseLqDQaLohtlCGqt5pvMaP0XIpPi5UD0LCQ/edit#heading=h.2jxsxqh
Lets sure we are tracking all of those somewhere (could be an issue or a doc, please just share whatever you choose)
@@ -0,0 +1,340 @@ | |||
# |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we rename (here or in a follow on PR)?
@@ -326,6 +326,12 @@ commands = | |||
# Run all DataFrame API unit tests | |||
bash {toxinidir}/scripts/run_pytest.sh {envname} 'apache_beam/dataframe' | |||
|
|||
[testenv:py{38,39}-tft-113] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, I forgot this was known.
R: @rszper |
Actually, I am thinking I will iterate on the docs in the following PRs. Docs will include pydocs edits and doc pages as well for users |
* Initial work on MLTransform and ProcessHandler * Support for containers: List, Dict[str, np.ndarray] pass types Support Pyarrow schema Artifact WIP * Add min, max, artifacts for scale_0_to_1 * Add more transform functions and artifacts WIP on inferring types Remove pyarrow implementation Add MLTransformOutput Refactor files * Add generic type annotations * Add unit tests Fix artifacts code Add more tests fix lint erors Change namespaces from ml_transform to transforms Add doc strings Add tests and refactor * Add support for saving intermediate results for a transform Sort imports Add metrics namespaces Refactor * Add schema to the output PCollection * Remove MLTransformOutput and return Row instead with schema * Convert primitive type to list using a DoFn. Remove FixedLenFeatureSpec Make VarLenFeatureSpec as default Refactoring * Add append_transform to the ProcessHandler Some more refactoring * Remove param self.has_artifacts, add artifact_location to handler..and address PR comments Add skip conditions for tests Add test suite for tft tests * Move tensorflow import into the try except catch Try except in __init__.py Remove imports from __init__ Add docstrings, refactor * Add type annotations for the data transforms * Add tft test in tox.ini Mock tensorflow_transform in pydocs fix tft pypi name Skip a test Add step name Update supported versions of TFT * Add step name for TFTProcessHandler * Remove unsupported tft versions * Fix mypy * Refactor TFTProcessHandlerDict to TFTProcessHandlerSchema * Update doc for data processing transforms * Fix checking the typing container types * Refactor code * Fail TFTProcessHandler on a non-global window PColl * Remove underscore * Remove high level functions * Add TFIDF * Fix tests with new changes[WIP] * Fix tests * Refactor class name to CamelCase and remove kwrags * use is_default instead of isinstance * Remove falling back to staging location for artifact location * Add TFIDF tests * Remove __str__ * Refactor skip statement * Add utils for fetching artifacts on compute and apply vocab * Make ProcessHandler internal class * Only run analyze stage when transform_fn(artifacts) is not computed before. * Fail if pipeline has non default window during artifact producing stage * Add support for Dict, recordbatch and introduce artifact_mode * Hide process_handler from user. Make TFTProcessHandler as default * Refactor few tests * Comment a test * Save raw_data_meta_data so that it can be used during consume stage * Refactor code * Add test on artifacts * Fix imports * Add tensorflow_metadata to pydocs * Fix test * Add TFIDF to import * Add basic example * Remove redundant logging statements * Add test for multiple columns on MLTransform * Add todo about what to do when new process handler is introduced * Add abstractmethod decorator * Edit Error message * Update docs, error messages * Remove record batch input/output arg * Modify generic types * Fix import sort * Fix mypy errors - best effort * Fix tests * Add TFTOperation doc * Rename tft_transform to tft * Fix hadler_test * Fix base_test * Fix pydocs
* Initial work on MLTransform and ProcessHandler * Support for containers: List, Dict[str, np.ndarray] pass types Support Pyarrow schema Artifact WIP * Add min, max, artifacts for scale_0_to_1 * Add more transform functions and artifacts WIP on inferring types Remove pyarrow implementation Add MLTransformOutput Refactor files * Add generic type annotations * Add unit tests Fix artifacts code Add more tests fix lint erors Change namespaces from ml_transform to transforms Add doc strings Add tests and refactor * Add support for saving intermediate results for a transform Sort imports Add metrics namespaces Refactor * Add schema to the output PCollection * Remove MLTransformOutput and return Row instead with schema * Convert primitive type to list using a DoFn. Remove FixedLenFeatureSpec Make VarLenFeatureSpec as default Refactoring * Add append_transform to the ProcessHandler Some more refactoring * Remove param self.has_artifacts, add artifact_location to handler..and address PR comments Add skip conditions for tests Add test suite for tft tests * Move tensorflow import into the try except catch Try except in __init__.py Remove imports from __init__ Add docstrings, refactor * Add type annotations for the data transforms * Add tft test in tox.ini Mock tensorflow_transform in pydocs fix tft pypi name Skip a test Add step name Update supported versions of TFT * Add step name for TFTProcessHandler * Remove unsupported tft versions * Fix mypy * Refactor TFTProcessHandlerDict to TFTProcessHandlerSchema * Update doc for data processing transforms * Fix checking the typing container types * Refactor code * Fail TFTProcessHandler on a non-global window PColl * Remove underscore * Remove high level functions * Add TFIDF * Fix tests with new changes[WIP] * Fix tests * Refactor class name to CamelCase and remove kwrags * use is_default instead of isinstance * Remove falling back to staging location for artifact location * Add TFIDF tests * Remove __str__ * Refactor skip statement * Add utils for fetching artifacts on compute and apply vocab * Make ProcessHandler internal class * Only run analyze stage when transform_fn(artifacts) is not computed before. * Fail if pipeline has non default window during artifact producing stage * Add support for Dict, recordbatch and introduce artifact_mode * Hide process_handler from user. Make TFTProcessHandler as default * Refactor few tests * Comment a test * Save raw_data_meta_data so that it can be used during consume stage * Refactor code * Add test on artifacts * Fix imports * Add tensorflow_metadata to pydocs * Fix test * Add TFIDF to import * Add basic example * Remove redundant logging statements * Add test for multiple columns on MLTransform * Add todo about what to do when new process handler is introduced * Add abstractmethod decorator * Edit Error message * Update docs, error messages * Remove record batch input/output arg * Modify generic types * Fix import sort * Fix mypy errors - best effort * Fix tests * Add TFTOperation doc * Rename tft_transform to tft * Fix hadler_test * Fix base_test * Fix pydocs
* Initial work on MLTransform and ProcessHandler * Support for containers: List, Dict[str, np.ndarray] pass types Support Pyarrow schema Artifact WIP * Add min, max, artifacts for scale_0_to_1 * Add more transform functions and artifacts WIP on inferring types Remove pyarrow implementation Add MLTransformOutput Refactor files * Add generic type annotations * Add unit tests Fix artifacts code Add more tests fix lint erors Change namespaces from ml_transform to transforms Add doc strings Add tests and refactor * Add support for saving intermediate results for a transform Sort imports Add metrics namespaces Refactor * Add schema to the output PCollection * Remove MLTransformOutput and return Row instead with schema * Convert primitive type to list using a DoFn. Remove FixedLenFeatureSpec Make VarLenFeatureSpec as default Refactoring * Add append_transform to the ProcessHandler Some more refactoring * Remove param self.has_artifacts, add artifact_location to handler..and address PR comments Add skip conditions for tests Add test suite for tft tests * Move tensorflow import into the try except catch Try except in __init__.py Remove imports from __init__ Add docstrings, refactor * Add type annotations for the data transforms * Add tft test in tox.ini Mock tensorflow_transform in pydocs fix tft pypi name Skip a test Add step name Update supported versions of TFT * Add step name for TFTProcessHandler * Remove unsupported tft versions * Fix mypy * Refactor TFTProcessHandlerDict to TFTProcessHandlerSchema * Update doc for data processing transforms * Fix checking the typing container types * Refactor code * Fail TFTProcessHandler on a non-global window PColl * Remove underscore * Remove high level functions * Add TFIDF * Fix tests with new changes[WIP] * Fix tests * Refactor class name to CamelCase and remove kwrags * use is_default instead of isinstance * Remove falling back to staging location for artifact location * Add TFIDF tests * Remove __str__ * Refactor skip statement * Add utils for fetching artifacts on compute and apply vocab * Make ProcessHandler internal class * Only run analyze stage when transform_fn(artifacts) is not computed before. * Fail if pipeline has non default window during artifact producing stage * Add support for Dict, recordbatch and introduce artifact_mode * Hide process_handler from user. Make TFTProcessHandler as default * Refactor few tests * Comment a test * Save raw_data_meta_data so that it can be used during consume stage * Refactor code * Add test on artifacts * Fix imports * Add tensorflow_metadata to pydocs * Fix test * Add TFIDF to import * Add basic example * Remove redundant logging statements * Add test for multiple columns on MLTransform * Add todo about what to do when new process handler is introduced * Add abstractmethod decorator * Edit Error message * Update docs, error messages * Remove record batch input/output arg * Modify generic types * Fix import sort * Fix mypy errors - best effort * Fix tests * Add TFTOperation doc * Rename tft_transform to tft * Fix hadler_test * Fix base_test * Fix pydocs
Design doc: https://docs.google.com/document/d/1rQkSm_8tseLqDQaLohtlCGqt5pvMaP0XIpPi5UD0LCQ/edit
This PR introduces TFTProcessHandlerSchema which takes a pcoll and perform user provided data processing transforms. TFTProcessHandler uses tensorflow_transform's beam implementation for computing results.
This PR aims to provide a simple to use API which abstracts the user away from the complexities that one can find with using tensorflow transform such as providing a tensorflow feature spec etc.
Not considered/implemented in this PR
Alternative design - Alternative way to pass columns to the transforms
Instead of passing columns to the every data transform, we can let MLTransform take a param
columns
and assume that all the data transforms within the scope of that MLTransform are applicable only on the inputcolumns
passed.But this could have a downside if the user calls multple
MLTransform
s, then there will be fusion break(due to tft internal implementation)Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.