Skip to content

Commit

Permalink
Merge pull request #116 from satra/enh/file
Browse files Browse the repository at this point in the history
Enh/file - allow support for hashing Files - closes #96
  • Loading branch information
satra authored Aug 5, 2019
2 parents e8d5c0a + 02100b6 commit 397d137
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 4 deletions.
19 changes: 19 additions & 0 deletions pydra/engine/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,22 @@ def create_pyscript(script_path, checksum):

def hash_function(obj):
return sha256(str(obj).encode()).hexdigest()


def hash_file(afile, chunk_len=8192, crypto=sha256, raise_notfound=False):
"""
Computes hash of a file using 'crypto' module
"""
if not os.path.isfile(afile):
if raise_notfound:
raise RuntimeError('File "%s" not found.' % afile)
return None

crypto_obj = crypto()
with open(afile, "rb") as fp:
while True:
data = fp.read(chunk_len)
if not data:
break
crypto_obj.update(data)
return crypto_obj.hexdigest()
15 changes: 11 additions & 4 deletions pydra/engine/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@
from pathlib import Path
import typing as ty

File = ty.NewType("File", Path)
Directory = ty.NewType("Directory", Path)

class File(Path):
pass


class Directory(Path):
pass


@dc.dataclass
Expand All @@ -20,10 +25,12 @@ class BaseSpec:
@property
def hash(self):
"""Compute a basic hash for any given set of fields"""
from .helpers import hash_function
from .helpers import hash_function, hash_file

inp_dict = {
field.name: getattr(self, field.name)
field.name: hash_file(getattr(self, field.name))
if field.type == File
else getattr(self, field.name)
for field in dc.fields(self)
if field.name not in ["_graph_checksums"]
}
Expand Down
10 changes: 10 additions & 0 deletions pydra/engine/tests/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,13 @@ def test_create_pyscript(tmpdir):
helpers.save(outdir, task=foo)
pyscript = helpers.create_pyscript(outdir, foo.checksum)
assert pyscript.exists()


def test_hash_file(tmpdir):
outdir = Path(tmpdir)
with open(outdir / "test.file", "wt") as fp:
fp.write("test")
assert (
helpers.hash_file(outdir / "test.file")
== "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"
)
27 changes: 27 additions & 0 deletions pydra/engine/tests/test_specs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from pathlib import Path
import typing as ty

from ..specs import (
BaseSpec,
SpecInfo,
File,
RuntimeSpec,
Runtime,
Result,
Expand All @@ -9,6 +14,7 @@
SingularitySpec,
LazyField,
)
from ..helpers import make_klass
import pytest


Expand Down Expand Up @@ -142,3 +148,24 @@ def test_lazy_getvale():
with pytest.raises(Exception) as excinfo:
lf.inp_c
assert str(excinfo.value) == "Task tn has no input attribute inp_c"


def test_file_hash(tmpdir):
tmpdir.chdir()
outfile = "test.file"
fields = [("in_file", ty.Any)]
input_spec = SpecInfo(name="Inputs", fields=fields, bases=(BaseSpec,))
inputs = make_klass(input_spec)
assert (
inputs(str(outfile)).hash
== "1384a1eb11cd94a5b826a82b948313b9237a0956d406ccff59e79ec92b3c935f"
)
with open(outfile, "wt") as fp:
fp.write("test")
fields = [("in_file", File)]
input_spec = SpecInfo(name="Inputs", fields=fields, bases=(BaseSpec,))
inputs = make_klass(input_spec)
assert (
inputs(outfile).hash
== "088625131e6718a00170ad445a9c295244dffd4e5d847c8ee4b1606d623dacb1"
)
113 changes: 113 additions & 0 deletions pydra/engine/tests/test_tasks_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import os
from pathlib import Path
import numpy as np
import pytest
import typing as ty

from ..submitter import Submitter
from ..core import Workflow
from ... import mark
from ..specs import File


@mark.task
def file_add2(file):
array_inp = np.load(file)
array_out = array_inp + 2
cwd = os.getcwd()
# providing a full path
file_out = os.path.join(cwd, "arr_out.npy")
np.save(file_out, array_out)
return file_out


@mark.task
def file_mult(file):
array_inp = np.load(file)
array_out = 10 * array_inp
cwd = os.getcwd()
file_out = os.path.join(cwd, "arr_out.npy")
np.save(file_out, array_out)
return file_out


@mark.task
def file_add2_annot(file: File) -> ty.NamedTuple("Output", [("out", File)]):
array_inp = np.load(file)
array_out = array_inp + 2
cwd = os.getcwd()
# providing a full path
file_out = os.path.join(cwd, "arr_out.npy")
np.save(file_out, array_out)
return file_out


@mark.task
def file_mult_annot(file: File) -> ty.NamedTuple("Output", [("out", File)]):
array_inp = np.load(file)
array_out = 10 * array_inp
cwd = os.getcwd()
file_out = os.path.join(cwd, "arr_out.npy")
np.save(file_out, array_out)
return file_out


def test_task_1(tmpdir):
""" task that takes file as an input"""
os.chdir(tmpdir)
arr = np.array([2])
# creating abs path
file = os.path.join(os.getcwd(), "arr1.npy")
np.save(file, arr)
nn = file_add2(name="add2", file=file)

with Submitter(plugin="cf") as sub:
sub(nn)

# checking the results
results = nn.result()
res = np.load(results.output.out)
assert res == np.array([4])


def test_wf_1(tmpdir):
""" workflow with 2 tasks that take file as an input and give file as an aoutput"""
wf = Workflow(name="wf_1", input_spec=["file_orig"])
wf.add(file_add2(name="add2", file=wf.lzin.file_orig))
wf.add(file_mult(name="mult", file=wf.add2.lzout.out))
wf.set_output([("out", wf.mult.lzout.out)])

os.chdir(tmpdir)
arr = np.array([2, 3])
# creating abs path
file_orig = os.path.join(os.getcwd(), "arr_orig.npy")
np.save(file_orig, arr)
wf.inputs.file_orig = file_orig

with Submitter(plugin="cf") as sub:
sub(wf)

assert wf.output_dir.exists()
file_output = wf.result().output.out
assert Path(file_output).exists()
# loading results
array_out = np.load(file_output)
assert np.array_equal(array_out, [40, 50])


def test_file_annotation_1(tmpdir):
""" task that takes file as an input"""
os.chdir(tmpdir)
arr = np.array([2])
# creating abs path
file = os.path.join(os.getcwd(), "arr1.npy")
np.save(file, arr)
nn = file_add2_annot(name="add2", file=file)

with Submitter(plugin="cf") as sub:
sub(nn)

# checking the results
results = nn.result()
res = np.load(results.output.out)
assert res == np.array([4])

0 comments on commit 397d137

Please sign in to comment.