Skip to content

Commit

Permalink
Merge pull request #486 from materialsproject/run-locally-root-dir
Browse files Browse the repository at this point in the history
`run_locally()` add `root_dir: str | Path | None` keyword
  • Loading branch information
janosh authored Nov 16, 2023
2 parents 61bbaa4 + a8d4587 commit 08fbf5f
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 11 deletions.
6 changes: 5 additions & 1 deletion src/jobflow/core/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,11 @@ def resolve(

for attr_type, attr in self.attributes:
# i means index else use attribute access
data = data[attr] if attr_type == "i" else getattr(data, attr)
data = (
data[attr]
if attr_type == "i" or isinstance(data, dict)
else getattr(data, attr)
)

return data

Expand Down
27 changes: 17 additions & 10 deletions src/jobflow/managers/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
import typing

if typing.TYPE_CHECKING:
import jobflow
from pathlib import Path

import jobflow

logger = logging.getLogger(__name__)

Expand All @@ -17,6 +18,7 @@ def run_locally(
log: bool = True,
store: jobflow.JobStore = None,
create_folders: bool = False,
root_dir: str | Path | None = None,
ensure_success: bool = False,
allow_external_references: bool = False,
) -> dict[str, dict[int, jobflow.Response]]:
Expand All @@ -25,25 +27,29 @@ def run_locally(
Parameters
----------
flow
flow : Flow | Job | list[Job]
A job or flow.
log
log : bool
Whether to print log messages.
store
store : JobStore
A job store. If a job store is not specified then
:obj:`JobflowSettings.JOB_STORE` will be used. By default this is a maggma
``MemoryStore`` but can be customised by setting the jobflow configuration file.
create_folders
create_folders : bool
Whether to run each job in a new folder.
ensure_success
root_dir : str | Path | None
The root directory to run the jobs in or where to create new subfolders if
``create_folders`` is True. If None then the current working
directory will be used.
ensure_success : bool
Raise an error if the flow was not executed successfully.
allow_external_references
allow_external_references : bool
If False all the references to other outputs should be from other Jobs
of the Flow.
Returns
-------
Dict[str, Dict[int, Response]]
dict[str, dict[int, Response]]
The responses of the jobs, as a dict of ``{uuid: {index: response}}``.
"""
from collections import defaultdict
Expand All @@ -60,6 +66,9 @@ def run_locally(
if store is None:
store = SETTINGS.JOB_STORE

root_dir = Path.cwd() if root_dir is None else Path(root_dir).resolve()
root_dir.mkdir(exist_ok=True)

store.connect()

if log:
Expand All @@ -72,8 +81,6 @@ def run_locally(
responses: dict[str, dict[int, jobflow.Response]] = defaultdict(dict)
stop_jobflow = False

root_dir = Path.cwd()

def _run_job(job: jobflow.Job, parents):
nonlocal stop_jobflow

Expand Down
9 changes: 9 additions & 0 deletions tests/managers/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ def test_simple_flow(memory_jobstore, clean_dir, simple_flow, capsys):
folders = list(Path(".").glob("job_*/"))
assert len(folders) == 1

# run with folders and root_dir
assert Path(root_dir := "test").exists() is False
responses = run_locally(
flow, store=memory_jobstore, create_folders=True, root_dir=root_dir
)
assert responses[uuid][1].output == "12345_end"
folders = list(Path(root_dir).glob("job_*/"))
assert len(folders) == 1


def test_connected_flow(memory_jobstore, clean_dir, connected_flow):
from jobflow import run_locally
Expand Down

0 comments on commit 08fbf5f

Please sign in to comment.