Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SoftFileLock asyncio workaround (#444) #460

Merged
merged 8 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .zenodo.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
"name": "Johnson, Charles E.",
"orcid": "0000-0001-7814-3501"
},
{
"affiliation": "FCBG, EPFL",
"name": "Wigger, Jeffrey",
"orcid": "0000-0003-0978-4326"
},
{
"affiliation": "MIT, HMS",
"name": "Ghosh, Satrajit",
Expand Down
3 changes: 2 additions & 1 deletion pydra/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
ensure_list,
record_error,
hash_function,
PydraFileLock,
)
from .helpers_file import copyfile_input, template_update
from .graph import DiGraph
Expand Down Expand Up @@ -1007,7 +1008,7 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
self.create_connections(task)
lockfile = self.cache_dir / (checksum + ".lock")
self.hooks.pre_run(self)
with SoftFileLock(lockfile):
async with PydraFileLock(lockfile):
# retrieve cached results
if not (rerun or self.task_rerun):
result = self.result()
Expand Down
28 changes: 27 additions & 1 deletion pydra/engine/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import attr
import cloudpickle as cp
from pathlib import Path
from filelock import SoftFileLock
from filelock import SoftFileLock, Timeout
import os
import sys
from hashlib import sha256
Expand Down Expand Up @@ -893,3 +893,29 @@ def argstr_formatting(argstr, inputs, value_updates=None):
.strip()
)
return argstr_formatted


class PydraFileLock:
    """Wrapper for filelock's SoftFileLock that makes it work with asyncio.

    SoftFileLock blocks the thread while waiting; this wrapper instead
    polls with non-blocking acquires and yields to the event loop between
    attempts, so other coroutines keep running while the lock is contended.
    NOTE(review): assumes ``asyncio`` is imported at module level — confirm.
    """

    def __init__(self, lockfile):
        # Path of the lock file to guard; backoff interval starts at 100 ms.
        self.lockfile = lockfile
        self.timeout = 0.1

    async def __aenter__(self):
        """Acquire the soft lock without blocking the event loop.

        Retries a zero-timeout acquire in a loop; each failed attempt
        sleeps asynchronously for the current backoff interval, which is
        doubled until it grows past two seconds (i.e. capped at 3.2 s).
        Returns self once the lock is held.
        """
        candidate = SoftFileLock(self.lockfile)
        while True:
            try:
                # Non-blocking attempt: raises Timeout immediately if held.
                candidate.acquire(timeout=0)
            except Timeout:
                await asyncio.sleep(self.timeout)
                if self.timeout <= 2:
                    self.timeout *= 2
            else:
                break
        self.lock = candidate
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Release the lock; returns None so exceptions propagate."""
        self.lock.release()
        return None
77 changes: 77 additions & 0 deletions pydra/engine/tests/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)
from ..submitter import Submitter
from ..core import Workflow
from ... import mark


def test_wf_name_conflict1():
Expand Down Expand Up @@ -4561,3 +4562,79 @@ def test_graph_5(tmpdir):
if DOT_FLAG:
name = f"graph_{sys._getframe().f_code.co_name}"
exporting_graphs(wf=wf, name=name)


@pytest.mark.timeout(20)
def test_duplicate_input_on_split_wf(tmpdir):
    """Regression test: the workflow must not hang when a split produces
    two tasks with identical checksums, as happens when splitting over a
    list that contains duplicate values.
    """
    text = ["test", "test"]

    @mark.task
    def printer(a):
        return a

    wf = Workflow(name="wf", input_spec=["text"], cache_dir=tmpdir)
    # Split over the duplicated list so both branches share a checksum.
    wf.split(("text"), text=text)
    wf.add(printer(name="printer1", a=wf.lzin.text))
    wf.set_output([("out1", wf.printer1.lzout.out)])

    with Submitter(plugin="cf", n_procs=6) as sub:
        sub(wf)

    res = wf.result()
    assert res[0].output.out1 == "test"
    assert res[1].output.out1 == "test"


@pytest.mark.timeout(40)
def test_inner_outer_wf_duplicate(tmpdir):
    """Regression test: execution must not get stuck when an inner and an
    outer workflow each run a node with the exact same inputs (equal
    checksums contending for the same cache lock).
    """
    task_list = ["First", "Second"]
    start_list = [3]

    @mark.task
    def one_arg(start_number):
        # Adds 10 to the input, one increment at a time.
        for k in range(10):
            start_number += 1
        return start_number

    @mark.task
    def one_arg_inner(start_number):
        # Identical computation to one_arg, so the inner node's inputs
        # (and checksum) match the outer node's.
        for k in range(10):
            start_number += 1
        return start_number

    # Outer workflow
    test_outer = Workflow(
        name="test_outer", input_spec=["start_number", "task_name"], cache_dir=tmpdir
    )
    # Splitting on both arguments
    test_outer.split(
        ["start_number", "task_name"], start_number=start_list, task_name=task_list
    )

    # Inner Workflow
    test_inner = Workflow(name="test_inner", input_spec=["start_number1"])
    test_inner.add(
        one_arg_inner(name="Ilevel1", start_number=test_inner.lzin.start_number1)
    )
    test_inner.set_output([("res", test_inner.Ilevel1.lzout.out)])

    # Outer workflow has two nodes plus the inner workflow
    test_outer.add(one_arg(name="level1", start_number=test_outer.lzin.start_number))
    test_outer.add(test_inner)
    test_inner.inputs.start_number1 = test_outer.level1.lzout.out

    test_outer.set_output([("res2", test_outer.test_inner.lzout.res)])

    with Submitter(plugin="cf") as sub:
        sub(test_outer)

    # 3 + 10 (outer node) + 10 (inner node) = 23 for both split branches.
    res = test_outer.result()
    assert res[0].output.res2 == 23 and res[1].output.res2 == 23
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ test_requires =
pytest-env
pytest-xdist < 2.0
pytest-rerunfailures
pytest-timeout
codecov
numpy
psutil
Expand Down Expand Up @@ -68,6 +69,7 @@ test =
pytest-env
pytest-xdist < 2.0
pytest-rerunfailures
pytest-timeout
codecov
numpy
pyld
Expand Down