Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace watchdog with asyncinotify and related changes #17

Merged
merged 1 commit into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/requirements-old.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Ensure changes to these dependencies are reflected in pyproject.toml
asyncinotify==4.0.9
attrs==22.1.0
msgpack==1.0.8
parse==1.19.1
path==16.14.0
rich==13.0.0
watchdog==4.0.0
4 changes: 3 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{
"python.terminal.activateEnvironment": false
"python.terminal.activateEnvironment": false,
"ltex.additionalRules.motherTongue": "en-US",
"ltex.language": "en-US"
}
6 changes: 6 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- Migrate `load_module_file` to stepup-reprep.
- Replace [watchdog](https://github.com/gorakhargosh/watchdog)
by [asyncinotify](https://github.com/ProCern/asyncinotify)
to avoid [a long-standing issue in watchdog](https://github.com/gorakhargosh/watchdog/issues/275).


### Fixed

- Fix bug in the translation of relative paths before they are sent to the director process.
- Add trailing slash to `workdir` argument of `stepup.core.api.step()` if it is missing.
- Fix mistake in worker log filenames.
- Fix bug in back translation of paths when substituted in a step command.


## [1.2.8] - 2024-06-28 {: #v1.2.8 }
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ classifiers = [
]
dependencies = [
# Ensure changes to these dependencies are reflected in .github/requirements-old.txt
"asyncinotify>=4.0.9",
"attrs>=22.1.0",
"msgpack>=1.0.8",
"parse>=1.19.1",
"path>=16.14.0",
"rich>=13.0.0",
"watchdog>=4.0.0",
]
dynamic = ["version"]

Expand Down
10 changes: 5 additions & 5 deletions stepup/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,9 @@ def step(
tr_workdir = translate(subs(workdir))
amend(env=sorted(amended_env_vars))
command = CaseSensitiveTemplate(command).safe_substitute(
inp=" ".join(myrelpath(inp_path, tr_workdir) for inp_path in tr_inp_paths),
out=" ".join(myrelpath(out_path, tr_workdir) for out_path in tr_out_paths),
vol=" ".join(myrelpath(vol_path, tr_workdir) for vol_path in tr_vol_paths),
inp=" ".join(translate_back(inp_path) for inp_path in tr_inp_paths),
out=" ".join(translate_back(out_path) for out_path in tr_out_paths),
vol=" ".join(translate_back(vol_path) for vol_path in tr_vol_paths),
)

# Look for inputs that match deferred globs and check their existence
Expand Down Expand Up @@ -451,7 +451,7 @@ def copy(src: str, dst: str, optional: bool = False, block: bool = False):
with subs_env_vars() as subs:
src = subs(src)
dst = subs(dst)
path_src = myrelpath(src)
path_src = mynormpath(src)
path_dst = make_path_out(src, dst, None)
amend(env=amended_env_vars)
step("cp -aT ${inp} ${out}", inp=path_src, out=path_dst, optional=optional, block=block)
Expand All @@ -476,7 +476,7 @@ def mkdir(dirname: str, optional: bool = False, block: bool = False):
dirname = subs(dirname)
if not dirname.endswith("/"):
dirname += "/"
dirname = myrelpath(dirname)
dirname = mynormpath(dirname)
amend(env=amended_env_vars)
step(f"mkdir -p {dirname}", out=dirname, optional=optional, block=block)

Expand Down
1 change: 0 additions & 1 deletion stepup/core/director.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,6 @@ def amend(
async def shutdown(self):
"""Shut down the director and worker processes."""
self._stop_event.set()
self._scheduler.drain()
self._watcher.interrupt.set()

@allow_rpc
Expand Down
192 changes: 108 additions & 84 deletions stepup/core/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
import enum

import attrs
from asyncinotify import Inotify, Mask, Watch
from path import Path
from watchdog.events import FileSystemEvent, FileSystemEventHandler, FileSystemMovedEvent
from watchdog.observers import Observer

from .asyncio import stoppable_iterator
from .file import FileState
from .reporter import ReporterClient
from .utils import myabsolute, myrelpath
from .workflow import Workflow

__all__ = ("Watcher",)
Expand Down Expand Up @@ -63,10 +61,6 @@ class Watcher:
# The resume event is set when other parts of StepUp (the runner) want to enable the watcher.
resume: asyncio.Event = attrs.field(factory=asyncio.Event)

# A queue object holding file changes received from the watchdog observer,
# which is running in a separate thread. Each item is in stance of `Change` and path.
change_queue: asyncio.Queue = attrs.field(init=False, factory=asyncio.Queue)

# The following sets contain the deleted and changed file
# while the watcher is observing. These changes are not sent to the workflow yet.
deleted: set[Path] = attrs.field(init=False, factory=set)
Expand All @@ -90,59 +84,26 @@ async def loop(self, stop_event: asyncio.Event):
The iteration ends by informing the workflow of all the changes, after which
StepUp starts the runner again (or exists).
"""
dir_loop = asyncio.create_task(self.dir_loop(stop_event))
while not stop_event.is_set():
await self.resume.wait()
# Check for problems with non-existing directories and raise early if needed
if dir_loop.done() and dir_loop.exception() is not None:
await dir_loop
await self.change_loop()
self.resume.clear()
await dir_loop

async def dir_loop(self, stop_event: asyncio.Event):
"""Add or remove directories to watch, as soon as they are created or defined static.

For every directory added, an event handler for watch_dog is created, which puts
the observed file events to the change_queue.

Parameters
----------
stop_event
Event to interrupt processing items from the dir_queue.
"""
observer = Observer()
observer.start()
watches = {}
try:
async for remove, path in stoppable_iterator(self.dir_queue.get, stop_event):
if remove:
watch = watches.pop(path, None)
if watch is not None:
observer.unschedule(watch)
else:
if not path.is_dir():
raise FileNotFoundError(f"Cannot watch non-existing directory: {path}")
if path not in watches:
handler = QueueEventHandler(self.change_queue, Path(path).isabs())
watches[path] = observer.schedule(handler, path)
finally:
observer.stop()
async with AsyncInotifyWrapper(self.dir_queue) as wrapper:
while not stop_event.is_set():
await self.resume.wait()
await self.watch_changes(wrapper.change_queue)
self.resume.clear()

async def change_loop(self):
async def watch_changes(self, change_queue: asyncio.Queue):
"""Watch file events. They are sent to the workflow right before the runner is restarted."""
self.files_changed.clear()

# Process changes to static files picked up during watch phase.
while not self.change_queue.empty():
change, path = self.change_queue.get_nowait()
while not change_queue.empty():
change, path = change_queue.get_nowait()
if self.workflow.file_states.get(f"file:{path}") == FileState.STATIC:
await self.record_change(change, path)

# Wait for new changes to show up.
self.active.set()
await self.reporter("PHASE", "watch")
async for change, path in stoppable_iterator(self.change_queue.get, self.interrupt):
async for change, path in stoppable_iterator(change_queue.get, self.interrupt):
await self.record_change(change, path)

# Feed all updates to the worker and clean up.
Expand Down Expand Up @@ -176,41 +137,104 @@ async def record_change(self, change: Change, path: Path):
await self.record_change(Change.UPDATED, sub_path)


class QueueEventHandler(FileSystemEventHandler):
"""A file system event handler for watchdog that puts events safely on an asyncio queue.
@attrs.define
class AsyncInotifyWrapper:
"""Interface between a `Watcher` instance and the `asyncinotify` library."""

dir_queue: asyncio.Queue = attrs.field()
"""The dir_queue provides directories to (un)watch.

Parameters
----------
queue
The queue on which file events are put.
is_absolute
True when the directory being watch is known to StepUp as an absolute path.
Each item is a tuple `(remove, path)`, where `remove` is True when the path
not longer needs to be watched.
"""

def __init__(self, queue: asyncio.Queue, is_absolute: bool):
self._queue = queue
self._is_absolute = is_absolute
self._loop = asyncio.get_event_loop()

def on_any_event(self, event: FileSystemEvent):
"""Process any event received from the watchdog observer."""
if isinstance(event, FileSystemMovedEvent):
self.put_event(Change.DELETED, event.src_path, event.is_directory)
self.put_event(Change.UPDATED, event.dest_path, event.is_directory)
elif event.event_type == "created":
self.put_event(Change.UPDATED, event.src_path, event.is_directory)
elif event.event_type in ["modified", "closed"]:
if not event.is_directory:
self.put_event(Change.UPDATED, event.src_path, event.is_directory)
elif event.event_type == "deleted":
self.put_event(Change.DELETED, event.src_path, event.is_directory)
elif event.event_type != "opened":
raise NotImplementedError(f"Cannot handle event: {event}")

def put_event(self, change: Change, path: str, is_directory: bool):
"""Put an event on the queue, includes translation of abs/rel path."""
path = myabsolute(path) if self._is_absolute else myrelpath(path)
if is_directory:
path = path / ""
# The event calls from watch dog live in a separate thread ...
self._loop.call_soon_threadsafe(self._queue.put_nowait, (change, path))
inotify: Inotify | None = attrs.field(init=False, default=None)
"""Inotify object, only present in context."""

stop_event: asyncio.Event = attrs.field(init=False, factory=asyncio.Event)
"""Internal stop event, called when context is closed."""

watches: dict[str, Watch] = attrs.field(init=False, factory=dict)
"""Directory of watches created with asyncinotify"""

change_queue: asyncio.Queue = attrs.field(init=False, factory=asyncio.Queue)
"""A queue object holding file changes received from asyncinotify.

Each item is a tuple with a `Change` instance and a path."""

dir_loop_task: asyncio.Task | None = attrs.field(init=False, default=None)
"""Task corresponding to the dir_loop method."""

change_loop_task: asyncio.Task | None = attrs.field(init=False, default=None)
"""Task corresponding to the change_loop method."""

async def __aenter__(self):
"""Start using the Inotify Wrapper."""
self.inotify = Inotify()
self.stop_event.clear()
self.dir_loop_task = asyncio.create_task(self.dir_loop())
self.change_loop_task = asyncio.create_task(self.change_loop())
return self

async def __aexit__(self, exc_type, exc, tb):
"""Close the InotifyWrapper."""
self.stop_event.set()
await asyncio.gather(self.dir_loop_task, self.change_loop_task)
self.dir_loop_task = None
self.change_loop_task = None
self.inotify.close()
self.inotify = None

async def dir_loop(self):
"""Add or remove directories to watch, as soon as they are created or defined static.

For every directory added, an event handler for watch_dog is created, which puts
the observed file events to the change_queue.

Parameters
----------
stop_event
Event to interrupt processing items from the dir_queue.
"""
async for remove, path in stoppable_iterator(self.dir_queue.get, self.stop_event):
if remove:
watch = self.watches.pop(path, None)
if watch is not None:
self.inotify.rm_watch(watch)
else:
if not path.is_dir():
raise FileNotFoundError(f"Cannot watch non-existing directory: {path}")
if path not in self.watches:
self.watches[path] = self.inotify.add_watch(
path,
(
Mask.MODIFY
| Mask.CREATE
| Mask.DELETE
| Mask.CLOSE_WRITE
| Mask.MOVE
| Mask.MOVE_SELF
| Mask.DELETE_SELF
| Mask.UNMOUNT
| Mask.ATTRIB
| Mask.IGNORED
),
)

async def change_loop(self):
"""Collect from INotify and translate then to items for the change_queue."""
async for event in stoppable_iterator(self.inotify.get, self.stop_event):
# Drop invalid watches (deleted or unmounted directories)
if event.mask & Mask.IGNORED:
self.watches.pop(f"{event.watch.path}/", None)
if event.mask & Mask.MOVE_SELF:
raise RuntimeError("StepUp does not support moving directories it is watching.")
change = (
Change.DELETED
if event.mask & (Mask.DELETE | Mask.DELETE_SELF | Mask.MOVED_FROM)
else Change.UPDATED
)
path = Path(event.path)
if event.mask & Mask.ISDIR:
path = path / ""
self.change_queue.put_nowait((change, path))
2 changes: 1 addition & 1 deletion stepup/core/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ def parse_args():
async def async_main():
args = parse_args()
with contextlib.ExitStack() as stack:
ferr = stack.enter_context(open(".stepup/logs/worker{args.worker_idx}", "w"))
ferr = stack.enter_context(open(f".stepup/logs/worker{args.worker_idx}", "w"))
stack.enter_context(contextlib.redirect_stderr(ferr))
print(f"PID {os.getpid()}", file=sys.stderr)
async with ReporterClient.socket(args.reporter_socket) as reporter:
Expand Down
2 changes: 2 additions & 0 deletions tests/cases/absolute/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.stepup
current_*
1 change: 1 addition & 0 deletions tests/cases/absolute/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This is a simple example with an absolute path.
Loading