
Add support for pipes. #621

Merged
merged 15 commits into from
Aug 29, 2018

Conversation

@Fuyukai Fuyukai (Member) commented Aug 22, 2018

This is Step 4 of #4 (comment), and adds support to Trio for reading from and writing to os.pipe file descriptors.

@njsmith njsmith mentioned this pull request Aug 22, 2018
codecov bot commented Aug 22, 2018

Codecov Report

Merging #621 into master will increase coverage by 0.02%.
The diff coverage is 100%.


@@            Coverage Diff             @@
##           master     #621      +/-   ##
==========================================
+ Coverage   99.28%   99.31%   +0.02%     
==========================================
  Files          91       93       +2     
  Lines       10785    10954     +169     
  Branches      770      782      +12     
==========================================
+ Hits        10708    10879     +171     
+ Misses         58       56       -2     
  Partials       19       19
Impacted Files                              Coverage Δ
trio/tests/subprocess/test_unix_pipes.py    100% <100%> (ø)
trio/_subprocess/unix_pipes.py              100% <100%> (ø)
trio/testing/_check_streams.py              99.31% <0%> (+0.68%) ⬆️

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update fc7c7e0...aaad5b5.

@njsmith njsmith (Member) commented Aug 23, 2018

Highlevel comment on both this and #622: this functionality is great, and obviously an important step towards #4. One question we need to answer along the way is how we're going to expose it – eventually it'll be part of some full-fledged subprocess spawning API, but should we expose the individual pieces, and if so, how? For WaitForSingleObject, this was pretty easy: it's an OS-specific core primitive, and the proper API was obvious, so, make it public in trio.hazmat, done. For these, it's a little more complicated: these particular implementations are specific to Unix/Linux, but the concepts are more general; and, I think we'll probably want to fine-tune the API as we go. (For example, with pipes we need to create a pipe where one end is wrapped in trio and non-blocking, and the other is in blocking mode to be passed to the child; and with waitpid, I suspect we might adjust how we track the WaitpidResult once we have Popen objects that could hold them – like maybe we'll kick off the waitpid call immediately when the process is created, and stash the WaitpidResult in the Popen object?)

So, my suggestion for right now is:

  • Go ahead and add the implementations, maybe even in a subpackage like trio/_subprocess/...
  • Go ahead and write the tests
  • Don't export them publicly
  • Don't write newsfragments (because there's nothing for users yet)

That way we can incrementally write the different pieces, review them, test them, etc., and eventually we'll figure out how to hook them together into a coherent subprocess API.

Does that make sense?

@njsmith njsmith (Member) commented Aug 23, 2018

Two more quick comments before I go to bed:

  • We're going to need __del__ methods here... most of trio doesn't bother, because e.g. SocketStream holds a trio.socket.SocketType, which holds a socket.socket... and socket.socket already has a __del__ that makes sure the OS-level socket object is eventually closed. os.pipe doesn't provide that kind of wrapper, so we have to take care of __del__ ourselves.

  • Check out trio.testing.check_one_way_stream
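A minimal stdlib-only sketch of the first point, using a hypothetical FdHolder name: the object owns the raw fd, and its __del__ is the last-resort guarantee that the fd eventually gets closed (the same service socket.socket provides for SocketStream automatically).

```python
import os

class FdHolder:
    """Hypothetical sketch: an object that owns a raw fd and makes
    sure it is eventually closed, even if nobody calls close()."""

    def __init__(self, fd: int):
        self._fd = fd

    def close(self) -> None:
        if self._fd != -1:
            os.close(self._fd)
            self._fd = -1  # sentinel: -1 is never a valid fd

    def __del__(self):
        # Last-resort cleanup; os.pipe hands out bare ints, so unlike
        # socket.socket there's no wrapper doing this for us.
        self.close()

r, w = os.pipe()
holder = FdHolder(r)
del holder  # on CPython the refcount hits zero and __del__ closes r
try:
    os.close(r)
except OSError:
    pass  # already closed by __del__, as intended
os.close(w)
```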

@Fuyukai Fuyukai (Member, Author) commented Aug 23, 2018

Re: check_one_way_stream - this can't be used, because it does a check that closing side W of a pipe ensures side R cancels reading (which, well, it doesn't, because one half of the fds are closed on a fork-with-exec).

@njsmith njsmith left a comment (Member)

Some more detailed comments, in addition to the more general ones above...

self._pipe = pipefd

async def aclose(self):
os.close(self._pipe)
Member

We should also make sure that after aclose, future operations will fail with ClosedStreamError, even if the file descriptor value gets reallocated to a new file. One way would be to add a self._closed attribute that we check before each operation. The other option (what socket.socket does) is to assign self._pipe = -1, since -1 cannot be a valid fd.
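A synchronous sketch of the -1 approach (the ClosedStreamError stand-in here is hypothetical, defined just for the example): after close, the stored fd is -1, so any later operation fails with EBADF no matter who now owns the old fd number.

```python
import os

class ClosedStreamError(Exception):
    """Stand-in for trio's ClosedStreamError, just for this sketch."""

class PipeLike:
    def __init__(self, fd: int):
        self._pipe = fd

    def close(self) -> None:
        os.close(self._pipe)
        self._pipe = -1  # -1 can never be a real fd, so fd reuse is harmless

    def read(self, n: int) -> bytes:
        try:
            return os.read(self._pipe, n)
        except OSError as e:
            # os.read(-1, ...) always fails with EBADF
            raise ClosedStreamError from e

r, w = os.pipe()
p = PipeLike(r)
os.write(w, b"hi")
assert p.read(2) == b"hi"
p.close()
# Even if another file now reuses the old fd number, p can't touch it:
# p.read(2) raises ClosedStreamError
os.close(w)
```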

Member

The -1 thing is also baked into the _translate_socket_errors_to_stream_errors... on Unix, its whole job is to convert EBADF into a ClosedStreamError (because EBADF is what you get when you try to use -1 as an fd), and to convert everything else into BrokenStreamError.

It might make sense to write a _translate_pipe_errors_to_stream_errors, even if it's very similar to the socket version, just because both error-converters are so tightly coupled to internal implementation decisions that it's nice to not have to think about code in other files when looking at them.

Member

Oh, and we should be calling notify_fd_close here too.

class _PipeMixin:
def __init__(self, pipefd: int):
if not isinstance(pipefd, int):
raise TypeError("PipeSendStream needs a pipe fd")
Member

Error message has wrong class name

with view[total_sent:] as remaining:
total_sent += os.write(self._pipe, remaining)

await self.wait_send_all_might_not_block()
Member

This is going to crash if the pipe buffer fills up – os.write will raise BlockingIOError (EWOULDBLOCK), and we don't have anything to catch it. In SocketStream.send_all, it has a slightly easier job, because it doesn't call the raw OS send function directly – it calls the trio.socket send method, and that takes care of retrying and waiting. See _nonblocking_helper and _try_sync in trio.socket for the gory details... not that they're perfect, they're probably a bit over-elaborate themselves.

await self.wait_send_all_might_not_block()

async def wait_send_all_might_not_block(self) -> None:
await _core.wait_socket_writable(self._pipe)
Member

We shouldn't be using socket functions on non-socket fds, that's just confusing :-). Of course we can get away with it on Unix, and this is Unix-only code, so it's more a style thing. But if this code were accidentally run on Windows, then calling the socket functions would invoke undefined behavior, while using the fd versions would give a clean error.

def make_pipe() -> Tuple[PipeReceiveStream, PipeSendStream]:
"""Makes a new pair of pipes."""
(r, w) = os.pipe2(os.O_NONBLOCK)
return PipeReceiveStream(r), PipeSendStream(w)
Member

I suspect it'll end up being a bit nicer if we make the stream __init__ methods accept arbitrary pipes, and set them to non-blocking mode at that point. Then when we only want to use one side of a pipe with trio, we can write

r_raw, w_raw = os.pipe()
r_trio = PipeReceiveStream(r_raw)
# ... now use r_trio and w_raw ...

Also, elsewhere in trio where we return a pair like this, we do (sender, receiver). Which yeah is the opposite of os.pipe. But trio's way is easier to remember ("data flows from left to right, like you read").

Neither of these is a big issue right now since we're keeping this private, and once we start trying to use it for real we'll probably find other things we want to tweak. But I figured I'd mention them.
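The suggested __init__-side setup can be sketched with plain fcntl (the helper name is hypothetical); only the end that trio wraps is switched to non-blocking, while the other end stays in blocking mode, ready to hand to a child process:

```python
import fcntl
import os

def set_nonblocking(fd: int) -> None:
    """What the stream __init__ would do, per the suggestion above:
    put an arbitrary fd into non-blocking mode."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

r_raw, w_raw = os.pipe()  # plain blocking pipe from the OS
set_nonblocking(r_raw)    # only the trio-wrapped end goes non-blocking
# w_raw stays blocking, suitable for passing to a subprocess

try:
    os.read(r_raw, 1)     # empty pipe + O_NONBLOCK -> BlockingIOError
except BlockingIOError:
    pass
os.close(r_raw)
os.close(w_raw)
```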

@njsmith njsmith (Member) commented Aug 24, 2018

Re: check_one_way_stream - this can't be used, because it does a check that closing side W of a pipe ensures side R cancels reading (which, well, it doesn't, because one half of the fds are closed on a fork-with-exec).

Not sure I follow... we wouldn't be doing fork/exec in the middle of a call to check_one_way_stream, right? Obviously when we actually use the pipe streams it'll be in a more complicated situation involving subprocesses, but that doesn't mean we can't use check_one_way_stream in a simple in-process case to validate that the stream code works and handles the edge cases correctly.

@Fuyukai Fuyukai (Member, Author) commented Aug 24, 2018

The check_one_way_stream runs a test that ensures that closing the S side of the stream stops the R side from working - with pipes, this can't happen due to child processes and what-not.

@njsmith njsmith (Member) commented Aug 24, 2018

The check_one_way_stream runs a test that ensures that closing the S side of the stream stops the R side from working - with pipes, this can't happen due to child processes and what-not.

This isn't a problem. It would be a problem if we stopped in the middle of a test to spawn a child process and shared our pipe object with it, but we won't do that :-). If you're just using a pipe object within a process, then it follows the usual rule where closing the send side causes the receive side to get an EOF:

In [1]: import os

In [2]: R, S = os.pipe()

In [3]: os.write(S, b"x")
Out[3]: 1

In [4]: os.close(S)

In [5]: os.read(R, 10)
Out[5]: b'x'

In [6]: os.read(R, 10)
Out[6]: b''

@@ -0,0 +1,25 @@
import pytest
Member

Instead of making another copy of this, I'm thinking we should put the subprocess tests into trio/tests/subprocess/? Or _subprocess/ or test_subprocess/, I guess, whichever looks nicest. (And at some point we should move the _core tests into trio/tests/ too. And probably rename it to trio/_tests/. But that can wait for another PR...)


pytestmark = pytest.mark.skipif(
not hasattr(os, "pipe2"), reason="pipes require os.pipe2()"
)
Member

Now that we don't use pipe2 this looks a little odd :-). Maybe skipif(os.name != "posix", reason=...) would be a bit clearer. (I think it has the same effect.)

self._pipe = pipefd
self._closed = False

if set_non_blocking:
Member

I don't think there's any reason for this to be configurable? We can just do it unconditionally.

pass
except OSError as e:
# already closed from somewhere else
if e.errno != 9:
Member

Whoops, you should never hard-code errno values like this – they're arbitrary, and can be different on different platforms. (Though 9 does seem to be EBADF on both Linux and macOS... weird!) Instead, use the stdlib errno module, e.g. e.errno != errno.EBADF.

Except... in this case, I actually think we shouldn't be filtering out EBADF. Instead we should follow the rule that once you give an fd to one of these pipe objects, the pipe object "takes ownership" of it, and is the only object that should be used to close it. Closing the fileno somewhere else is a bug.

In fact, this probably explains some of your weird test failures. I bet what's happening is:

  1. The test allocates the fd, and wraps it in a trio pipe object
  2. The test closes the fd by hand
  3. Later, some other fd is opened, and is assigned the same numerical value as the earlier pipe fd.
  4. Finally, the trio pipe object gets GCed, and its __del__ method closes whatever random fd was opened in step 3.
  5. Chaos ensues.

File descriptors are a weird kind of global state with no guard-rails. They're very C-ish. It's important to know exactly who is responsible for managing them.
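Steps 1–3 of that failure mode can be reproduced in miniature: POSIX hands out the lowest available fd number, so a closed fd's number is recycled almost immediately (a sketch, assuming nothing else touches the process's fd table in between):

```python
import os

# Step 1: allocate an fd (in the real bug, it then gets wrapped in a
# trio pipe object).
r, w = os.pipe()
# Step 2: the test closes the fd by hand.
os.close(r)
# Step 3: some other fd is opened and gets the same numerical value,
# because POSIX always allocates the lowest free fd number.
r2, w2 = os.pipe()
assert r2 == r  # the old number is recycled immediately
# Step 4 (not shown): a stale __del__ on the wrapper would now close
# *this* unrelated fd. Step 5: chaos ensues.
os.close(w)
os.close(r2)
os.close(w2)
```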


if max_bytes < 1:
await _core.checkpoint()
raise ValueError("max_bytes must be integer >= 1")
Member

You can save a bit of repetition by making it if not isinstance(max_bytes, int) or max_bytes < 1

@Fuyukai Fuyukai (Member, Author) Aug 25, 2018

Can't, must be different errors for the test.

Member

Oh yeah, good point :-)

@@ -0,0 +1 @@
Add support for pipes (os.pipe).
Member

No newsfragment for now, since the API's not public yet.

@njsmith njsmith left a comment (Member)

Another pass.

def nonblock_pipe(p: int):
import fcntl
flags = fcntl.fcntl(p, fcntl.F_GETFL)
fcntl.fcntl(p, fcntl.F_SETFL, flags | os.O_NONBLOCK)
Member

What's the point of this? You seem to only call it on fds that are about to be passed to PipeSendStream or PipeReceiveStream, which also set their inputs to non-blocking, so it seems redundant...?

Member Author

I clearly wasn't thinking clearly when I wrote this, whoops.

try:
data = os.read(self._pipe, max_bytes)
if data == b'':
await self.aclose()
Member

Huh, I'm surprised this passes check_one_way_stream! You shouldn't close here; after receive_some returns b"", calling it again should give b"" again, not ClosedResourceError. And we should fix check_one_way_stream so that it tests this corner case. Well done finding that omission :-).

except BlockingIOError:
await _core.wait_readable(self._pipe)
else:
await _core.checkpoint()
Member

Doing a full checkpoint here unfortunately doesn't quite work right – the problem is that this should only raise Cancelled if it did nothing. If we do a full checkpoint here, then we could read some data out of the pipe, and then drop it on the floor when Cancelled is raised.

This is why the operations in trio/_socket.py split the checkpoint into two pieces: before we attempt the operation, we do a await checkpoint_if_cancelled(), and then after the operation succeeds, we do a await cancel_shielded_checkpoint(). That way we always do a full checkpoint one way or another, but if the operation succeeds we're guaranteed not to raise Cancelled.
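Here's a synchronous stdlib sketch of that loop shape; the comments mark where trio's two half-checkpoints would go (the trio calls themselves are left as comments so the sketch stays runnable without trio):

```python
import os
import select

def receive_some_sketch(fd: int, max_bytes: int) -> bytes:
    """Sketch of the receive loop described above, for a non-blocking
    pipe fd; blocking select stands in for trio's wait_readable."""
    # await checkpoint_if_cancelled()  <- before the attempt: may raise
    # Cancelled, but nothing has been read yet so nothing is lost
    while True:
        try:
            data = os.read(fd, max_bytes)
        except BlockingIOError:
            # await wait_readable(fd)  <- in trio; here we just block
            select.select([fd], [], [])
        else:
            # await cancel_shielded_checkpoint()  <- after success: never
            # raises Cancelled, so the data we just read can't be dropped
            return data

r, w = os.pipe()
os.set_blocking(r, False)
os.write(w, b"hello")
assert receive_some_sketch(r, 10) == b"hello"
os.close(r)
os.close(w)
```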



async def test_pipe_fully():
await check_one_way_stream(make_pipe, None)
Member

We should fill in the second argument here – it lets check_one_way_stream do much more thorough testing.

Something like:

async def make_clogged_pipe():
    s, r = make_pipe()
    try:
        while True:
            # We want to totally fill up the pipe buffer.
            # This requires working around a weird feature that POSIX pipes have.
            # If you do a write of <= PIPE_BUF bytes, then it's guaranteed
            # to either complete entirely, or not at all. So if we tried to write
            # PIPE_BUF bytes, and the buffer's free space is only
            # PIPE_BUF/2, then the write will raise BlockingIOError... even
            # though a smaller write could still succeed! To avoid this,
            # make sure to write >PIPE_BUF bytes each time, which disables
            # the special behavior.
            # For details, search for PIPE_BUF here:
            #   http://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html
            os.write(s.fileno(), b"x" * select.PIPE_BUF * 2)
    except BlockingIOError:
        pass
    return s, r

except BlockingIOError:
pass

await self.wait_send_all_might_not_block()
@njsmith njsmith (Member) Aug 25, 2018

I think the way I'd structure send_all is:

  • Just do a checkpoint as the very first thing, given how this is structured it doesn't seem worth the hassle of avoiding this.

  • Structure the main loop like:

while True:
    <try to write some data>
    <check if we're done; if so, break out of the loop>
    <wait for the pipe to be writable>
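A stdlib-only sketch of that loop structure for a non-blocking pipe fd (blocking select stands in for trio's wait_send_all_might_not_block):

```python
import os
import select

def send_all_sketch(fd: int, data: bytes) -> None:
    """Sketch of the suggested send_all loop for a non-blocking
    pipe fd."""
    total_sent = 0
    with memoryview(data) as view:
        while True:
            # <try to write some data>
            try:
                with view[total_sent:] as remaining:
                    total_sent += os.write(fd, remaining)
            except BlockingIOError:
                pass  # buffer full; fall through and wait
            # <check if we're done; if so, break out of the loop>
            if total_sent == len(data):
                break
            # <wait for the pipe to be writable>
            select.select([], [fd], [])

r, w = os.pipe()
os.set_blocking(w, False)
send_all_sketch(w, b"ping")
assert os.read(r, 4) == b"ping"
os.close(r)
os.close(w)
```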

# also doesn't checkpoint so we have to do that
# ourselves here too
await _core.checkpoint()
raise BrokenStreamError from e
Member

Maybe put this inside wait_send_all_might_not_block?

@njsmith njsmith left a comment (Member)

OK, did a close read as requested. A few trivial little comments, and one substantive one: where'd __del__ go? Before there were bugs because __del__ was closing the fds and also your tests were closing the same fds directly, but we should fix that by not closing the fds directly, not by removing __del__ :-).

self._pipe = pipefd
self._closed = False

import fcntl
Member

Can we move this import to the top of the file, like other imports?

Member Author

The import was originally there because windows tests would import test_unix_pipes and crash when fcntl was imported. I rearranged some stuff in the test so now this won't happen.


if max_bytes < 1:
await _core.checkpoint()
raise ValueError("max_bytes must be integer >= 1")
Member

Oh yeah, good point :-)

await _core.cancel_shielded_checkpoint()
break

return data
Member

I guess technically you could move the checkpoint stuff out of the loop but in practice it doesn't really matter.

# the special behavior.
# For details, search for PIPE_BUF here:
# http://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html
buf_size = getattr(select, "PIPE_BUF", 8192)
Member

Please add a comment on this with a link to https://bitbucket.org/pypy/pypy/issues/2876/selectpipe_buf-is-missing-on-pypy3 , to improve our chances of noticing later when it becomes unnecessary.

@Fuyukai Fuyukai (Member, Author) commented Aug 26, 2018

Well, if __del__ doesn't close the fds, what else can it do?

@njsmith njsmith (Member) commented Aug 26, 2018

As a general rule in Python, any OS resource should be owned by some object whose __del__ will make sure it's eventually closed. So, here that's the Pipe*Stream classes, and they should have a __del__ that closes the fds. And then to avoid double-closes, once you pass an fd into a Pipe*Stream, its __del__ or aclose should be the only thing that closes the fds.

I guess testing __del__ to make sure it actually closes things is a little tricky... I guess you could do something like:

# Maybe this should move somewhere else?
from trio._core.tests.tutil import gc_collect_harder

async def test_pipe_del():
    s, r = make_pipe()
    s_fd = s.fileno()
    r_fd = r.fileno()
    del s, r
    gc_collect_harder()
    with pytest.raises(OSError) as excinfo:
        os.close(s_fd)
    assert excinfo.value.errno == errno.EBADF
    with pytest.raises(OSError) as excinfo:
        os.close(r_fd)
    assert excinfo.value.errno == errno.EBADF

@njsmith njsmith left a comment (Member)

One minor comment, and otherwise looks good – feel free to merge after addressing that :-)

Thanks for patience with all my nitpicking!


async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.aclose()
return False
Member

You don't need to define __aenter__ and __aexit__ here – trio.abc.AsyncResource provides generic implementations, and SendStream and ReceiveStream inherit from AsyncResource.

Member Author

Oh, yeah, whoops.
