WIP/FIX(PIN): PARALLEL DELs decide on PRUNED-dag (not full)...
- WIP: x4 TCs FAIL and the bug has still not been discovered :-(
+ BUT ALL+AUGMENTED PARALLEL TCs pass
  (some of them were failing in #26)
+ refact: net stores also `pruned_dag` (not only `steps`).
+ refact: _solve_dag() --> _prune_dag().
+ doc: +a lot.
+ TODO: store pruned_dag in own ExePlan class.
ankostis committed Oct 4, 2019
1 parent 1cc733e commit d403783
Showing 2 changed files with 112 additions and 78 deletions.
153 changes: 86 additions & 67 deletions graphkit/network.py
@@ -54,54 +54,65 @@ def __repr__(self):

class Network(object):
"""
Assemble operations & data into a directed-acyclic-graph (DAG) and run them
Assemble operations & data into a directed-acyclic-graph (DAG) to run them.
based on the given input values and requested outputs.
The execution of the contained *operations* in the dag (the computation)
is split in 2 phases:
The execution of *operations* (a computation) is split in 2 phases:
- COMPILE: prune, sort topologically the nodes in the dag, solve it, and
- COMPILE: prune unsatisfied nodes, sort dag topologically & solve it, and
derive the *execution plan* (see below) based on the given *inputs*
and asked *outputs*.
- EXECUTE: sequential or parallel invocation of the underlying functions
of the operations.
The computation is based on 4 data-structures:
- the ``networkx`` :attr:`graph` DAG, containing interchanging layers of
:class:`Operation` and :class:`DataPlaceholderNode` nodes.
They are laid out and connected by repeated calls of :meth:`add_OP`.
The computation starts with :meth:`_solve_dag()` extracting
a *DAG subgraph* by *pruning* nodes based on given inputs and
requested outputs.
This subgraph is used to decide the `execution_plan` (see below),
and is cached in :attr:`_cached_execution_plans` across runs with
inputs/outputs as key.
- the :attr:`execution_plan` is the list of the operation-nodes only
from the dag (above), topologically sorted, and interspersed with
*instruction steps* needed to complete the run.
It is built by :meth:`_build_execution_plan()` based on the subgraph dag
extracted above. The *instruction* items achieve the following:
- :class:`DeleteInstruction`: delete items from values-cache as soon as
they are not needed further down the dag, to reduce memory footprint
while computing.
- :class:`PinInstruction`: avoid overwriting any given intermediate
inputs, and still allow their providing operations to run
(because they are needed for their other outputs).
- the :var:`cache` local-var in :meth:`compute()`, initialized on each run
to hold the values of the given inputs, generated (aka intermediate) data,
and output values.
- the :var:`overwrites` local-var, initialized on each run of both
``_compute_xxx`` methods (for parallel or sequential executions), to
hold values calculated but overwritten (aka "pinned") by intermediate
input-values.
of the operations with arguments from the ``cache``.
The computation is based on 5 data-structures:
:ivar graph:
A ``networkx`` DAG containing interchanging layers of
:class:`Operation` and :class:`DataPlaceholderNode` nodes.
They are laid out and connected by repeated calls of :meth:`add_OP`.
The computation starts with :meth:`_prune_dag()` extracting
a *DAG subgraph* by *pruning* its nodes based on given inputs and
requested outputs in :meth:`compute()`.
:ivar execution_dag:
It contains the nodes of the *pruned dag* from the last call to
:meth:`compile()`. This pruned subgraph is used to decide
the :attr:`execution_plan` (below).
It is cached in :attr:`_cached_compilations` across runs with
inputs/outputs as key.
:ivar execution_plan:
It is the list of the operation-nodes only
from the dag (above), topologically sorted, and interspersed with
*instruction steps* needed to complete the run.
It is built by :meth:`_build_execution_plan()` based on the subgraph dag
extracted above.
It is cached in :attr:`_cached_compilations` across runs with
inputs/outputs as key.
The *instruction* items achieve the following:
- :class:`DeleteInstruction`: delete items from values-cache as soon as
they are not needed further down the dag, to reduce memory footprint
while computing.
- :class:`PinInstruction`: avoid overwriting any given intermediate
inputs, and still allow their providing operations to run
(because they are needed for their other outputs).
:var cache:
a local-var in :meth:`compute()`, initialized on each run
to hold the values of the given inputs, generated (intermediate) data,
and output values.
It is returned as-is if no specific outputs were requested; no data-eviction
happens then.
:arg overwrites:
The optional argument given to :meth:`compute()` to collect the
intermediate *calculated* values that are overwritten by intermediate
(aka "pinned") input-values.
"""

@@ -119,11 +130,14 @@ def __init__(self, **kwargs):
#: The list of operation-nodes & *instructions* needed to evaluate
#: the given inputs & asked outputs, free memory and avoid overwriting
#: any given intermediate inputs.
self.execution_plan = []
self.execution_plan = ()

#: Pruned graph of the last compilation.
self.execution_dag = ()

#: Speed up :meth:`compile()` call and avoid a multithreading issue(?)
#: that is occurring when accessing the dag in networkx.
self._cached_execution_plans = {}
self._cached_compilations = {}


def add_op(self, operation):
@@ -143,8 +157,9 @@ def add_op(self, operation):
# assert layer is only added once to graph
assert operation not in self.graph.nodes, "Operation may only be added once"

## Invalidate old plans.
self._cached_execution_plans = {}
self.execution_dag = None
self.execution_plan = None
self._cached_compilations = {}

# add nodes and edges to graph describing the data needs for this layer
for n in operation.needs:
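The loop above is truncated by the diff; a condensed sketch of the layering it performs, assuming a networkx graph (simplified from the real method, which wraps the names in `DataPlaceholderNode`):

```python
import networkx as nx

def add_op_sketch(graph: nx.DiGraph, operation):
    """Sketch: lay out interchanging data/operation layers."""
    for name in operation.needs:        # data -> operation edges
        graph.add_edge(name, operation)
    for name in operation.provides:     # operation -> data edges
        graph.add_edge(operation, name)
```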
@@ -246,11 +261,11 @@ def _collect_unsatisfied_operations(self, dag, inputs):
all its needs have been accounted for, so we can decide its satisfaction.
- Their provided outputs are not linked to any data in the dag.
An operation might not have any output link when :meth:`_solve_dag()`
An operation might not have any output link when :meth:`_prune_dag()`
has broken them, due to given intermediate inputs.
:param dag:
the graph to consider
a graph whose edges arriving at existing inputs have been broken
:param inputs:
an iterable of the names of the input values
:return:
@@ -288,13 +303,12 @@ def _collect_unsatisfied_operations(self, dag, inputs):
return unsatisfied
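A condensed sketch of the collection logic described above, assuming a networkx dag and an `is_op()` predicate for telling operation nodes from data nodes (simplified; the real method also handles optional needs):

```python
import networkx as nx

def collect_unsatisfied_sketch(broken_dag, inputs, is_op):
    """Sketch: drop ops missing a need, or whose outputs nobody consumes."""
    unsatisfied = []
    satisfied_data = set(inputs)
    for node in nx.topological_sort(broken_dag):
        if not is_op(node):
            continue
        if broken_dag.out_degree(node) == 0:
            unsatisfied.append(node)   # its outputs link to no data in the dag
        elif not set(broken_dag.predecessors(node)) <= satisfied_data:
            unsatisfied.append(node)   # some need can never be satisfied
        else:
            # all needs satisfiable: its outputs become satisfiable too
            satisfied_data.update(broken_dag.successors(node))
    return unsatisfied
```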


def _solve_dag(self, outputs, inputs):
def _prune_dag(self, outputs, inputs):
"""
Determines what graph steps need to run to get to the requested
outputs from the provided inputs. Eliminates steps that come before
(in topological order) any inputs that have been provided. Also
eliminates steps that are not on a path from the provided inputs to
the requested outputs.
outputs from the provided inputs:
- Eliminate steps that are not on a path arriving at the requested outputs.
- Eliminate unsatisfied operations: partial inputs or no outputs needed.
:param iterable outputs:
A list of desired output names. This can also be ``None``, in which
@@ -305,7 +319,7 @@ def _solve_dag(self, outputs, inputs):
The names of all given inputs.
:return:
the *execution plan*
the *pruned_dag*
"""
dag = self.graph

@@ -341,18 +355,16 @@

# Prune unsatisfied operations (those with partial inputs or no outputs).
unsatisfied = self._collect_unsatisfied_operations(broken_dag, inputs)
pruned_dag = dag.subgraph(broken_dag.nodes - unsatisfied)
pruned_dag = dag.subgraph(self.graph.nodes - unsatisfied)

plan = self._build_execution_plan(pruned_dag, inputs, outputs)

return plan
return pruned_dag.copy() # clone so that it is picklable
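Tying the hunk together, a condensed sketch of the whole pruning flow (assuming networkx and the `collect_unsatisfied_sketch()`/`is_op()` helpers sketched earlier; exactly which node-set feeds the final `subgraph()` call is what this WIP commit is experimenting with):

```python
import networkx as nx

def prune_dag_sketch(graph, outputs, inputs, is_op):
    """Sketch of _prune_dag(): shrink the dag to a satisfiable subgraph."""
    # 1. Break edges arriving at given inputs, so upstream producers
    #    are not forced to run merely to recompute them.
    broken_dag = graph.copy()
    broken_dag.remove_edges_from(list(broken_dag.in_edges(inputs)))

    # 2. If specific outputs were asked, keep only nodes on paths to them.
    if outputs:
        keep = set(outputs)
        for out in outputs:
            keep |= nx.ancestors(broken_dag, out)
        broken_dag = broken_dag.subgraph(keep)

    # 3. Drop unsatisfied operations (partial inputs, or no needed outputs).
    unsatisfied = collect_unsatisfied_sketch(broken_dag, inputs, is_op)
    pruned_dag = graph.subgraph(set(broken_dag.nodes) - set(unsatisfied))
    return pruned_dag.copy()  # clone so that it is picklable
```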


def compile(self, outputs=(), inputs=()):
"""
Solve the dag, set the :attr:`execution_plan`, and cache it.
See :meth:`_solve_dag()` for detailed description.
See :meth:`_prune_dag()` for detailed description.
:param iterable outputs:
A list of desired output names. This can also be ``None``, in which
@@ -368,12 +380,20 @@ def compile(self, outputs=(), inputs=()):
outputs = tuple(sorted(outputs))
inputs_keys = tuple(sorted(inputs))
cache_key = (inputs_keys, outputs)
if cache_key in self._cached_execution_plans:
self.execution_plan = self._cached_execution_plans[cache_key]

if cache_key in self._cached_compilations:
dag, plan = self._cached_compilations[cache_key]
else:
plan = self._solve_dag(outputs, inputs)
# save this result in a precomputed cache for future lookup
self.execution_plan = self._cached_execution_plans[cache_key] = plan
dag = self._prune_dag(outputs, inputs)
plan = self._build_execution_plan(dag, inputs, outputs)

# Cache compilation results to speed up future runs
# with different values (but the same named inputs/outputs).
self._cached_compilations[cache_key] = dag, plan

## TODO: Extract into Solution class
self.execution_dag = dag
self.execution_plan = plan
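Note the cache key holds only the sorted *names*: the plan depends on the dag topology plus which data are given/asked, never on their values. A hedged illustration of the consequence (`net` here is a hypothetical `Network` with some operations already added):

```python
net.compile(outputs=["ab"], inputs=["a", "b"])  # prunes, plans & caches
plan1 = net.execution_plan

net.compile(outputs=["ab"], inputs=["b", "a"])  # same key after sorting
assert net.execution_plan is plan1              # cache hit, same plan object
```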



Expand Down Expand Up @@ -494,7 +514,6 @@ def _execute_thread_pool_barrier_method(
self._pin_data_in_cache(node, cache, inputs, overwrites)



# stop if no nodes left to schedule, exit out of the loop
if len(upnext) == 0:
break
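For context, the surrounding (elided) loop follows a pool-with-barrier pattern; a self-contained sketch of that idea, not this file's exact code:

```python
from multiprocessing.dummy import Pool  # a thread pool

def run_with_barriers(ops, can_schedule, run_op):
    """Sketch: launch every currently-ready op, barrier, repeat."""
    pool = Pool(4)
    done = set()
    while True:
        upnext = [op for op in ops
                  if op not in done and can_schedule(op, done)]
        if not upnext:              # nothing left that can run
            break
        pool.map(run_op, upnext)    # barrier: wait for the whole wave
        done.update(upnext)
```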
@@ -636,7 +655,7 @@ def _can_schedule_operation(self, op, executed_nodes):
execution based on what has already been executed.
"""
# unordered, not iterated
dependencies = set(n for n in nx.ancestors(self.graph, op)
dependencies = set(n for n in nx.ancestors(self.execution_dag, op)
if isinstance(n, Operation))
return dependencies.issubset(executed_nodes)

@@ -654,7 +673,7 @@ def _can_evict_value(self, name, executed_nodes):
"""
data_node = self.get_data_node(name)
return data_node and set(
self.graph.successors(data_node)).issubset(executed_nodes)
self.execution_dag.successors(data_node)).issubset(executed_nodes)
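These two one-line changes are the heart of the commit title: scheduling & eviction decisions must consult the *pruned* dag, because in the full graph a data node may still point to operations pruned away for this run, which will therefore never show up in `executed_nodes`. A runnable toy illustration (node names hypothetical):

```python
import networkx as nx

# Toy shape: op1 -> x -> {op2, op3}; suppose op3 got pruned for this run.
full = nx.DiGraph([("op1", "x"), ("x", "op2"), ("x", "op3")])
pruned = full.subgraph(["op1", "x", "op2"])

executed = {"op1", "op2"}
# Deciding on the full graph, "x" never becomes evictable: "op3" will
# never execute, so a parallel run would hold (or pin) the value forever.
print(set(full.successors("x")).issubset(executed))    # False
# Deciding on the pruned dag, "x" is evictable right after "op2" runs.
print(set(pruned.successors("x")).issubset(executed))  # True
```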

def get_data_node(self, name):
"""
37 changes: 26 additions & 11 deletions test/test_graphkit.py
@@ -233,6 +233,19 @@ def test_pruning_not_overrides_given_intermediate():
assert pipeline({"a": 5, "overriden": 1, "c": 2}) == exp
assert overwrites == {} # unjust must have been pruned

## Test Parallel
#
pipeline.set_execution_method("parallel")
overwrites = {}
pipeline.set_overwrites_collector(overwrites)
#assert pipeline({"a": 5, "overriden": 1, "c": 2}, ["asked"]) == filtdict(exp, "asked")
assert overwrites == {} # unjust must have been pruned

overwrites = {}
pipeline.set_overwrites_collector(overwrites)
assert pipeline({"a": 5, "overriden": 1, "c": 2}) == exp
assert overwrites == {} # unjust must have been pruned


def test_pruning_multiouts_not_override_intermediates1():
# Test #25: v.1.2.4 overwrites intermediate data when a previous operation
@@ -348,9 +361,9 @@ def test_pruning_with_given_intermediate_and_asked_out():
## Test parallel
# FAIL! in #26!
#
# pipeline.set_execution_method("parallel")
# assert pipeline({"given-1": 5, "b": 2, "given-2": 2}) == exp
# assert pipeline({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked")
pipeline.set_execution_method("parallel")
assert pipeline({"given-1": 5, "b": 2, "given-2": 2}) == exp
assert pipeline({"given-1": 5, "b": 2, "given-2": 2}, ["asked"]) == filtdict(exp, "asked")

def test_unsatisfied_operations():
# Test that operations with partial inputs are culled instead of failing.
@@ -395,16 +408,17 @@ def test_unsatisfied_operations_same_out():
assert pipeline({"a": 10, "b2": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c")

## Test parallel
#
# FAIL! in #26
pipeline.set_execution_method("parallel")
exp = {"a": 10, "b1": 2, "c": 1, "ab": 20, "ab_plus_c": 21}
assert pipeline({"a": 10, "b1": 2, "c": 1}) == exp
assert pipeline({"a": 10, "b1": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c")
#
# pipeline.set_execution_method("parallel")
# exp = {"a": 10, "b1": 2, "c": 1, "ab": 20, "ab_plus_c": 21}
# assert pipeline({"a": 10, "b1": 2, "c": 1}) == exp
# assert pipeline({"a": 10, "b1": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c")

# exp = {"a": 10, "b2": 2, "c": 1, "ab": 5, "ab_plus_c": 6}
# assert pipeline({"a": 10, "b2": 2, "c": 1}) == exp
# assert pipeline({"a": 10, "b2": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c")
# FAIL! in #26
exp = {"a": 10, "b2": 2, "c": 1, "ab": 5, "ab_plus_c": 6}
assert pipeline({"a": 10, "b2": 2, "c": 1}) == exp
assert pipeline({"a": 10, "b2": 2, "c": 1}, outputs=["ab_plus_c"]) == filtdict(exp, "ab_plus_c")


def test_optional():
@@ -624,6 +638,7 @@ def compute(self, inputs):
outputs.append(p)
return outputs


def test_backwards_compatibility():

sum_op1 = Sum(
