Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-76785: More Fixes for test.support.interpreters #113012

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 87 additions & 71 deletions Lib/test/support/interpreters/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@
import queue
import time
import weakref
import _xxinterpchannels as _channels
import _xxinterpchannels as _queues
import _xxinterpqueues as _queues

# aliases:
from _xxinterpchannels import (
ChannelError as QueueError,
ChannelNotFoundError as QueueNotFoundError,
from _xxinterpqueues import (
QueueError, QueueNotFoundError,
)

__all__ = [
Expand All @@ -19,14 +17,27 @@
]


class QueueEmpty(_queues.QueueEmpty, queue.Empty):
"""Raised from get_nowait() when the queue is empty.

It is also raised from get() if it times out.
"""


class QueueFull(_queues.QueueFull, queue.Full):
"""Raised from put_nowait() when the queue is full.

It is also raised from put() if it times out.
"""


def create(maxsize=0):
"""Return a new cross-interpreter queue.

The queue may be used to pass data safely between interpreters.
"""
# XXX honor maxsize
qid = _queues.create()
return Queue._with_maxsize(qid, maxsize)
qid = _queues.create(maxsize)
return Queue(qid)


def list_all():
Expand All @@ -35,53 +46,37 @@ def list_all():
for qid in _queues.list_all()]


class QueueEmpty(queue.Empty):
"""Raised from get_nowait() when the queue is empty.

It is also raised from get() if it times out.
"""


class QueueFull(queue.Full):
"""Raised from put_nowait() when the queue is full.

It is also raised from put() if it times out.
"""


_known_queues = weakref.WeakValueDictionary()

class Queue:
"""A cross-interpreter queue."""

@classmethod
def _with_maxsize(cls, id, maxsize):
if not isinstance(maxsize, int):
raise TypeError(f'maxsize must be an int, got {maxsize!r}')
elif maxsize < 0:
maxsize = 0
else:
maxsize = int(maxsize)
self = cls(id)
self._maxsize = maxsize
return self

def __new__(cls, id, /):
# There is only one instance for any given ID.
if isinstance(id, int):
id = _channels._channel_id(id, force=False)
elif not isinstance(id, _channels.ChannelID):
id = int(id)
else:
raise TypeError(f'id must be an int, got {id!r}')
key = int(id)
try:
self = _known_queues[key]
self = _known_queues[id]
except KeyError:
self = super().__new__(cls)
self._id = id
self._maxsize = 0
_known_queues[key] = self
_known_queues[id] = self
_queues.bind(id)
return self

def __del__(self):
try:
_queues.release(self._id)
except QueueNotFoundError:
pass
try:
del _known_queues[self._id]
except KeyError:
pass

def __repr__(self):
return f'{type(self).__name__}({self.id})'

Expand All @@ -90,39 +85,58 @@ def __hash__(self):

@property
def id(self):
return int(self._id)
return self._id

@property
def maxsize(self):
return self._maxsize

@property
def _info(self):
return _channels.get_info(self._id)
try:
return self._maxsize
except AttributeError:
self._maxsize = _queues.get_maxsize(self._id)
return self._maxsize

def empty(self):
return self._info.count == 0
return self.qsize() == 0

def full(self):
if self._maxsize <= 0:
return False
return self._info.count >= self._maxsize
return _queues.is_full(self._id)

def qsize(self):
return self._info.count
return _queues.get_count(self._id)

def put(self, obj, timeout=None):
# XXX block if full
_channels.send(self._id, obj, blocking=False)
def put(self, obj, timeout=None, *,
_delay=10 / 1000, # 10 milliseconds
):
"""Add the object to the queue.

This blocks while the queue is full.
"""
if timeout is not None:
timeout = int(timeout)
if timeout < 0:
raise ValueError(f'timeout value must be non-negative')
end = time.time() + timeout
while True:
try:
_queues.put(self._id, obj)
except _queues.QueueFull as exc:
if timeout is not None and time.time() >= end:
exc.__class__ = QueueFull
raise # re-raise
time.sleep(_delay)
else:
break

def put_nowait(self, obj):
# XXX raise QueueFull if full
return _channels.send(self._id, obj, blocking=False)
try:
return _queues.put(self._id, obj)
except _queues.QueueFull as exc:
exc.__class__ = QueueFull
raise # re-raise

def get(self, timeout=None, *,
_sentinel=object(),
_delay=10 / 1000, # 10 milliseconds
):
_delay=10 / 1000, # 10 milliseconds
):
"""Return the next object from the queue.

This blocks while the queue is empty.
Expand All @@ -132,25 +146,27 @@ def get(self, timeout=None, *,
if timeout < 0:
raise ValueError(f'timeout value must be non-negative')
end = time.time() + timeout
obj = _channels.recv(self._id, _sentinel)
while obj is _sentinel:
time.sleep(_delay)
if timeout is not None and time.time() >= end:
raise QueueEmpty
obj = _channels.recv(self._id, _sentinel)
while True:
try:
return _queues.get(self._id)
except _queues.QueueEmpty as exc:
if timeout is not None and time.time() >= end:
exc.__class__ = QueueEmpty
raise # re-raise
time.sleep(_delay)
return obj

def get_nowait(self, *, _sentinel=object()):
def get_nowait(self):
"""Return the next object from the channel.

If the queue is empty then raise QueueEmpty. Otherwise this
is the same as get().
"""
obj = _channels.recv(self._id, _sentinel)
if obj is _sentinel:
raise QueueEmpty
return obj
try:
return _queues.get(self._id)
except _queues.QueueEmpty as exc:
exc.__class__ = QueueEmpty
raise # re-raise


# XXX add this:
#_channels._register_queue_type(Queue)
_queues._register_queue_type(Queue)
96 changes: 81 additions & 15 deletions Lib/test/test_interpreters/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,21 @@

from test.support import import_helper
# Raise SkipTest if subinterpreters not supported.
import_helper.import_module('_xxinterpchannels')
#import_helper.import_module('_xxinterpqueues')
_queues = import_helper.import_module('_xxinterpqueues')
from test.support import interpreters
from test.support.interpreters import queues
from .utils import _run_output, TestBase


class TestBase(TestBase):
def tearDown(self):
for qid in _queues.list_all():
try:
_queues.destroy(qid)
except Exception:
pass


class QueueTests(TestBase):

def test_create(self):
Expand All @@ -32,20 +40,47 @@ def test_create(self):
self.assertEqual(queue.maxsize, 0)

with self.subTest('negative maxsize'):
queue = queues.create(-1)
self.assertEqual(queue.maxsize, 0)
queue = queues.create(-10)
self.assertEqual(queue.maxsize, -10)

with self.subTest('bad maxsize'):
with self.assertRaises(TypeError):
queues.create('1')

@unittest.expectedFailure
def test_shareable(self):
queue1 = queues.create()
queue2 = queues.create()
queue1.put(queue2)
queue3 = queue1.get()
self.assertIs(queue3, queue1)

interp = interpreters.create()
interp.exec_sync(dedent(f"""
from test.support.interpreters import queues
queue1 = queues.Queue({queue1.id})
"""));

with self.subTest('same interpreter'):
queue2 = queues.create()
queue1.put(queue2)
queue3 = queue1.get()
self.assertIs(queue3, queue2)

with self.subTest('from current interpreter'):
queue4 = queues.create()
queue1.put(queue4)
out = _run_output(interp, dedent("""
queue4 = queue1.get()
print(queue4.id)
"""))
qid = int(out)
self.assertEqual(qid, queue4.id)

with self.subTest('from subinterpreter'):
out = _run_output(interp, dedent("""
queue5 = queues.create()
queue1.put(queue5)
print(queue5.id)
"""))
qid = int(out)
queue5 = queue1.get()
self.assertEqual(queue5.id, qid)

def test_id_type(self):
queue = queues.create()
Expand Down Expand Up @@ -137,7 +172,6 @@ def test_put_get_main(self):

self.assertEqual(actual, expected)

@unittest.expectedFailure
def test_put_timeout(self):
queue = queues.create(2)
queue.put(None)
Expand All @@ -147,7 +181,6 @@ def test_put_timeout(self):
queue.get()
queue.put(None)

@unittest.expectedFailure
def test_put_nowait(self):
queue = queues.create(2)
queue.put_nowait(None)
Expand Down Expand Up @@ -179,31 +212,64 @@ def test_put_get_same_interpreter(self):
assert obj is not orig, 'expected: obj is not orig'
"""))

@unittest.expectedFailure
def test_put_get_different_interpreters(self):
interp = interpreters.create()
queue1 = queues.create()
queue2 = queues.create()
self.assertEqual(len(queues.list_all()), 2)

obj1 = b'spam'
queue1.put(obj1)

out = _run_output(
interpreters.create(),
interp,
dedent(f"""
import test.support.interpreters.queue as queues
from test.support.interpreters import queues
queue1 = queues.Queue({queue1.id})
queue2 = queues.Queue({queue2.id})
assert queue1.qsize() == 1, 'expected: queue1.qsize() == 1'
obj = queue1.get()
assert queue1.qsize() == 0, 'expected: queue1.qsize() == 0'
assert obj == b'spam', 'expected: obj == obj1'
# When going to another interpreter we get a copy.
assert id(obj) != {id(obj1)}, 'expected: obj is not obj1'
obj2 = b'eggs'
print(id(obj2))
assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0'
queue2.put(obj2)
assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1'
"""))
obj2 = queue2.get()
self.assertEqual(len(queues.list_all()), 2)
self.assertEqual(queue1.qsize(), 0)
self.assertEqual(queue2.qsize(), 1)

obj2 = queue2.get()
self.assertEqual(obj2, b'eggs')
self.assertNotEqual(id(obj2), int(out))

def test_put_cleared_with_subinterpreter(self):
interp = interpreters.create()
queue = queues.create()

out = _run_output(
interp,
dedent(f"""
from test.support.interpreters import queues
queue = queues.Queue({queue.id})
obj1 = b'spam'
obj2 = b'eggs'
queue.put(obj1)
queue.put(obj2)
"""))
self.assertEqual(queue.qsize(), 2)

obj1 = queue.get()
self.assertEqual(obj1, b'spam')
self.assertEqual(queue.qsize(), 1)

del interp
self.assertEqual(queue.qsize(), 0)

def test_put_get_different_threads(self):
queue1 = queues.create()
queue2 = queues.create()
Expand Down
Loading
Loading