Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recipe and recipe execution #1117

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/source/api/recipe/index.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
.. _Recipe:

Execution
=========

.. autoclass:: starfish.recipe.Execution
:members:

Recipe
======

.. autoclass:: starfish.recipe.Recipe
:members:

Runnable
========

Expand Down
1 change: 1 addition & 0 deletions starfish/recipe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
RunInsufficientParametersError,
TypeInferenceError,
)
from .recipe import Execution, Recipe
from .runnable import Runnable
164 changes: 164 additions & 0 deletions starfish/recipe/recipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
from typing import (
AbstractSet,
Any,
Mapping,
MutableMapping,
MutableSequence,
Optional,
Sequence,
Set,
)

from starfish.types import Axes, Coordinates
from .filesystem import FileProvider, FileTypes
from .runnable import Runnable


class ExecutionComplete(Exception):
"""Raised by :py:class:`Execution` when it is complete. We don't rely on catching StopIteration
because some underlying library may have raised that instead."""
pass


class Execution:
"""Encompasses the state of a single execution of a recipe."""
def __init__(
self,
runnable_sequence: Sequence[Runnable],
output_runnables: Sequence[Runnable],
output_paths: Sequence[str],
) -> None:
self._runnable_sequence = iter(runnable_sequence)
self._output_runnables = output_runnables
self._output_paths = output_paths

# build a map between each runnable and its dependents. each time a runnable completes, we
# go through each of its dependencies to see if its results are still needed.
runnable_dependents: MutableMapping[Runnable, Set[Runnable]] = dict()
for output_runnable in self._output_runnables:
Execution.build_graph(output_runnable, runnable_dependents)
self.runnable_dependents: Mapping[Runnable, AbstractSet[Runnable]] = runnable_dependents

# completed results
self._completed_runnables: Set[Runnable] = set()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this imply that you could never have duplicate runnables?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, even runnables with the same parameters are treated as distinct entities.

self._completed_results: MutableMapping[Runnable, Any] = dict()

def run_one_tick(self) -> None:
"""Run one tick of the execution graph. Raises StopIteration if it's done."""
try:
runnable = next(self._runnable_sequence)
except StopIteration as ex:
raise ExecutionComplete from ex

result = runnable.run(self._completed_results)

# record what's been done.
self._completed_runnables.add(runnable)
self._completed_results[runnable] = result

# examine all the dependencies, and discard the results if no one else needs it.
for dependency in runnable.runnable_dependencies:
if dependency in self._output_runnables:
# it's required by the outputs, so preserve this.
continue

for dependent in self.runnable_dependents[dependency]:
if dependent not in self._completed_runnables:
# someone still needs this runnable's result.
break
else:
# every dependent is complete. drop the result.
del self._completed_results[dependency]

def save(self) -> None:
for runnable, output_path in zip(self._output_runnables, self._output_paths):
# get the result
result = self._completed_results[runnable]

filetype = FileTypes.resolve_by_instance(result)
filetype.save(result, output_path)

def run_and_save(self) -> None:
while True:
try:
self.run_one_tick()
except ExecutionComplete:
break

self.save()

@staticmethod
def build_graph(
runnable: Runnable,
runnable_dependents: MutableMapping[Runnable, Set[Runnable]],
seen_runnables: Optional[Set[Runnable]]=None,
) -> None:
if seen_runnables is None:
seen_runnables = set()

if runnable in seen_runnables:
return
seen_runnables.add(runnable)

for dependency in runnable.runnable_dependencies:
# mark ourselves a dependent of each of our dependencies.
if dependency not in runnable_dependents:
runnable_dependents[dependency] = set()
runnable_dependents[dependency].add(runnable)
Execution.build_graph(dependency, runnable_dependents, seen_runnables)


class OrderedSequence:
def __init__(self) -> None:
self._sequence: MutableSequence[Runnable] = list()

def __call__(self, *args, **kwargs):
result = Runnable(*args, **kwargs)
self._sequence.append(result)
return result

@property
def sequence(self) -> Sequence[Runnable]:
return self._sequence


class Recipe:
def __init__(
self,
recipe_str: str,
input_paths_or_urls: Sequence[str],
output_paths: Sequence[str],
):
ordered_sequence = OrderedSequence()
file_outputs: MutableMapping[int, Runnable] = {}
recipe_scope = {
"Axes": Axes,
"Coordinates": Coordinates,
"file_inputs": [
FileProvider(input_path_or_url)
for input_path_or_url in input_paths_or_urls
],
"compute": ordered_sequence,
"file_outputs": file_outputs,
}
ast = compile(recipe_str, "<string>", "exec")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does ast stand for?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

abstract syntax tree.

exec(ast, recipe_scope)

assert len(file_outputs) == len(output_paths), \
"Recipe generates more outputs than output paths provided!"

# verify that the outputs are sequential.
ordered_outputs: MutableSequence[Runnable] = list()
for ix in range(len(file_outputs)):
assert ix in file_outputs, \
f"file_outputs[{ix}] is not set"
assert isinstance(file_outputs[ix], Runnable), \
f"file_outputs[{ix}] is not the result of a compute(..)"
ordered_outputs.append(file_outputs[ix])

self._runnable_order = ordered_sequence.sequence
self._outputs: Sequence[Runnable] = ordered_outputs
self._output_paths = output_paths
shanaxel42 marked this conversation as resolved.
Show resolved Hide resolved

def execution(self) -> Execution:
return Execution(self._runnable_order, self._outputs, self._output_paths)
147 changes: 147 additions & 0 deletions starfish/recipe/test/test_recipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import os
import tempfile
import warnings

import numpy as np
import pytest

from starfish import ImageStack
from starfish.recipe import ConstructorError, ConstructorExtraParameterWarning
from starfish.recipe.recipe import Recipe
from . import fakefilter # noqa: F401


BASE_EXPECTED = np.array([
[0.227543, 0.223117, 0.217014, 0.221241, 0.212863, 0.211963, 0.210575,
0.198611, 0.194827, 0.181964],
[0.216617, 0.214710, 0.212467, 0.218158, 0.211429, 0.210361, 0.205737,
0.190814, 0.182010, 0.165667],
[0.206744, 0.204685, 0.208774, 0.212909, 0.215274, 0.206180, 0.196674,
0.179080, 0.169207, 0.157549],
[0.190845, 0.197131, 0.188540, 0.195361, 0.196765, 0.200153, 0.183627,
0.167590, 0.159930, 0.150805],
[0.181231, 0.187457, 0.182910, 0.179416, 0.175357, 0.172137, 0.165072,
0.156344, 0.153735, 0.150378],
[0.169924, 0.184604, 0.182422, 0.174441, 0.159823, 0.157229, 0.157259,
0.151690, 0.147265, 0.139940],
[0.164874, 0.169467, 0.178012, 0.173129, 0.161425, 0.155978, 0.152712,
0.150286, 0.145159, 0.140658],
[0.164508, 0.165042, 0.171420, 0.174990, 0.162951, 0.152422, 0.149325,
0.151675, 0.141588, 0.139010],
[0.162448, 0.156451, 0.158419, 0.162722, 0.160388, 0.152865, 0.142885,
0.142123, 0.140093, 0.135836],
[0.150072, 0.147295, 0.145495, 0.153216, 0.156085, 0.149981, 0.145571,
0.141878, 0.138857, 0.136965]],
dtype=np.float32)
URL = "https://d2nhj9g34unfro.cloudfront.net/20181005/ISS-TEST/fov_001/hybridization.json"


def test_simple_recipe():
"""Test that a simple recipe can execute correctly."""
recipe_str = """
file_outputs[0] = compute("filter", "SimpleFilterAlgorithm", file_inputs[0], multiplicand=0.5)
"""

with tempfile.TemporaryDirectory() as tempdir:
output_path = os.path.join(tempdir, "output.json")
recipe = Recipe(recipe_str, [URL], [output_path])

execution = recipe.execution()
execution.run_and_save()

result_stack = ImageStack.from_path_or_url(output_path)
assert np.allclose(
BASE_EXPECTED * .5,
result_stack.xarray[2, 2, 0, 40:50, 40:50]
)


def test_chained_recipe():
"""Test that a recipe with a complex graph can execute correctly."""
recipe_str = """
a = compute("filter", "SimpleFilterAlgorithm", file_inputs[0], multiplicand=0.5)
b = compute("filter", "SimpleFilterAlgorithm", a, multiplicand=.3)
file_outputs[0] = compute("filter", "SimpleFilterAlgorithm", b, multiplicand=0.2)
c = compute("filter", "SimpleFilterAlgorithm", a, multiplicand=.2)
file_outputs[1] = compute("filter", "SimpleFilterAlgorithm", c, multiplicand=.3)
"""

with tempfile.TemporaryDirectory() as tempdir:
output_0_path = os.path.join(tempdir, "output_0.json")
output_1_path = os.path.join(tempdir, "output_1.json")
recipe = Recipe(recipe_str, [URL], [output_0_path, output_1_path])

execution = recipe.execution()
execution.run_and_save()

for path in (output_0_path, output_1_path):
result_stack = ImageStack.from_path_or_url(path)
assert np.allclose(
BASE_EXPECTED * .03,
result_stack.xarray[2, 2, 0, 40:50, 40:50]
)


def test_garbage_collection():
"""Test that recipe execution discards intermediate results that are no longer necessary."""
recipe_str = """
a = compute("filter", "SimpleFilterAlgorithm", file_inputs[0], multiplicand=0.5)
b = compute("filter", "SimpleFilterAlgorithm", a, multiplicand=0.3)
c = compute("filter", "SimpleFilterAlgorithm", b, multiplicand=4.)
d = compute("filter", "SimpleFilterAlgorithm", c, multiplicand=0.5)
file_outputs[0] = compute("filter", "SimpleFilterAlgorithm", d, multiplicand=0.5)
"""

with tempfile.TemporaryDirectory() as tempdir:
output_path = os.path.join(tempdir, "output.json")
recipe = Recipe(recipe_str, [URL], [output_path])

execution = recipe.execution()
execution.run_one_tick()
execution.run_one_tick()

assert len(execution._completed_results) == 1

execution.run_and_save()

result_stack = ImageStack.from_path_or_url(output_path)
assert np.allclose(
BASE_EXPECTED * .15,
result_stack.xarray[2, 2, 0, 40:50, 40:50]
)


def test_recipe_constructor_missing_args():
"""Test that recipe construction detects missing arguments to the constructor."""
recipe_str = """
file_output[0] = compute("filter", "SimpleFilterAlgorithm", file_inputs[0])
"""

with tempfile.TemporaryDirectory() as tempdir:
output_path = os.path.join(tempdir, "output.json")
with pytest.raises(ConstructorError):
Recipe(recipe_str, [URL], [output_path])


def test_recipe_constructor_extra_args():
"""Test that recipe construction detects missing arguments to the constructor."""
recipe_str = """
file_outputs[0] = compute("filter", "SimpleFilterAlgorithm", file_inputs[0], multiplicand=.5, x=1)
"""

with tempfile.TemporaryDirectory() as tempdir:
output_path = os.path.join(tempdir, "output.json")
with warnings.catch_warnings(record=True) as w:
recipe = Recipe(recipe_str, [URL], [output_path])

assert len(w) == 1
assert issubclass(w[-1].category, ConstructorExtraParameterWarning)

execution = recipe.execution()
execution.run_and_save()

result_stack = ImageStack.from_path_or_url(output_path)
assert np.allclose(
BASE_EXPECTED * .5,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Testing suggestion:

Change the multiplicands to 0.5, 2.0, 4.0, and the expected to BASE_EXPECTED * 4; as the test stands, it would pass if only the first compute call executed for some reason.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did something similar to what you suggested.

result_stack.xarray[2, 2, 0, 40:50, 40:50]
)