Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly clean up threads when stopping Inotify. Improve Eventlet tests. #1070

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions src/watchdog/observers/inotify_c.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import ctypes.util
import errno
import os
import select
import struct
import threading
from ctypes import c_char_p, c_int, c_uint32
Expand Down Expand Up @@ -148,6 +149,9 @@ def __init__(self, path: bytes, *, recursive: bool = False, event_mask: int | No
Inotify._raise_error()
self._inotify_fd = inotify_fd
self._lock = threading.Lock()
self._closed = False
self._waiting_to_read = True
self._kill_r, self._kill_w = os.pipe()

# Stores the watch descriptor for a given path.
self._wd_for_path: dict[bytes, int] = {}
Expand Down Expand Up @@ -230,13 +234,19 @@ def remove_watch(self, path: bytes) -> None:
def close(self) -> None:
"""Closes the inotify instance and removes all associated watches."""
with self._lock:
if self._path in self._wd_for_path:
wd = self._wd_for_path[self._path]
inotify_rm_watch(self._inotify_fd, wd)
if not self._closed:
self._closed = True

# descriptor may be invalid because file was deleted
with contextlib.suppress(OSError):
os.close(self._inotify_fd)
if self._path in self._wd_for_path:
wd = self._wd_for_path[self._path]
inotify_rm_watch(self._inotify_fd, wd)

if self._waiting_to_read:
# inotify_rm_watch() should write data to _inotify_fd and wake
# the thread, but writing to the kill channel will gaurentee this
os.write(self._kill_w, b'!')
else:
self._close_resources()

def read_events(self, *, event_buffer_size: int = DEFAULT_EVENT_BUFFER_SIZE) -> list[InotifyEvent]:
"""Reads events from inotify and yields them."""
Expand Down Expand Up @@ -276,6 +286,21 @@ def _recursive_simulate(src_path: bytes) -> list[InotifyEvent]:
event_buffer = None
while True:
try:
with self._lock:
if self._closed:
return []

self._waiting_to_read = True

select.select([self._inotify_fd, self._kill_r], [], [])

with self._lock:
self._waiting_to_read = False

if self._closed:
self._close_resources()
return []

event_buffer = os.read(self._inotify_fd, event_buffer_size)
except OSError as e:
if e.errno == errno.EINTR:
Expand Down Expand Up @@ -340,6 +365,11 @@ def _recursive_simulate(src_path: bytes) -> list[InotifyEvent]:

return event_list

def _close_resources(self):
os.close(self._inotify_fd)
os.close(self._kill_r)
os.close(self._kill_w)

# Non-synchronized methods.
def _add_dir_watch(self, path: bytes, mask: int, *, recursive: bool) -> None:
"""Adds a watch (optionally recursively) for the given directory path
Expand Down
13 changes: 6 additions & 7 deletions src/watchdog/utils/bricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,13 @@ def _init(self, maxsize: int) -> None:
super()._init(maxsize)
self._last_item = None

def _put(self, item: Any) -> None:
def put(self, item: Any, block: bool = True, timeout: float | None = None) -> None:
if self._last_item is None or item != self._last_item:
super()._put(item)
self._last_item = item
else:
# `put` increments `unfinished_tasks` even if we did not put
# anything into the queue here
self.unfinished_tasks -= 1
super().put(item, block, timeout)

def _put(self, item: Any) -> None:
super()._put(item)
self._last_item = item

def _get(self) -> Any:
item = super()._get()
Expand Down
30 changes: 30 additions & 0 deletions tests/isolated/eventlet_observer_stops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
if __name__ == '__main__':
import eventlet

eventlet.monkey_patch()

import signal
import sys
import tempfile

from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler

with tempfile.TemporaryDirectory() as temp_dir:
def run_observer():
event_handler = LoggingEventHandler()
observer = Observer()
observer.schedule(event_handler, temp_dir)
observer.start()
eventlet.sleep(1)
observer.stop()

def on_alarm(signum, frame):
print("Observer.stop() never finished!", file=sys.stderr)
sys.exit(1)

signal.signal(signal.SIGALRM, on_alarm)
signal.alarm(4)

thread = eventlet.spawn(run_observer)
thread.wait()
33 changes: 33 additions & 0 deletions tests/isolated/eventlet_skip_repeat_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
if __name__ == '__main__':
import eventlet

eventlet.monkey_patch()

from watchdog.utils.bricks import SkipRepeatsQueue

q = SkipRepeatsQueue(10)
q.put('A')
q.put('A')
q.put('A')
q.put('A')
q.put('B')
q.put('A')

value = q.get()
assert value == 'A'
q.task_done()

assert q.unfinished_tasks == 2

value = q.get()
assert value == 'B'
q.task_done()

assert q.unfinished_tasks == 1

value = q.get()
assert value == 'A'
q.task_done()

assert q.empty()
assert q.unfinished_tasks == 0
7 changes: 0 additions & 7 deletions tests/markers.py

This file was deleted.

11 changes: 10 additions & 1 deletion tests/test_inotify_c.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import errno
import logging
import os
import select
import struct
from typing import TYPE_CHECKING
from unittest.mock import patch
Expand Down Expand Up @@ -56,6 +57,13 @@ def test_late_double_deletion(helper: Helper, p: P, event_queue: TestEventQueue,
+ struct_inotify(wd=3, mask=const.IN_IGNORED)
)

select_bkp = select.select

def fakeselect(read_list, *args, **kwargs):
if inotify_fd in read_list:
return [inotify_fd], [], []
return select_bkp(read_list, *args, **kwargs)

os_read_bkp = os.read

def fakeread(fd, length):
Expand Down Expand Up @@ -92,8 +100,9 @@ def inotify_rm_watch(fd, wd):
mock3 = patch.object(inotify_c, "inotify_init", new=inotify_init)
mock4 = patch.object(inotify_c, "inotify_add_watch", new=inotify_add_watch)
mock5 = patch.object(inotify_c, "inotify_rm_watch", new=inotify_rm_watch)
mock6 = patch.object(select, "select", new=fakeselect)

with mock1, mock2, mock3, mock4, mock5:
with mock1, mock2, mock3, mock4, mock5, mock6:
start_watching(path=p(""))
# Watchdog Events
for evt_cls in [DirCreatedEvent, DirDeletedEvent] * 2:
Expand Down
24 changes: 24 additions & 0 deletions tests/test_isolated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import pytest
import importlib

from watchdog.utils import platform

from .utils import run_isolated_test


# Kqueue isn't supported by Eventlet, so BSD is out
# Current usage ReadDirectoryChangesW on Windows is blocking, though async may be possible
@pytest.mark.skipif(not platform.is_linux(), reason="Eventlet only supported in Linux")
def test_observer_stops_in_eventlet():
if not importlib.util.find_spec('eventlet'):
pytest.skip("eventlet not installed")

run_isolated_test('eventlet_observer_stops.py')


@pytest.mark.skipif(not platform.is_linux(), reason="Eventlet only supported in Linux")
def test_eventlet_skip_repeat_queue():
if not importlib.util.find_spec('eventlet'):
pytest.skip("eventlet not installed")

run_isolated_test('eventlet_skip_repeat_queue.py')
17 changes: 1 addition & 16 deletions tests/test_skip_repeats_queue.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
from __future__ import annotations

import pytest

from watchdog import events
from watchdog.utils.bricks import SkipRepeatsQueue

from .markers import cpython_only


def basic_actions():
def test_basic_queue():
q = SkipRepeatsQueue()

e1 = (2, "fred")
Expand All @@ -25,10 +21,6 @@ def basic_actions():
assert q.empty()


def test_basic_queue():
basic_actions()


def test_allow_nonconsecutive():
q = SkipRepeatsQueue()

Expand Down Expand Up @@ -86,10 +78,3 @@ def test_consecutives_allowed_across_empties():
q.put(e1) # this repeat is allowed because 'last' added is now gone from queue
assert e1 == q.get()
assert q.empty()


@cpython_only
def test_eventlet_monkey_patching():
eventlet = pytest.importorskip("eventlet")
eventlet.monkey_patch()
basic_actions()
28 changes: 28 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import dataclasses
import os
import subprocess
import sys
from queue import Queue
from typing import Protocol

Expand Down Expand Up @@ -97,3 +99,29 @@ def close(self) -> None:
alive = [emitter.is_alive() for emitter in self.emitters]
self.emitters = []
assert alive == [False] * len(alive)


def run_isolated_test(path):
ISOALTED_TEST_PREFIX = os.path.join('tests', 'isolated')
path = os.path.abspath(os.path.join(ISOALTED_TEST_PREFIX, path))

src_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'src')
new_env = os.environ.copy()
new_env['PYTHONPATH'] = os.pathsep.join(sys.path + [src_dir])

new_argv = [sys.executable, path]

p = subprocess.Popen(
new_argv,
env=new_env,
)

# in case test goes haywire, don't let it run forever
timeout = 10
try:
p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
p.kill()
assert False, 'timed out'

assert p.returncode == 0
Loading