From 8860ad43bf5e5b0863952a11a529b7f7a804657a Mon Sep 17 00:00:00 2001 From: Tony Tung Date: Wed, 10 Apr 2019 12:53:14 -0700 Subject: [PATCH] Recipe and recipe execution Builds on top of #1097 to construct and run recipes with one or more runnables. A recipe wires together one or more runnables. Inputs to recipes are available to the recipe itself as the magic variable name "file_inputs". A runnable is invoked by the magic function "compute". Anything written to the magic variable "file_outputs" is matched and written out to a set of output paths. Fixes #311 --- docs/source/api/recipe/index.rst | 12 ++ starfish/recipe/__init__.py | 1 + starfish/recipe/recipe.py | 164 ++++++++++++++++++++++++++++ starfish/recipe/test/test_recipe.py | 147 +++++++++++++++++++++++++ 4 files changed, 324 insertions(+) create mode 100644 starfish/recipe/recipe.py create mode 100644 starfish/recipe/test/test_recipe.py diff --git a/docs/source/api/recipe/index.rst b/docs/source/api/recipe/index.rst index 220f249d1..4a91ca8f0 100644 --- a/docs/source/api/recipe/index.rst +++ b/docs/source/api/recipe/index.rst @@ -1,5 +1,17 @@ .. _Recipe: +Execution +========= + +.. autoclass:: starfish.recipe.Execution + :members: + +Recipe +====== + +.. autoclass:: starfish.recipe.Recipe + :members: + Runnable ======== diff --git a/starfish/recipe/__init__.py b/starfish/recipe/__init__.py index b9761a6c2..6d5d81bc5 100644 --- a/starfish/recipe/__init__.py +++ b/starfish/recipe/__init__.py @@ -6,4 +6,5 @@ RunInsufficientParametersError, TypeInferenceError, ) +from .recipe import Execution, Recipe from .runnable import Runnable diff --git a/starfish/recipe/recipe.py b/starfish/recipe/recipe.py new file mode 100644 index 000000000..59fb88831 --- /dev/null +++ b/starfish/recipe/recipe.py @@ -0,0 +1,164 @@ +from typing import ( + AbstractSet, + Any, + Mapping, + MutableMapping, + MutableSequence, + Optional, + Sequence, + Set, +) + +from starfish.types import Axes, Coordinates +from .filesystem import FileProvider, FileTypes +from .runnable import Runnable + + +class ExecutionComplete(Exception): + """Raised by :py:class:`Execution` when it is complete. We don't rely on catching StopIteration + because some underlying library may have raised that instead.""" + pass + + +class Execution: + """Encompasses the state of a single execution of a recipe.""" + def __init__( + self, + runnable_sequence: Sequence[Runnable], + output_runnables: Sequence[Runnable], + output_paths: Sequence[str], + ) -> None: + self._runnable_sequence = iter(runnable_sequence) + self._output_runnables = output_runnables + self._output_paths = output_paths + + # build a map between each runnable and its dependents. each time a runnable completes, we + # go through each of its dependencies to see if its results are still needed. + runnable_dependents: MutableMapping[Runnable, Set[Runnable]] = dict() + for output_runnable in self._output_runnables: + Execution.build_graph(output_runnable, runnable_dependents) + self.runnable_dependents: Mapping[Runnable, AbstractSet[Runnable]] = runnable_dependents + + # completed results + self._completed_runnables: Set[Runnable] = set() + self._completed_results: MutableMapping[Runnable, Any] = dict() + + def run_one_tick(self) -> None: + """Run one tick of the execution graph. Raises StopIteration if it's done.""" + try: + runnable = next(self._runnable_sequence) + except StopIteration as ex: + raise ExecutionComplete from ex + + result = runnable.run(self._completed_results) + + # record what's been done. + self._completed_runnables.add(runnable) + self._completed_results[runnable] = result + + # examine all the dependencies, and discard the results if no one else needs it. + for dependency in runnable.runnable_dependencies: + if dependency in self._output_runnables: + # it's required by the outputs, so preserve this. + continue + + for dependent in self.runnable_dependents[dependency]: + if dependent not in self._completed_runnables: + # someone still needs this runnable's result. + break + else: + # every dependent is complete. drop the result. + del self._completed_results[dependency] + + def save(self) -> None: + for runnable, output_path in zip(self._output_runnables, self._output_paths): + # get the result + result = self._completed_results[runnable] + + filetype = FileTypes.resolve_by_instance(result) + filetype.save(result, output_path) + + def run_and_save(self) -> None: + while True: + try: + self.run_one_tick() + except ExecutionComplete: + break + + self.save() + + @staticmethod + def build_graph( + runnable: Runnable, + runnable_dependents: MutableMapping[Runnable, Set[Runnable]], + seen_runnables: Optional[Set[Runnable]]=None, + ) -> None: + if seen_runnables is None: + seen_runnables = set() + + if runnable in seen_runnables: + return + seen_runnables.add(runnable) + + for dependency in runnable.runnable_dependencies: + # mark ourselves a dependent of each of our dependencies. + if dependency not in runnable_dependents: + runnable_dependents[dependency] = set() + runnable_dependents[dependency].add(runnable) + Execution.build_graph(dependency, runnable_dependents, seen_runnables) + + +class OrderedSequence: + def __init__(self) -> None: + self._sequence: MutableSequence[Runnable] = list() + + def __call__(self, *args, **kwargs): + result = Runnable(*args, **kwargs) + self._sequence.append(result) + return result + + @property + def sequence(self) -> Sequence[Runnable]: + return self._sequence + + +class Recipe: + def __init__( + self, + recipe_str: str, + input_paths_or_urls: Sequence[str], + output_paths: Sequence[str], + ): + ordered_sequence = OrderedSequence() + file_outputs: MutableMapping[int, Runnable] = {} + recipe_scope = { + "Axes": Axes, + "Coordinates": Coordinates, + "file_inputs": [ + FileProvider(input_path_or_url) + for input_path_or_url in input_paths_or_urls + ], + "compute": ordered_sequence, + "file_outputs": file_outputs, + } + ast = compile(recipe_str, "", "exec") + exec(ast, recipe_scope) + + assert len(file_outputs) == len(output_paths), \ + "Recipe generates more outputs than output paths provided!" + + # verify that the outputs are sequential. + ordered_outputs: MutableSequence[Runnable] = list() + for ix in range(len(file_outputs)): + assert ix in file_outputs, \ + f"file_outputs[{ix}] is not set" + assert isinstance(file_outputs[ix], Runnable), \ + f"file_outputs[{ix}] is not the result of a compute(..)" + ordered_outputs.append(file_outputs[ix]) + + self._runnable_order = ordered_sequence.sequence + self._outputs: Sequence[Runnable] = ordered_outputs + self._output_paths = output_paths + + def execution(self) -> Execution: + return Execution(self._runnable_order, self._outputs, self._output_paths) diff --git a/starfish/recipe/test/test_recipe.py b/starfish/recipe/test/test_recipe.py new file mode 100644 index 000000000..828be546a --- /dev/null +++ b/starfish/recipe/test/test_recipe.py @@ -0,0 +1,147 @@ +import os +import tempfile +import warnings + +import numpy as np +import pytest + +from starfish import ImageStack +from starfish.recipe import ConstructorError, ConstructorExtraParameterWarning +from starfish.recipe.recipe import Recipe +from . import fakefilter # noqa: F401 + + +BASE_EXPECTED = np.array([ + [0.227543, 0.223117, 0.217014, 0.221241, 0.212863, 0.211963, 0.210575, + 0.198611, 0.194827, 0.181964], + [0.216617, 0.214710, 0.212467, 0.218158, 0.211429, 0.210361, 0.205737, + 0.190814, 0.182010, 0.165667], + [0.206744, 0.204685, 0.208774, 0.212909, 0.215274, 0.206180, 0.196674, + 0.179080, 0.169207, 0.157549], + [0.190845, 0.197131, 0.188540, 0.195361, 0.196765, 0.200153, 0.183627, + 0.167590, 0.159930, 0.150805], + [0.181231, 0.187457, 0.182910, 0.179416, 0.175357, 0.172137, 0.165072, + 0.156344, 0.153735, 0.150378], + [0.169924, 0.184604, 0.182422, 0.174441, 0.159823, 0.157229, 0.157259, + 0.151690, 0.147265, 0.139940], + [0.164874, 0.169467, 0.178012, 0.173129, 0.161425, 0.155978, 0.152712, + 0.150286, 0.145159, 0.140658], + [0.164508, 0.165042, 0.171420, 0.174990, 0.162951, 0.152422, 0.149325, + 0.151675, 0.141588, 0.139010], + [0.162448, 0.156451, 0.158419, 0.162722, 0.160388, 0.152865, 0.142885, + 0.142123, 0.140093, 0.135836], + [0.150072, 0.147295, 0.145495, 0.153216, 0.156085, 0.149981, 0.145571, + 0.141878, 0.138857, 0.136965]], + dtype=np.float32) +URL = "https://d2nhj9g34unfro.cloudfront.net/20181005/ISS-TEST/fov_001/hybridization.json" + + +def test_simple_recipe(): + """Test that a simple recipe can execute correctly.""" + recipe_str = """ +file_outputs[0] = compute("filter", "SimpleFilterAlgorithm", file_inputs[0], multiplicand=0.5) + """ + + with tempfile.TemporaryDirectory() as tempdir: + output_path = os.path.join(tempdir, "output.json") + recipe = Recipe(recipe_str, [URL], [output_path]) + + execution = recipe.execution() + execution.run_and_save() + + result_stack = ImageStack.from_path_or_url(output_path) + assert np.allclose( + BASE_EXPECTED * .5, + result_stack.xarray[2, 2, 0, 40:50, 40:50] + ) + + +def test_chained_recipe(): + """Test that a recipe with a complex graph can execute correctly.""" + recipe_str = """ +a = compute("filter", "SimpleFilterAlgorithm", file_inputs[0], multiplicand=0.5) +b = compute("filter", "SimpleFilterAlgorithm", a, multiplicand=.3) +file_outputs[0] = compute("filter", "SimpleFilterAlgorithm", b, multiplicand=0.2) +c = compute("filter", "SimpleFilterAlgorithm", a, multiplicand=.2) +file_outputs[1] = compute("filter", "SimpleFilterAlgorithm", c, multiplicand=.3) + """ + + with tempfile.TemporaryDirectory() as tempdir: + output_0_path = os.path.join(tempdir, "output_0.json") + output_1_path = os.path.join(tempdir, "output_1.json") + recipe = Recipe(recipe_str, [URL], [output_0_path, output_1_path]) + + execution = recipe.execution() + execution.run_and_save() + + for path in (output_0_path, output_1_path): + result_stack = ImageStack.from_path_or_url(path) + assert np.allclose( + BASE_EXPECTED * .03, + result_stack.xarray[2, 2, 0, 40:50, 40:50] + ) + + +def test_garbage_collection(): + """Test that recipe execution discards intermediate results that are no longer necessary.""" + recipe_str = """ +a = compute("filter", "SimpleFilterAlgorithm", file_inputs[0], multiplicand=0.5) +b = compute("filter", "SimpleFilterAlgorithm", a, multiplicand=0.3) +c = compute("filter", "SimpleFilterAlgorithm", b, multiplicand=4.) +d = compute("filter", "SimpleFilterAlgorithm", c, multiplicand=0.5) +file_outputs[0] = compute("filter", "SimpleFilterAlgorithm", d, multiplicand=0.5) + """ + + with tempfile.TemporaryDirectory() as tempdir: + output_path = os.path.join(tempdir, "output.json") + recipe = Recipe(recipe_str, [URL], [output_path]) + + execution = recipe.execution() + execution.run_one_tick() + execution.run_one_tick() + + assert len(execution._completed_results) == 1 + + execution.run_and_save() + + result_stack = ImageStack.from_path_or_url(output_path) + assert np.allclose( + BASE_EXPECTED * .15, + result_stack.xarray[2, 2, 0, 40:50, 40:50] + ) + + +def test_recipe_constructor_missing_args(): + """Test that recipe construction detects missing arguments to the constructor.""" + recipe_str = """ +file_output[0] = compute("filter", "SimpleFilterAlgorithm", file_inputs[0]) +""" + + with tempfile.TemporaryDirectory() as tempdir: + output_path = os.path.join(tempdir, "output.json") + with pytest.raises(ConstructorError): + Recipe(recipe_str, [URL], [output_path]) + + +def test_recipe_constructor_extra_args(): + """Test that recipe construction detects missing arguments to the constructor.""" + recipe_str = """ +file_outputs[0] = compute("filter", "SimpleFilterAlgorithm", file_inputs[0], multiplicand=.5, x=1) +""" + + with tempfile.TemporaryDirectory() as tempdir: + output_path = os.path.join(tempdir, "output.json") + with warnings.catch_warnings(record=True) as w: + recipe = Recipe(recipe_str, [URL], [output_path]) + + assert len(w) == 1 + assert issubclass(w[-1].category, ConstructorExtraParameterWarning) + + execution = recipe.execution() + execution.run_and_save() + + result_stack = ImageStack.from_path_or_url(output_path) + assert np.allclose( + BASE_EXPECTED * .5, + result_stack.xarray[2, 2, 0, 40:50, 40:50] + )