Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove deprecated unbounded queue #2925

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
6 changes: 3 additions & 3 deletions check.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/bin/bash

set -ex
set -o pipefail

ON_GITHUB_CI=true
EXIT_STATUS=0
Expand Down Expand Up @@ -55,16 +56,15 @@ MYPY=0
echo "::group::Mypy"
# Cleanup previous runs.
rm -f mypy_annotate.dat
# Pipefail makes these pipelines fail if mypy does, even if mypy_annotate.py succeeds.
set -o pipefail

mypy --show-error-end --platform linux | python ./src/trio/_tools/mypy_annotate.py --dumpfile mypy_annotate.dat --platform Linux \
|| { echo "* Mypy (Linux) found type errors." >> "$GITHUB_STEP_SUMMARY"; MYPY=1; }
# Darwin tests FreeBSD too
mypy --show-error-end --platform darwin | python ./src/trio/_tools/mypy_annotate.py --dumpfile mypy_annotate.dat --platform Mac \
|| { echo "* Mypy (Mac) found type errors." >> "$GITHUB_STEP_SUMMARY"; MYPY=1; }
mypy --show-error-end --platform win32 | python ./src/trio/_tools/mypy_annotate.py --dumpfile mypy_annotate.dat --platform Windows \
|| { echo "* Mypy (Windows) found type errors." >> "$GITHUB_STEP_SUMMARY"; MYPY=1; }
set +o pipefail

# Re-display errors using Github's syntax, read out of mypy_annotate.dat
python ./src/trio/_tools/mypy_annotate.py --dumpfile mypy_annotate.dat
# Then discard.
Expand Down
1 change: 0 additions & 1 deletion src/trio/_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
temporarily_detach_coroutine_object,
wait_task_rescheduled,
)
from ._unbounded_queue import UnboundedQueue, UnboundedQueueStatistics

# Windows imports
if sys.platform == "win32":
Expand Down
4 changes: 2 additions & 2 deletions src/trio/_core/_generated_io_kqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from collections.abc import Callable
from contextlib import AbstractContextManager

from .. import _core
from .._channel import MemoryReceiveChannel
from .._file_io import _HasFileNo
from ._traps import Abort, RaiseCancelT

Expand Down Expand Up @@ -46,7 +46,7 @@ def current_kqueue() -> select.kqueue:
@enable_ki_protection
def monitor_kevent(
ident: int, filter: int
) -> AbstractContextManager[_core.UnboundedQueue[select.kevent]]:
) -> AbstractContextManager[MemoryReceiveChannel[select.kevent]]:
"""TODO: these are implemented, but are currently more of a sketch than
anything real. See `#26
<https://github.com/python-trio/trio/issues/26>`__.
Expand Down
4 changes: 2 additions & 2 deletions src/trio/_core/_generated_io_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

from typing_extensions import Buffer

from .._channel import MemoryReceiveChannel
from .._file_io import _HasFileNo
from ._unbounded_queue import UnboundedQueue
from ._windows_cffi import CData, Handle

assert not TYPE_CHECKING or sys.platform == "win32"
Expand Down Expand Up @@ -191,7 +191,7 @@ def current_iocp() -> int:

@enable_ki_protection
def monitor_completion_key() -> (
AbstractContextManager[tuple[int, UnboundedQueue[object]]]
AbstractContextManager[tuple[int, MemoryReceiveChannel[object]]]
):
"""TODO: these are implemented, but are currently more of a sketch than
anything real. See `#26
Expand Down
42 changes: 21 additions & 21 deletions src/trio/_core/_io_kqueue.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import errno
import math
import select
import sys
from contextlib import contextmanager
Expand All @@ -18,7 +19,8 @@

from typing_extensions import TypeAlias

from .._core import Abort, RaiseCancelT, Task, UnboundedQueue
from .._channel import MemoryReceiveChannel, MemorySendChannel
from .._core import Abort, RaiseCancelT, Task
from .._file_io import _HasFileNo

assert not TYPE_CHECKING or (sys.platform != "linux" and sys.platform != "win32")
Expand All @@ -36,8 +38,7 @@ class _KqueueStatistics:
@attrs.define(eq=False)
class KqueueIOManager:
_kqueue: select.kqueue = attrs.Factory(select.kqueue)
# {(ident, filter): Task or UnboundedQueue}
_registered: dict[tuple[int, int], Task | UnboundedQueue[select.kevent]] = (
_registered: dict[tuple[int, int], Task | MemorySendChannel[select.kevent]] = (
attrs.Factory(dict)
)
_force_wakeup: WakeupSocketpair = attrs.Factory(WakeupSocketpair)
Expand Down Expand Up @@ -98,7 +99,7 @@ def process_events(self, events: EventResult) -> None:
if isinstance(receiver, _core.Task):
_core.reschedule(receiver, outcome.Value(event))
else:
receiver.put_nowait(event) # TODO: test this line
receiver.send_nowait(event) # TODO: test this line
Copy link
Member

@graingert graingert Dec 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's possible for this to fail if someone calls:

with trio.lowlevel.monitor_kevent() as q:
    q.close()

we should either catch the exception here or yield a clone

I'd add it to this PR for you but I just cannot work out how to cover this line - it should be easy but I'm new to kqueue so might be missing something

here's my attempt at a test - but it times out 009cbb5 (#2925)

Copy link
Member

@graingert graingert Dec 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something like this:

from 9ddb5ecc47dda3a7502851967bb67247a99aefe5 mon sep 17 00:00:00 2001
from: thomas grainger <[email protected]>
date: tue, 24 dec 2024 08:24:41 +0000
subject: [patch] make sure closing monitor queues is a noop

---
 src/trio/_core/_io_kqueue.py  | 12 ++++++------
 src/trio/_core/_io_windows.py | 13 +++++++------
 2 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/src/trio/_core/_io_kqueue.py b/src/trio/_core/_io_kqueue.py
index fed4da83..08b5d966 100644
--- a/src/trio/_core/_io_kqueue.py
+++ b/src/trio/_core/_io_kqueue.py
@@ -137,12 +137,12 @@ class kqueueiomanager:
                 "attempt to register multiple listeners for same ident/filter pair",
             )
         send, recv = open_memory_channel[select.kevent](math.inf)
-        self._registered[key] = send
-        try:
-            yield recv
-        finally:
-            send.close()
-            self._registered.pop(key, none)
+        with send, recv:
+            self._registered[key] = send
+            try:
+                yield recv.clone()
+            finally:
+                self._registered.pop(key, none)
 
     @_public
     async def wait_kevent(
diff --git a/src/trio/_core/_io_windows.py b/src/trio/_core/_io_windows.py
index 4676e6c5..824e91f5 100644
--- a/src/trio/_core/_io_windows.py
+++ b/src/trio/_core/_io_windows.py
@@ -1040,9 +1040,10 @@ class windowsiomanager:
 
         key = next(self._completion_key_counter)
         send, recv = open_memory_channel[object](math.inf)
-        self._completion_key_queues[key] = send
-        try:
-            yield (key, recv)
-        finally:
-            send.close()
-            del self._completion_key_queues[key]
+        with send, recv:
+            self._completion_key_queues[key] = send
+            try:
+                yield (key, recv.clone())
+            finally:
+                del self._completion_key_queues[key]
-- 
2.43.0


# kevent registration is complicated -- e.g. aio submission can
# implicitly perform a EV_ADD, and EVFILT_PROC with NOTE_TRACK will
Expand All @@ -122,25 +123,29 @@ def current_kqueue(self) -> select.kqueue:
@contextmanager
@_public
def monitor_kevent(
self,
ident: int,
filter: int,
) -> Iterator[_core.UnboundedQueue[select.kevent]]:
self, ident: int, filter: int
) -> Iterator[MemoryReceiveChannel[select.kevent]]:
"""TODO: these are implemented, but are currently more of a sketch than
anything real. See `#26
<https://github.com/python-trio/trio/issues/26>`__.
"""
from .._channel import open_memory_channel

key = (ident, filter)
if key in self._registered:
raise _core.BusyResourceError(
"attempt to register multiple listeners for same ident/filter pair",
)
q = _core.UnboundedQueue[select.kevent]()
self._registered[key] = q
send, recv = open_memory_channel[select.kevent](math.inf)
self._registered[key] = send
try:
yield q
yield recv
finally:
del self._registered[key]
send.close()
try:
del self._registered[key]
except KeyError:
pass

@_public
async def wait_kevent(
Expand Down Expand Up @@ -275,20 +280,15 @@ def notify_closing(self, fd: int | _HasFileNo) -> None:

for filter_ in [select.KQ_FILTER_READ, select.KQ_FILTER_WRITE]:
key = (fd, filter_)
receiver = self._registered.get(key)

if receiver is None:
try:
receiver = self._registered.pop(key)
except KeyError:
continue

if type(receiver) is _core.Task:
event = select.kevent(fd, filter_, select.KQ_EV_DELETE)
self._kqueue.control([event], 0)
exc = _core.ClosedResourceError("another task closed this fd")
_core.reschedule(receiver, outcome.Error(exc))
del self._registered[key]
else:
# XX this is an interesting example of a case where being able
# to close a queue would be useful...
raise NotImplementedError(
"can't close an fd that monitor_kevent is using",
)
receiver.close()
20 changes: 13 additions & 7 deletions src/trio/_core/_io_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import enum
import itertools
import math
import socket
import sys
from contextlib import contextmanager
Expand Down Expand Up @@ -44,9 +45,9 @@

from typing_extensions import Buffer, TypeAlias

from .._channel import MemoryReceiveChannel, MemorySendChannel
from .._file_io import _HasFileNo
from ._traps import Abort, RaiseCancelT
from ._unbounded_queue import UnboundedQueue

EventResult: TypeAlias = int
T = TypeVar("T")
Expand Down Expand Up @@ -455,7 +456,7 @@ def __init__(self) -> None:
self._overlapped_waiters: dict[CData, _core.Task] = {}
self._posted_too_late_to_cancel: set[CData] = set()

self._completion_key_queues: dict[int, UnboundedQueue[object]] = {}
self._completion_key_queues: dict[int, MemorySendChannel[object]] = {}
self._completion_key_counter = itertools.count(CKeys.USER_DEFINED)

with socket.socket() as s:
Expand Down Expand Up @@ -641,7 +642,7 @@ def process_events(self, received: EventResult) -> None:
lpOverlapped=overlapped,
dwNumberOfBytesTransferred=transferred,
)
queue.put_nowait(info)
queue.send_nowait(info)

def _register_with_iocp(self, handle_: int | CData, completion_key: int) -> None:
handle = _handle(handle_)
Expand Down Expand Up @@ -1027,16 +1028,21 @@ def current_iocp(self) -> int:

@contextmanager
@_public
def monitor_completion_key(self) -> Iterator[tuple[int, UnboundedQueue[object]]]:
def monitor_completion_key(
self,
) -> Iterator[tuple[int, MemoryReceiveChannel[object]]]:
"""TODO: these are implemented, but are currently more of a sketch than
anything real. See `#26
<https://github.com/python-trio/trio/issues/26>`__ and `#52
<https://github.com/python-trio/trio/issues/52>`__.
"""
from .._channel import open_memory_channel

key = next(self._completion_key_counter)
queue = _core.UnboundedQueue[object]()
self._completion_key_queues[key] = queue
send, recv = open_memory_channel[object](math.inf)
self._completion_key_queues[key] = send
try:
yield (key, queue)
yield (key, recv)
finally:
send.close()
del self._completion_key_queues[key]
11 changes: 5 additions & 6 deletions src/trio/_core/_parking_lot.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@
# theirs and our tasks are lighter, so for us #objects is smaller and #tasks
# is larger.
#
# This is in the core because for two reasons. First, it's used by
# UnboundedQueue, and UnboundedQueue is used for a number of things in the
# core. And second, it's responsible for providing fairness to all of our
# high-level synchronization primitives (locks, queues, etc.). For now with
# our FIFO scheduler this is relatively trivial (it's just a FIFO waitqueue),
# but in the future we ever start support task priorities or fair scheduling
# This is in the core because it's responsible for providing fairness to all
# of our high-level synchronization primitives (locks, queues, etc.). For now
# with our FIFO scheduler this is relatively trivial (it's just a FIFO
# waitqueue), but in the future we ever start support task priorities or fair
# scheduling
Comment on lines +27 to +28
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The grammar here is a bit odd. I would change it to something like
"but in the future if we ever support task priorities or
fair scheduling it should be fairly simple."

#
# https://github.com/python-trio/trio/issues/32
#
Expand Down
7 changes: 5 additions & 2 deletions src/trio/_core/_tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,6 @@ def check(*, expected_readers: int, expected_writers: int) -> None:
check(expected_readers=1, expected_writers=0)


@pytest.mark.filterwarnings("ignore:.*UnboundedQueue:trio.TrioDeprecationWarning")
async def test_io_manager_kqueue_monitors_statistics() -> None:
def check(
*,
Expand All @@ -411,13 +410,17 @@ def check(
# 1 for call_soon_task
check(expected_monitors=0, expected_readers=1, expected_writers=0)

with _core.monitor_kevent(a1.fileno(), select.KQ_FILTER_READ):
with _core.monitor_kevent(a1.fileno(), select.KQ_FILTER_READ) as q:
with (
pytest.raises(_core.BusyResourceError),
_core.monitor_kevent(a1.fileno(), select.KQ_FILTER_READ),
):
pass # pragma: no cover
check(expected_monitors=1, expected_readers=1, expected_writers=0)
_core.notify_closing(a1)
a1.close()
with trio.fail_after(1):
assert len([v async for v in q]) == 0

check(expected_monitors=0, expected_readers=1, expected_writers=0)

Expand Down
Loading
Loading