[WIP] Pass non HLG objects wout materialization #10369

Draft · wants to merge 3 commits into main from dask_graph_protocol_wout_materialization

Conversation

@fjetter (Member) commented on Jun 22, 2023:

Sibling PR and description: dask/distributed#7942

@fjetter force-pushed the dask_graph_protocol_wout_materialization branch from a95eeef to 3b17e05 on September 6, 2023, 14:40
@fjetter changed the title from "WIP: Pass non HLG objects wout materialization" to "[WIP] Pass non HLG objects wout materialization" on Sep 8, 2023
dask/typing.py (outdated) · comment on lines 161 to 162
@runtime_checkable
class NewDaskCollection(Protocol):
@fjetter (Member, Author):

I'm currently still experimenting. For now, I'm introducing a new collections protocol and implementing separate branches for each kind of collection. This causes a bit of duplication but feels nicer than something hybrid.

I will clean this up once we converge on a final API.
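For orientation, here is a minimal sketch of what such a parallel protocol could look like, assuming the `NewDaskCollection` idea from the diff above and the `finalize_compute` method used in the compute path later in this PR; the graph-factory accessor name below is a placeholder, not the PR's final API:

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class NewDaskCollectionSketch(Protocol):
    """Illustrative parallel to the existing DaskCollection protocol: the
    collection hands out an unmaterialized graph factory instead of an HLG."""

    def finalize_compute(self) -> "NewDaskCollectionSketch":
        """Return a collection reduced to a single concrete output,
        ready to be computed (name taken from this PR's compute path)."""
        ...

    def __dask_graph_factory__(self) -> Any:
        """Return the task-graph factory (hypothetical accessor name)."""
        ...
```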

dask/typing.py (outdated) · comment on lines 124 to 119
@abc.abstractmethod
def optimize(self) -> DaskGraph:
    raise NotImplementedError("Inheriting class must implement this method.")
@fjetter (Member, Author):

Based on a conversation with @phofl:

distributed currently must know the output keys (for the futures) before the graph is transmitted to the cluster. This is currently mostly an implementation detail rather than a conceptual problem, but removing the requirement would be a bit of work.

For us this means that (for now) we have to optimize the expression before it is submitted to the scheduler, since optimizations can alter the final output key names. This is why the graph has a dedicated optimize method and why the recommendation is to keep optimization and materialization separate.
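A toy illustration of that ordering constraint (the method names mirror this PR's protocol; the bodies are made up): the client must call `optimize()` before it reads the output keys it will hand out as futures, because optimization may rewrite those keys.

```python
class ToyGraphFactory:
    """Stand-in for a graph factory whose optimization renames outputs."""

    def __init__(self, keys):
        self._keys = list(keys)

    def optimize(self) -> "ToyGraphFactory":
        # Pretend an optimization pass fuses tasks and renames the outputs.
        return ToyGraphFactory([f"{k}-fused" for k in self._keys])

    def __dask_output_keys__(self):
        return self._keys


factory = ToyGraphFactory(["sum-1234"])
optimized = factory.optimize()

# Futures must be created for the *post-optimization* keys; reading them
# before optimizing would register names the scheduler never produces.
assert optimized.__dask_output_keys__() == ["sum-1234-fused"]
```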

dask/typing.py (outdated)
defined by the :py:class:`~dask.typing.HLGDaskCollection`
protocol.
@runtime_checkable
class DaskGraph(Protocol):
@fjetter (Member, Author):

Graphs can (and likely will) be more complicated than this protocol. However, this is the minimal protocol required to submit something to distributed using pickle.
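Put differently, the minimal surface sketched here (method names are taken from the snippets in this PR; the exact signatures are assumptions) is: the object must be picklable, able to optimize itself, able to materialize into a plain task mapping, and able to report its output keys.

```python
import abc
from typing import Protocol, runtime_checkable


@runtime_checkable
class DaskGraphSketch(Protocol):
    """Sketch of the minimal factory surface needed to ship a graph to
    distributed via pickle; real graphs can expose much more than this."""

    @abc.abstractmethod
    def optimize(self) -> "DaskGraphSketch":
        """Return an optimized factory; may rewrite output key names."""
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def materialize(self) -> dict:
        """Expand into a concrete ``{key: task}`` mapping."""
        raise NotImplementedError("Inheriting class must implement this method.")

    @abc.abstractmethod
    def __dask_output_keys__(self) -> list:
        """Keys the caller should collect after execution."""
        raise NotImplementedError("Inheriting class must implement this method.")
```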

@fjetter force-pushed the dask_graph_protocol_wout_materialization branch from 32f6c6d to d520295 on December 20, 2023, 15:05
Comment on lines +402 to +441
def collections_to_dsk(
    collections, optimize_graph=True, optimizations=(), **kwargs
) -> TaskGraphFactory:
@fjetter (Member, Author):

The code here is still pretty ugly since it requires boilerplate all over the place for compat purposes.

It probably doesn't make sense to have a switch inside this function. I'd likely define separate functions that only work for new-style and old-style collections respectively, since we'll need another switch outside as well.
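A rough sketch of what pulling that switch out might look like (`TaskFactoryHLGWrapper` is the wrapper added on this PR's branch; the dispatch check, the new-style helper, and the omitted `optimizations` handling are placeholders):

```python
from dask.highlevelgraph import HighLevelGraph


def _is_new_style(collection) -> bool:
    # Placeholder check: new-style collections expose ``finalize_compute``.
    return hasattr(collection, "finalize_compute")


def collections_to_factory(collections, optimize_graph=True, **kwargs):
    """Dispatch once, up front, instead of branching inside one compat function."""
    if all(_is_new_style(c) for c in collections):
        return _new_style_to_factory(collections, optimize_graph, **kwargs)
    return _old_style_to_factory(collections, optimize_graph, **kwargs)


def _old_style_to_factory(collections, optimize_graph=True, **kwargs):
    # Existing HLG path (optimizations omitted in this sketch): merge the
    # layered graphs and wrap them so callers only ever see a factory-like object.
    from dask.highlevelgraph import TaskFactoryHLGWrapper  # exists only on this PR's branch

    graphs = [c.__dask_graph__() for c in collections]
    keys = [c.__dask_keys__() for c in collections]
    return TaskFactoryHLGWrapper(HighLevelGraph.merge(*graphs), keys)


def _new_style_to_factory(collections, optimize_graph=True, **kwargs):
    # New path: build the factory directly from the collections without
    # materializing any layers; intentionally left unimplemented here.
    raise NotImplementedError
```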

# Merge all graphs
if any(isinstance(graph, HighLevelGraph) for graph in graphs):
    dsk = HighLevelGraph.merge(*graphs)
    return TaskFactoryHLGWrapper(dsk, ext_keys)
@fjetter (Member, Author):

I defined this wrapper just to streamline things in distributed. As pointed out in dask/distributed#7942 (comment), we need more than just the graph; we also need the output keys to actually know what to compute.
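Conceptually the wrapper just bundles the two pieces distributed needs so they travel together; a sketch of the idea (not the PR's actual implementation):

```python
from dask.highlevelgraph import HighLevelGraph


class HLGWithKeys:
    """Illustrative pairing of a HighLevelGraph with the output keys the
    caller wants back, mimicking the role of TaskFactoryHLGWrapper."""

    def __init__(self, hlg: HighLevelGraph, output_keys: list):
        self._hlg = hlg
        self._output_keys = output_keys

    def materialize(self) -> dict:
        # HighLevelGraph is a Mapping, so this flattens all layers
        # into a plain ``{key: task}`` dict.
        return dict(self._hlg)

    def __dask_output_keys__(self) -> list:
        return self._output_keys
```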

dask/base.py (outdated) · comment on lines 656 to 713
    collections = [c.finalize_compute() for c in collections]
    graph_factory = collections_to_dsk(collections, optimize_graph, **kwargs)
    dsk = graph_factory.materialize()
    keys = graph_factory.__dask_output_keys__()
    return schedule(dsk, keys, **kwargs)
else:
    graph_factory = collections_to_dsk(collections, optimize_graph, **kwargs)
    from dask.highlevelgraph import TaskFactoryHLGWrapper

    assert isinstance(graph_factory, TaskFactoryHLGWrapper)
    dsk = graph_factory.materialize()
    keys, postcomputes = [], []
    for x in collections:
        keys.append(x.__dask_keys__())
        postcomputes.append(x.__dask_postcompute__())

    with shorten_traceback():
        results = schedule(dsk, keys, **kwargs)

    return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

@fjetter (Member, Author):

The way postcompute actions are defined so far doesn't fit well with the expr/graph-factory model: as currently defined, we extract the graph from the collection and then mutate it using closures returned by the collection.
I personally find this indirection very hard to understand, and I consider it an abstraction leak since it's a little odd that we have to mutate the graph in order to compute it. Instead, we first tell the collection what it is supposed to be used for (finalize_compute), then we extract the graph factory, then we generate the graph, then we compute the graph.

I assume it's possible to reduce this boilerplate compat code a little, but I haven't put much thought into it yet.

@fjetter (Member, Author):

The distributed version is a little more complex (the new API is much simpler); see https://github.com/dask/distributed/pull/7942/files#r1432859321

@fjetter force-pushed the dask_graph_protocol_wout_materialization branch from 3fe2d42 to 4ba504a on January 16, 2024, 12:17

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

     15 files ±0       15 suites ±0      3h 32m 36s ⏱️ −9m 55s
 12 941 tests ±0    11 475 ✅ −115        930 💤 +1      536 ❌ +114
160 329 runs  ±0   141 683 ✅ −1 337   16 895 💤 +8   1 751 ❌ +1 329

For more details on these failures, see this check.

Results for commit d45da8e (comparison against base commit ca42dcf).

This pull request removes 2 tests and adds 2 tests (renamed tests count towards both):

- dask.tests.test_typing ‑ test_isinstance_core[DaskCollection]
- dask.tests.test_typing ‑ test_isinstance_core[HLGDaskCollection]
- dask.tests.test_typing ‑ test_isinstance_core[_DaskCollection]
- dask.tests.test_typing ‑ test_isinstance_core[_HLGDaskCollection]

This pull request skips 1 test:

- dask.dataframe.tests.test_multi ‑ test_concat_unknown_divisions

@fjetter (Member, Author) commented on Jan 16, 2024:

xref #10808
