-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable lazy pipeline mutations via recipe functions #168
base: main
Are you sure you want to change the base?
Conversation
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## main #168 +/- ##
==========================================
- Coverage 95.53% 95.35% -0.19%
==========================================
Files 14 14
Lines 493 495 +2
==========================================
+ Hits 471 472 +1
- Misses 22 23 +1 ☔ View full report in Codecov by Sentry. |
pangeo_forge_runner/commands/bake.py
Outdated
@@ -261,7 +262,7 @@ def start(self): | |||
|
|||
# if pangeo-forge-recipes is <=0.9, we have to specify a requirements.txt | |||
# file even if it isn't present, as the image used otherwise will not have pangeo-forge-recipes | |||
if isinstance(recipe, PTransform): | |||
if isinstance(recipe, PTransform) or inspect.isfunction(recipe): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use callable instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That seems totally doable - I wasn't actually familiar with inspect.isfunction
so perhaps there are implications here that I'm unfamiliar with?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, callable
will appropriately check for anything with the callable protocol as well as callable classes while inspect.isfunction
will explicitly check just for functions. Since the goal here is to just call the recipe (rather than do something specific to functions, like look at their source), I think callable
is the right thing here.
PRs that discover things that are described with:
are my favorite kind of PRs! :) |
1645ce9
to
2d51ab5
Compare
Some feedback about this that we need to look into :
|
|
Here's a clear example of how important it can be to have multiple forking paths when defining beam pipelines: https://github.com/pangeo-forge/pangeo-forge-recipes/pull/689/files#diff-8bac120398898793cd4f9daf94551b1f3d3f1867bed8a68b14cceed49d6dc30fR503-R507 Now, I can't say with 100% certainty that this is the only way to get statistics which may be necessary for distributed work that requires global context but I can say that it is the only way I've found. If people have other ideas, I'd be very interested. If I'm right though, this feature is pretty much a must-have for us to enable complex recipes. In the above case, the issue doesn't come up because it happens in a slightly different context and the |
this is still FWML
for more information, see https://pre-commit.ci
a901846
to
98792bd
Compare
I've been thinking about this for a majority of the day today. First of all, @moradology thank you for surfacing such deep insights about pipeline mutations and their relationship to branching DAGs in Beam. This is hard-won understanding for our community, which we absolutely should leverage to inform a revision of how we approach pipeline construction. In terms of the callable proposal specifically: I'm wondering if we've gotten ourselves into the current predicament by trying to wrap Beam in some non-standard ways, and therefore maybe the best path forward is to try to get back to something that looks more like vanilla Beam, not less? This PR does an elegant job of finding a way out of our current bind while changing very little here in runner, but this change has a rather large implication for what recipes themselves will need to look like: the object that recipe developers hand off to runner becomes (optionally, I suppose) a callable that mutates a pipeline. From the recipe look and feel standpoint, this seems to take us if anything even a bit further away than we currently are from vanilla Beam. Which is long preamble to say: now that you've so clearly and helpfully identified this shortcoming of our current implementation, is this the right time to do something which has been discussed off and on since our beam work began; namely, should we make the change to have recipes be "just" a Beam script, with a requirement that they conclude with:
? This brings us back into the mainstream of what's recommended by Beam itself, with all the attendant benefits of that, among them support for branching pipelines. If we go this route, the scope of runner is narrowed to be just focused on injecting or otherwise providing the necessary PipelineOptions at script invocation time. |
It's sounding as though the alternative proposal here would be to construct What gives me some pause here is that One area that's not clear, assuming the recipe-as-function approach, is exactly what we should expect as arguments. I can imagine at least a few alternatives I'm likely biased here, as I generally am a fan of passing around functions and being as lazy (in the principled, execution-semantics sense!) as possible and I know that some find it simpler to approach software with procedural, eager-execution in mind. This makes me a bit hesitant to overstate my case given how central it is as a design decision although to my mind An example of how this deferral might enable more generality for recipes while keeping their specifics decoupled from the runner (a slight modification of this): def recipe(pipeline: beam.Pipeline, config: SomeConfig):
pipeline | beam.Create(config.pattern.items())
| OpenWithKerchunk(
remote_protocol=earthdata_protocol,
file_type=config.pattern.file_type,
inline_threshold=config.inline_threshold,
storage_options=config.auth_args,
)
| WriteCombinedReference(
concat_dims=config.CONCAT_DIMS,
identical_dims=config.IDENTICAL_DIMS,
store_name=config.SHORT_NAME
))
|
Recently (while trying to figure out why it is so hard to call
to_runner_api()
for DAG serialization) did a deep dive into howbeam
handles its pipelines and transformations. It is horrifying. So: every time you pipe to a transform what's actually happening is yourPipeline
instance is internally mutating the state of its job-graph. The frustrating thing is that our current strategy tries to take aPTransform
and apply it to a pipeline that's created later. This does not work as expected. Not at all.Here's something you can do in beam:
Unfortunately, this (which is currently how things work around here) does not work:
To make matters worse: those "unique names" (e.g "BRANCH1") are just lost. They're gone. The PTransform doesn't hang onto them. The
Pipeline
does. Through mutations. This is very confusing.This branch fixes that. Recipes can now be functions that take a pipeline and apply mutations. Mutations are still bad, but now we can use them as intended.
So: now we have unique names and we can do branching pipelines. Probably other things too!