-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-14038] Auto-startup for Python expansion service. #17035
Conversation
R: @ihji |
Codecov Report
@@ Coverage Diff @@
## master #17035 +/- ##
==========================================
+ Coverage 73.81% 73.89% +0.07%
==========================================
Files 667 667
Lines 87340 88017 +677
==========================================
+ Hits 64474 65039 +565
- Misses 21763 21875 +112
Partials 1103 1103
Flags with carried forward coverage won't be shown. Click here to find out more.
Continue to review full report at Codecov.
|
b2e8bff
to
b6f214e
Compare
b6f214e
to
bc7db3e
Compare
Failure in org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful |
Run Java PreCommit |
Run CommunityMetrics PreCommit |
Flake due to BEAM-8101 |
Run Java PreCommit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, looks good.
.withMultiOutputs(); | ||
PCollectionTuple outputs; | ||
if (input instanceof PCollection) { | ||
outputs = (PCollectionTuple) ((PCollection) input).apply(transform); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this explicit casting to (PCollectionTuple)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, unfortunately. It seems java struggles with "paritally-raw" types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I was able to work around this by using PCollection<?>
.
List<String> bootstrapCommand = ImmutableList.of("python", bootstrapScript.getAbsolutePath()); | ||
LOG.info("Running bootstrap command " + bootstrapCommand); | ||
Process bootstrap = | ||
new ProcessBuilder("python", bootstrapScript.getAbsolutePath()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't it simpler to run bootstrap_beam_venv.py
not by creating a temporary copy but just passing the script text resource to python executable via stdin pipe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question, I actually tried to do it this way originally. The problem is that the os.execlp
bootstrap from Python 2 to Python 3 doesn't work if it isn't run as a file (as the stdin is already partially consumed). There are still too many systems out there that, despite having Python 3, map python
to Python 2.x. (An alternative would be to make the full script Python 2 compatible, but that would cause ongoing pain, especially if one can't count on any dependencies like six...)
parser.add_argument('--python_version', help="Python major version.") | ||
parser.add_argument('--beam_version', | ||
help="Beam version.", | ||
default="2.36.0") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any way to avoid hard-coded Beam version? Do we need to update this version string every time we release a new version of Beam?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could consider making this a required argument. (Note that it's not an error to mix and match Beam versions, as long as what you're counting on is available, though it's generally best to have the latest.
Schema.Field.of("constructor", Schema.FieldType.STRING), | ||
Schema.Field.of("args", Schema.FieldType.row(args.getSchema())), | ||
Schema.Field.of("kwargs", Schema.FieldType.row(kwargs.getSchema()))); | ||
payloadSchema.setUUID(UUID.randomUUID()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just out of curiosity, why do we need to set UUID for the schema?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Python has a schema cache (e.g. so it's not constantly re-creating types for a dynamic schema). Results in some very surprising behavior if this is not set.
1326808
to
731b9a3
Compare
731b9a3
to
049408c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks.
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; | ||
|
||
/** Wrapper for invoking external Python transforms. */ | ||
public class ExternalPythonTransform<InputT extends PInput, OutputT extends POutput> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We want this to be the main API for Java cross-language transforms, right ? So probably we should advertise in the PR description.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See below.
private final Row args; | ||
private final Row kwargs; | ||
|
||
public ExternalPythonTransform(String fullyQualifiedName, Row args, Row kwargs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about adding following that wraps this transform:
External.of (String transformName, Map parameters, String endpoint);
And may be deprecate existing:
External.of(String urn, byte[] payload, String endpoint)
(this can be a separate PR)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that we need a nicer API, which should probably have a 1-pager and be a separate PR. (To seed this, we should have a RowBuilder that starts with the empty row and can be built with withField(name, value)
with several overloads for different value types that builds up the schema dynamically. I don't know that a Map<String, Object> parameters is the most java-natural, but it could be allowed as an option as well.) I'm not convinced it makes sense to add a back-reference from External to ExternalPython.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding a Row builder (on top of this) seems relatively straightforward. See #17101
I think it will be confusing to add another main External transform API (in addition to External.of and ExternalPythonTransform) so probably ExternalPythonTransform should be the main API to be used along with the Row builder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ExternalPythonTransform only makes sense for Python transforms, as it starts up a Python expansion service (rather than require one be provided) and expects a fqn + positional arg + kwargs rather than a urn + row (though one could argue row is basically kwargs).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, sorry, I meant "main API for using Python transforms from Java".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's a short doc that gives an updated API: https://docs.google.com/document/d/164rn4otqtQA-8QoLlaoGunUDjLSBBvS2DuR0bn344MQ/edit?usp=sharing
.apply( | ||
new ExternalPythonTransform< | ||
PCollection<KV<String, String>>, PCollection<KV<String, Iterable<String>>>>( | ||
"apache_beam.GroupByKey", Row.nullRow(Schema.of()), Row.nullRow(Schema.of()))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a more comprehensive set of unit tests here that uses DirectRunner ? Also "apache_beam.GroupByKey" is not a good example since it doesn't involve the SDK during execution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Java DirectRunner can't run cross-language pipelines. I used GBK because I wanted something simple and obvious--eventually we'll hopefully have a richer set of transforms to invoke.
|
Run Java PreCommit |
1 similar comment
Run Java PreCommit |
Run Java PreCommit |
Run Java PreCommit |
Failure in org.apache.beam.sdk.io.hadoop.format.HadoopFormatIOElasticTest.classMethod |
ExternalPythonTransformTest is extremely flaky, for example: https://ci-beam.apache.org/job/beam_PreCommit_Java_Phrase/4806/testReport/junit/org.apache.beam.sdk.extensions.python/ExternalPythonTransformTest/trivialPythonTransform/ Filed https://issues.apache.org/jira/browse/BEAM-14148 Started rollback in #17154 |
…)" This reverts commit 0510cec.
…e. (apache#17035)"" This reverts commit f5d62dd.
This adds a script and some java code for automatically starting a Beam Python service.
It may be useful for Go, Typescript, etc. as well.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.