Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streams are iterable + receive_some doesn't require an explicit size #1123

Merged
merged 6 commits into from
Jul 30, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/source/reference-io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ Abstract base classes
* - :class:`ReceiveStream`
- :class:`AsyncResource`
- :meth:`~ReceiveStream.receive_some`
-
- ``__aiter__``, ``__anext__``
- :class:`~trio.testing.MemoryReceiveStream`
* - :class:`Stream`
- :class:`SendStream`, :class:`ReceiveStream`
Expand Down
39 changes: 21 additions & 18 deletions docs/source/tutorial.rst
Original file line number Diff line number Diff line change
Expand Up @@ -908,12 +908,10 @@ And the second task's job is to process the data the server sends back:
:lineno-match:
:pyobject: receiver

It repeatedly calls ``await client_stream.receive_some(...)`` to get
more data from the server (again, all Trio streams provide this
method), and then checks to see if the server has closed the
connection. ``receive_some`` only returns an empty bytestring if the
connection has been closed; otherwise, it waits until some data has
arrived, up to a maximum of ``BUFSIZE`` bytes.
It uses an ``async for`` loop to fetch data from the server.
Alternatively, it could use `~trio.abc.ReceiveStream.receive_some`,
which is the opposite of `~trio.abc.SendStream.send_all`, but using
``async for`` saves some boilerplate.

And now we're ready to look at the server.

Expand Down Expand Up @@ -974,11 +972,11 @@ functions we saw in the last section:

The argument ``server_stream`` is provided by :func:`serve_tcp`, and
is the other end of the connection we made in the client: so the data
that the client passes to ``send_all`` will come out of
``receive_some`` here, and vice-versa. Then we have a ``try`` block
discussed below, and finally the server loop which alternates between
reading some data from the socket and then sending it back out again
(unless the socket was closed, in which case we quit).
that the client passes to ``send_all`` will come out here. Then we
have a ``try`` block discussed below, and finally the server loop
which alternates between reading some data from the socket and then
sending it back out again (unless the socket was closed, in which case
we quit).

So what's that ``try`` block for? Remember that in Trio, like Python
in general, exceptions keep propagating until they're caught. Here we
Expand Down Expand Up @@ -1029,7 +1027,7 @@ our client could use a single task like::
while True:
data = ...
await client_stream.send_all(data)
received = await client_stream.receive_some(BUFSIZE)
received = await client_stream.receive_some()
if not received:
sys.exit()
await trio.sleep(1)
Expand All @@ -1046,18 +1044,23 @@ line, any time we're expecting more than one byte of data, we have to
be prepared to call ``receive_some`` multiple times.

And where this would go especially wrong is if we find ourselves in
the situation where ``len(data) > BUFSIZE``. On each pass through the
loop, we send ``len(data)`` bytes, but only read *at most* ``BUFSIZE``
bytes. The result is something like a memory leak: we'll end up with
more and more data backed up in the network, until eventually
something breaks.
the situation where ``data`` is big enough that it passes some
internal threshold, and the operating system or network decide to
always break it up into multiple pieces. Now on each pass through the
loop, we send ``len(data)`` bytes, but read less than that. The result
is something like a memory leak: we'll end up with more and more data
backed up in the network, until eventually something breaks.

.. note:: If you're curious *how* things break, then you can use
`~trio.abc.ReceiveStream.receive_some`\'s optional argument to put
a limit on how many bytes you read each time, and see what happens.

We could fix this by keeping track of how much data we're expecting at
each moment, and then keep calling ``receive_some`` until we get it all::

expected = len(data)
while expected > 0:
received = await client_stream.receive_some(BUFSIZE)
received = await client_stream.receive_some(expected)
if not received:
sys.exit(1)
expected -= len(received)
Expand Down
11 changes: 3 additions & 8 deletions docs/source/tutorial/echo-client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@
# - can't be in use by some other program on your computer
# - must match what we set in our echo server
PORT = 12345
# How much memory to spend (at most) on each call to recv. Pretty arbitrary,
# but shouldn't be too big or too small.
BUFSIZE = 16384

async def sender(client_stream):
print("sender: started!")
Expand All @@ -22,12 +19,10 @@ async def sender(client_stream):

async def receiver(client_stream):
print("receiver: started!")
while True:
data = await client_stream.receive_some(BUFSIZE)
async for data in client_stream:
print("receiver: got data {!r}".format(data))
if not data:
print("receiver: connection closed")
sys.exit()
print("receiver: connection closed")
sys.exit()

async def parent():
print("parent: connecting to 127.0.0.1:{}".format(PORT))
Expand Down
11 changes: 2 additions & 9 deletions docs/source/tutorial/echo-server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@
# - can't be in use by some other program on your computer
# - must match what we set in our echo client
PORT = 12345
# How much memory to spend (at most) on each call to recv. Pretty arbitrary,
# but shouldn't be too big or too small.
BUFSIZE = 16384

CONNECTION_COUNTER = count()

Expand All @@ -20,14 +17,10 @@ async def echo_server(server_stream):
ident = next(CONNECTION_COUNTER)
print("echo_server {}: started".format(ident))
try:
while True:
data = await server_stream.receive_some(BUFSIZE)
async for data in server_stream:
print("echo_server {}: received data {!r}".format(ident, data))
if not data:
print("echo_server {}: connection closed".format(ident))
return
print("echo_server {}: sending data {!r}".format(ident, data))
await server_stream.send_all(data)
print("echo_server {}: connection closed".format(ident))
# FIXME: add discussion of MultiErrors to the tutorial, and use
# MultiError.catch here. (Not important in this case, but important if the
# server code uses nurseries internally.)
Expand Down
10 changes: 10 additions & 0 deletions newsfragments/959.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
If you have a `~trio.abc.ReceiveStream` object, you can now use
``async for data in stream: ...`` instead of calling
`~trio.abc.ReceiveStream.receive_some`. Each iteration gives an
arbitrary sized chunk of bytes. And the best part is, the loop
automatically exits when you reach EOF, so you don't have to check for
it yourself anymore. Relatedly, you no longer need to pick a magic
buffer size value before calling
`~trio.abc.ReceiveStream.receive_some`; you can ``await
stream.receive_some()`` with no arguments, and the stream will
automatically pick a reasonable size for you.
2 changes: 1 addition & 1 deletion notes-to-self/graceful-shutdown-idea.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def shutting_down(self):
async def stream_handler(stream):
while True:
with gsm.cancel_on_graceful_shutdown():
data = await stream.receive_some(...)
data = await stream.receive_some()
if gsm.shutting_down:
break

Expand Down
26 changes: 19 additions & 7 deletions trio/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,26 +378,28 @@ class ReceiveStream(AsyncResource):
If you want to receive Python objects rather than raw bytes, see
:class:`ReceiveChannel`.

`ReceiveStream` objects can be used in ``async for`` loops. Each iteration
will produce an arbitrary sized chunk of bytes, like calling
`receive_some` with no arguments. Every chunk will contain at least one
byte, and the loop automatically exits when reaching end-of-file.

"""
__slots__ = ()

@abstractmethod
async def receive_some(self, max_bytes):
async def receive_some(self, max_bytes=None):
"""Wait until there is data available on this stream, and then return
at most ``max_bytes`` of it.
some of it.

A return value of ``b""`` (an empty bytestring) indicates that the
stream has reached end-of-file. Implementations should be careful that
they return ``b""`` if, and only if, the stream has reached
end-of-file!

This method will return as soon as any data is available, so it may
return fewer than ``max_bytes`` of data. But it will never return
more.

Args:
max_bytes (int): The maximum number of bytes to return. Must be
greater than zero.
greater than zero. Optional; if omitted, then the stream object
is free to pick a reasonable default.

Returns:
bytes or bytearray: The data received.
Expand All @@ -413,6 +415,16 @@ async def receive_some(self, max_bytes):

"""

@aiter_compat
def __aiter__(self):
return self

async def __anext__(self):
data = await self.receive_some()
if not data:
raise StopAsyncIteration
return data


class Stream(SendStream, ReceiveStream):
"""A standard interface for interacting with bidirectional byte streams.
Expand Down
4 changes: 2 additions & 2 deletions trio/_highlevel_generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class StapledStream(HalfCloseableStream):
left, right = trio.testing.memory_stream_pair()
echo_stream = StapledStream(SocketStream(left), SocketStream(right))
await echo_stream.send_all(b"x")
assert await echo_stream.receive_some(1) == b"x"
assert await echo_stream.receive_some() == b"x"

:class:`StapledStream` objects implement the methods in the
:class:`~trio.abc.HalfCloseableStream` interface. They also have two
Expand Down Expand Up @@ -96,7 +96,7 @@ async def send_eof(self):
else:
return await self.send_stream.aclose()

async def receive_some(self, max_bytes):
async def receive_some(self, max_bytes=None):
"""Calls ``self.receive_stream.receive_some``.

"""
Expand Down
10 changes: 9 additions & 1 deletion trio/_highlevel_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@

__all__ = ["SocketStream", "SocketListener"]

# XX TODO: this number was picked arbitrarily. We should do experiments to
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One wrinkle: AFAIK, each call to socket.recv() allocates a new bytes object that is large enough for the entire given chunksize. If large allocations are more expensive, passing a too-large buffer is probably bad for performance. (The allocators I know of use 128KB as their threshold for "this is big, mmap it instead of finding a free chunk" but if one used 64KB instead and we got a mmap/munmap pair on each receive, that feels maybe bad?)

My intuition favors a much lower buffer size, like 4KB or 8KB, but I also do most of my work on systems that are rarely backlogged, so my intuition might well be off when it comes to a high-throughput Trio application.

Another option we could consider: the socket owns a receive buffer (bytearray) which it reuses, calls recv_into(), and extracts just the amount actually received into a bytes for returning. Downside: spends 64KB (or whatever) per socket in steady state. Counterpoint: the OS-level socket buffers are probably much larger than that (but I don't know how much memory they occupy when the socket isn't backlogged).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an interesting discussion but I don't want it to hold up merging the basic functionality, so I split it off into #1139

(Twisted has apparently used 64 KiB receive buffers for its entire existence and I can't find any evidence that anyone has ever thought twice about it. So we're probably not risking any disaster by starting with 64 KiB for now :-).)

# tune it. (Or make it dynamic -- one idea is to start small and increase it
# if we observe single reads filling up the whole buffer, at least within some
# limits.)
DEFAULT_RECEIVE_SIZE = 65536

_closed_stream_errnos = {
# Unix
errno.EBADF,
Expand Down Expand Up @@ -129,7 +135,9 @@ async def send_eof(self):
with _translate_socket_errors_to_stream_errors():
self.socket.shutdown(tsocket.SHUT_WR)

async def receive_some(self, max_bytes):
async def receive_some(self, max_bytes=None):
if max_bytes is None:
max_bytes = DEFAULT_RECEIVE_SIZE
if max_bytes < 1:
raise ValueError("max_bytes must be >= 1")
with _translate_socket_errors_to_stream_errors():
Expand Down
Loading