Skip to content

Commit

Permalink
Use Taskspec fuse implementation (#1162)
Browse files (browse the repository at this point in the history)
Co-authored-by: Patrick Hoefler <[email protected]>
  • Loading branch information
fjetter and phofl authored Nov 13, 2024
1 parent 4cb1320 commit bb0aef0
Showing 1 changed file with 2 additions and 17 deletions.
19 changes: 2 additions & 17 deletions dask_expr/_expr.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -3766,31 +3766,16 @@ def _broadcast_dep(self, dep: Expr):

def _task(self, name: Key, index: int) -> Task:
internal_tasks = []
seen_keys = set()
external_deps = set()
for _expr in self.exprs:
if self._broadcast_dep(_expr):
subname = (_expr._name, 0)
else:
subname = (_expr._name, index)
t = _expr._task(subname, subname[1])

assert t.key == subname
internal_tasks.append(t)
seen_keys.add(subname)
external_deps.update(t.dependencies)
external_deps -= seen_keys
dependencies = {dep: TaskRef(dep) for dep in external_deps}
t = Task(
name,
Fused._execute_internal_graph,
# Wrap the actual subgraph as a data node such that the tasks are
# not erroneously parsed. The external task would otherwise carry
# the internal keys as dependencies which is not satisfiable
DataNode(None, internal_tasks),
dependencies,
(self.exprs[0]._name, index),
)
return t
return Task.fuse(*internal_tasks, key=name)

@staticmethod
def _execute_internal_graph(internal_tasks, dependencies, outkey):
Expand Down

0 comments on commit bb0aef0

Please sign in to comment.