diff --git a/pydra/engine/helpers.py b/pydra/engine/helpers.py index 14d7ec01f1..fc7867b8dd 100644 --- a/pydra/engine/helpers.py +++ b/pydra/engine/helpers.py @@ -242,3 +242,22 @@ def create_pyscript(script_path, checksum): def hash_function(obj): return sha256(str(obj).encode()).hexdigest() + + +def hash_file(afile, chunk_len=8192, crypto=sha256, raise_notfound=False): + """ + Computes hash of a file using 'crypto' module + """ + if not os.path.isfile(afile): + if raise_notfound: + raise RuntimeError('File "%s" not found.' % afile) + return None + + crypto_obj = crypto() + with open(afile, "rb") as fp: + while True: + data = fp.read(chunk_len) + if not data: + break + crypto_obj.update(data) + return crypto_obj.hexdigest() diff --git a/pydra/engine/specs.py b/pydra/engine/specs.py index 0f51564090..52c4a81157 100644 --- a/pydra/engine/specs.py +++ b/pydra/engine/specs.py @@ -2,8 +2,13 @@ from pathlib import Path import typing as ty -File = ty.NewType("File", Path) -Directory = ty.NewType("Directory", Path) + +class File(Path): + pass + + +class Directory(Path): + pass @dc.dataclass @@ -20,10 +25,12 @@ class BaseSpec: @property def hash(self): """Compute a basic hash for any given set of fields""" - from .helpers import hash_function + from .helpers import hash_function, hash_file inp_dict = { - field.name: getattr(self, field.name) + field.name: hash_file(getattr(self, field.name)) + if field.type == File + else getattr(self, field.name) for field in dc.fields(self) if field.name not in ["_graph_checksums"] } diff --git a/pydra/engine/tests/test_helpers.py b/pydra/engine/tests/test_helpers.py index 0d0b829c82..a409ae51bf 100644 --- a/pydra/engine/tests/test_helpers.py +++ b/pydra/engine/tests/test_helpers.py @@ -39,3 +39,13 @@ def test_create_pyscript(tmpdir): helpers.save(outdir, task=foo) pyscript = helpers.create_pyscript(outdir, foo.checksum) assert pyscript.exists() + + +def test_hash_file(tmpdir): + outdir = Path(tmpdir) + with open(outdir / "test.file", "wt") as fp: + fp.write("test") + assert ( + helpers.hash_file(outdir / "test.file") + == "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08" + ) diff --git a/pydra/engine/tests/test_specs.py b/pydra/engine/tests/test_specs.py index f9d96ea4fc..fd03d3fbba 100644 --- a/pydra/engine/tests/test_specs.py +++ b/pydra/engine/tests/test_specs.py @@ -1,5 +1,10 @@ +from pathlib import Path +import typing as ty + from ..specs import ( BaseSpec, + SpecInfo, + File, RuntimeSpec, Runtime, Result, @@ -9,6 +14,7 @@ SingularitySpec, LazyField, ) +from ..helpers import make_klass import pytest @@ -142,3 +148,24 @@ def test_lazy_getvale(): with pytest.raises(Exception) as excinfo: lf.inp_c assert str(excinfo.value) == "Task tn has no input attribute inp_c" + + +def test_file_hash(tmpdir): + tmpdir.chdir() + outfile = "test.file" + fields = [("in_file", ty.Any)] + input_spec = SpecInfo(name="Inputs", fields=fields, bases=(BaseSpec,)) + inputs = make_klass(input_spec) + assert ( + inputs(str(outfile)).hash + == "1384a1eb11cd94a5b826a82b948313b9237a0956d406ccff59e79ec92b3c935f" + ) + with open(outfile, "wt") as fp: + fp.write("test") + fields = [("in_file", File)] + input_spec = SpecInfo(name="Inputs", fields=fields, bases=(BaseSpec,)) + inputs = make_klass(input_spec) + assert ( + inputs(outfile).hash + == "088625131e6718a00170ad445a9c295244dffd4e5d847c8ee4b1606d623dacb1" + ) diff --git a/pydra/engine/tests/test_tasks_files.py b/pydra/engine/tests/test_tasks_files.py new file mode 100644 index 0000000000..6b5fd14fc6 --- /dev/null +++ b/pydra/engine/tests/test_tasks_files.py @@ -0,0 +1,113 @@ +import os +from pathlib import Path +import numpy as np +import pytest +import typing as ty + +from ..submitter import Submitter +from ..core import Workflow +from ... import mark +from ..specs import File + + +@mark.task +def file_add2(file): + array_inp = np.load(file) + array_out = array_inp + 2 + cwd = os.getcwd() + # providing a full path + file_out = os.path.join(cwd, "arr_out.npy") + np.save(file_out, array_out) + return file_out + + +@mark.task +def file_mult(file): + array_inp = np.load(file) + array_out = 10 * array_inp + cwd = os.getcwd() + file_out = os.path.join(cwd, "arr_out.npy") + np.save(file_out, array_out) + return file_out + + +@mark.task +def file_add2_annot(file: File) -> ty.NamedTuple("Output", [("out", File)]): + array_inp = np.load(file) + array_out = array_inp + 2 + cwd = os.getcwd() + # providing a full path + file_out = os.path.join(cwd, "arr_out.npy") + np.save(file_out, array_out) + return file_out + + +@mark.task +def file_mult_annot(file: File) -> ty.NamedTuple("Output", [("out", File)]): + array_inp = np.load(file) + array_out = 10 * array_inp + cwd = os.getcwd() + file_out = os.path.join(cwd, "arr_out.npy") + np.save(file_out, array_out) + return file_out + + +def test_task_1(tmpdir): + """ task that takes file as an input""" + os.chdir(tmpdir) + arr = np.array([2]) + # creating abs path + file = os.path.join(os.getcwd(), "arr1.npy") + np.save(file, arr) + nn = file_add2(name="add2", file=file) + + with Submitter(plugin="cf") as sub: + sub(nn) + + # checking the results + results = nn.result() + res = np.load(results.output.out) + assert res == np.array([4]) + + +def test_wf_1(tmpdir): + """ workflow with 2 tasks that take file as an input and give file as an aoutput""" + wf = Workflow(name="wf_1", input_spec=["file_orig"]) + wf.add(file_add2(name="add2", file=wf.lzin.file_orig)) + wf.add(file_mult(name="mult", file=wf.add2.lzout.out)) + wf.set_output([("out", wf.mult.lzout.out)]) + + os.chdir(tmpdir) + arr = np.array([2, 3]) + # creating abs path + file_orig = os.path.join(os.getcwd(), "arr_orig.npy") + np.save(file_orig, arr) + wf.inputs.file_orig = file_orig + + with Submitter(plugin="cf") as sub: + sub(wf) + + assert wf.output_dir.exists() + file_output = wf.result().output.out + assert Path(file_output).exists() + # loading results + array_out = np.load(file_output) + assert np.array_equal(array_out, [40, 50]) + + +def test_file_annotation_1(tmpdir): + """ task that takes file as an input""" + os.chdir(tmpdir) + arr = np.array([2]) + # creating abs path + file = os.path.join(os.getcwd(), "arr1.npy") + np.save(file, arr) + nn = file_add2_annot(name="add2", file=file) + + with Submitter(plugin="cf") as sub: + sub(nn) + + # checking the results + results = nn.result() + res = np.load(results.output.out) + assert res == np.array([4])