Skip to content

Commit

Permalink
Rewrite unix pipe fd handling logic
Browse files Browse the repository at this point in the history
This addresses a number of issues:

- Fixes a major issue where aclose() called notify_fd_close()
  unconditionally, even if the fd was already closed; if the fd number
  had already been recycled this could (and did) affect unrelated file
  descriptors:
    python-trio#661 (comment)
- Fixes a theoretical issue (not yet observed in the wild) where a
  poorly timed close could fail to be noticed by other tasks (gh-661)
- Adds ConflictDetectors to catch attempts to use the same stream from
  multiple tasks simultaneously
- Switches from inheritance to composition (gh-830)

Still todo:

- Tests for these race conditions that snuck through
- Audit _windows_pipes.py and _socket.py for related issues
  • Loading branch information
njsmith committed Jan 23, 2019
1 parent e26b908 commit f048ea5
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 101 deletions.
3 changes: 3 additions & 0 deletions newsfragments/661.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed several bugs in the new subprocess pipe support, where
operations on a closed pipe could accidentally affect another
unrelated pipe due to internal file-descriptor reuse.
214 changes: 129 additions & 85 deletions trio/_unix_pipes.py
Original file line number Diff line number Diff line change
@@ -1,117 +1,161 @@
import fcntl
import os
from typing import Tuple
import errno

from . import _core
from ._abc import SendStream, ReceiveStream
from ._util import ConflictDetector

__all__ = ["PipeSendStream", "PipeReceiveStream", "make_pipe"]


class _PipeMixin:
def __init__(self, pipefd: int) -> None:
if not isinstance(pipefd, int):
raise TypeError(
"{0.__class__.__name__} needs a pipe fd".format(self)
)

self._pipe = pipefd
self._closed = False

flags = fcntl.fcntl(self._pipe, fcntl.F_GETFL)
fcntl.fcntl(self._pipe, fcntl.F_SETFL, flags | os.O_NONBLOCK)

def _close(self):
if self._closed:
class _FdHolder:
    # This class holds onto a raw file descriptor, in non-blocking mode, and
    # is responsible for managing its lifecycle. In particular, it's
    # responsible for making sure it gets closed, and also for tracking
    # whether it's been closed.
    #
    # The way we track closure is to set the .fd field to -1, discarding the
    # original value. You might think that this is a strange idea, since it
    # overloads the same field to do two different things. Wouldn't it be more
    # natural to have a dedicated .closed field? But that would be more
    # error-prone. Fds are represented by small integers, and once an fd is
    # closed, its integer value may be reused immediately. If we accidentally
    # used the old fd after being closed, we might end up doing something to
    # another unrelated fd that happened to get assigned the same integer
    # value. By throwing away the integer value immediately, it becomes
    # impossible to make this mistake – we'll just get an EBADF.
    #
    # (This trick was copied from the stdlib socket module.)
    def __init__(self, fd: int):
        """Take ownership of ``fd`` and switch it to non-blocking mode.

        Raises:
            TypeError: if ``fd`` is not an int.
            OSError: if the fcntl calls on ``fd`` fail.
        """
        # Make sure self.fd is always initialized to *something*: if we raise
        # below, __del__ still runs on the half-constructed object, and
        # _raw_close() reads self.fd through the `closed` property. Starting
        # out as "closed" makes that a harmless no-op instead of an
        # AttributeError inside the finalizer.
        self.fd = -1
        if not isinstance(fd, int):
            raise TypeError("file descriptor must be an int")
        self.fd = fd
        # Flip the fd to non-blocking mode
        flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
        fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    @property
    def closed(self):
        """True once the fd has been closed (i.e. .fd was reset to -1)."""
        return self.fd == -1

    def _raw_close(self):
        # This doesn't assume it's in a trio context, so it can be called from
        # __del__. You should never call it from Trio context, because it
        # skips calling notify_fd_close. But from __del__, skipping that is
        # OK, because notify_fd_close just wakes up other tasks that are
        # waiting on this fd, and those tasks hold a reference to this object.
        # So if __del__ is being called, we know there aren't any tasks that
        # need to be woken.
        if self.closed:
            return
        # Forget the integer *before* closing it, so that nothing can observe
        # a stale fd number on this object, even if os.close() raises.
        fd = self.fd
        self.fd = -1
        os.close(fd)

    def __del__(self):
        self._raw_close()

    async def aclose(self):
        """Close the fd from Trio context; a no-op if it is already closed.

        Always executes a checkpoint, whether or not anything was closed.
        """
        # Only notify while we still own a live fd: once closed, the old
        # integer may already refer to some unrelated file descriptor.
        if not self.closed:
            _core.notify_fd_close(self.fd)
            self._raw_close()
        await _core.checkpoint()

def fileno(self) -> int:
"""Gets the file descriptor for this pipe."""
return self._pipe

def __del__(self):
self._close()


class PipeSendStream(_PipeMixin, SendStream):
class PipeSendStream(SendStream):
    """Represents a send stream over an os.pipe object.

    The stream owns the file descriptor it is given (through an
    ``_FdHolder``), and uses a ``ConflictDetector`` to catch attempts to use
    it from several tasks at once.
    """

    def __init__(self, fd: int):
        self._conflict_detector = ConflictDetector(
            "another task is using this pipe"
        )
        self._fd_holder = _FdHolder(fd)

    async def send_all(self, data: bytes):
        """Write all of ``data`` to the pipe, waiting for writability as needed.

        Raises ``ClosedResourceError`` if the pipe is closed (up front, or
        detected as EBADF while writing) and ``BrokenResourceError`` for any
        other ``OSError`` reported by ``os.write``.
        """
        holder = self._fd_holder
        async with self._conflict_detector:
            # The closed check cannot be left to os.write(): for empty data
            # the loop below never runs, yet a closed pipe must still raise.
            if holder.closed:
                raise _core.ClosedResourceError("this pipe was already closed")

            total = len(data)
            # Same approach as the SocketStream code: slice a memoryview so
            # partial writes don't copy the unsent tail.
            with memoryview(data) as buffer:
                offset = 0
                while offset < total:
                    with buffer[offset:] as chunk:
                        try:
                            # holder.fd is re-read on every attempt; after a
                            # close it is -1, which surfaces as EBADF below.
                            offset += os.write(holder.fd, chunk)
                        except BlockingIOError:
                            await _core.wait_writable(holder.fd)
                        except OSError as exc:
                            if exc.errno != errno.EBADF:
                                raise _core.BrokenResourceError from exc
                            raise _core.ClosedResourceError(
                                "this pipe was closed"
                            ) from None

    async def wait_send_all_might_not_block(self) -> None:
        """Wait until the pipe's fd is reported writable.

        Raises ``ClosedResourceError`` if the pipe is already closed and
        ``BrokenResourceError`` if the wait fails with ``BrokenPipeError``.
        """
        holder = self._fd_holder
        async with self._conflict_detector:
            if holder.closed:
                raise _core.ClosedResourceError("this pipe was already closed")
            try:
                await _core.wait_writable(holder.fd)
            except BrokenPipeError as exc:
                # On kqueue, EPIPE is reported by wait_writable rather than
                # by the write itself, so it has to be translated here too.
                raise _core.BrokenResourceError from exc

    async def aclose(self):
        """Close the underlying file descriptor."""
        await self._fd_holder.aclose()

    def fileno(self):
        """Return the underlying fd, or -1 once the stream is closed."""
        return self._fd_holder.fd


class PipeReceiveStream(ReceiveStream):
    """Represents a receive stream over an os.pipe object.

    The stream owns the file descriptor it is given (through an
    ``_FdHolder``), and uses a ``ConflictDetector`` to catch attempts to use
    it from several tasks at once.
    """

    def __init__(self, fd: int):
        self._conflict_detector = ConflictDetector(
            "another task is using this pipe"
        )
        self._fd_holder = _FdHolder(fd)

    async def receive_some(self, max_bytes: int) -> bytes:
        """Read up to ``max_bytes`` from the pipe, waiting until readable.

        Raises ``TypeError`` / ``ValueError`` for a non-int or non-positive
        ``max_bytes``, ``ClosedResourceError`` if the read fails with EBADF,
        and ``BrokenResourceError`` for any other ``OSError``.
        """
        holder = self._fd_holder
        async with self._conflict_detector:
            if not isinstance(max_bytes, int):
                raise TypeError("max_bytes must be integer >= 1")

            if max_bytes < 1:
                raise ValueError("max_bytes must be integer >= 1")

            while True:
                try:
                    # holder.fd is re-read on every attempt; after a close it
                    # is -1, which surfaces as EBADF below.
                    return os.read(holder.fd, max_bytes)
                except BlockingIOError:
                    await _core.wait_readable(holder.fd)
                except OSError as exc:
                    await _core.cancel_shielded_checkpoint()
                    if exc.errno != errno.EBADF:
                        raise _core.BrokenResourceError from exc
                    raise _core.ClosedResourceError(
                        "this pipe was closed"
                    ) from None

    async def aclose(self):
        """Close the underlying file descriptor."""
        await self._fd_holder.aclose()

    def fileno(self):
        """Return the underlying fd, or -1 once the stream is closed."""
        return self._fd_holder.fd


async def make_pipe() -> Tuple[PipeSendStream, PipeReceiveStream]:
Expand Down
28 changes: 12 additions & 16 deletions trio/tests/test_unix_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,22 @@

async def test_send_pipe():
    """Bytes sent through a PipeSendStream arrive on the raw read end."""
    read_fd, write_fd = os.pipe()
    async with PipeSendStream(write_fd) as send:
        assert send.fileno() == write_fd
        await send.send_all(b"123")
        received = os.read(read_fd, 8)
        assert received == b"123"

        # Only the read end is closed by hand; the stream owns the write end
        # and closes it when the async-with block exits.
        os.close(read_fd)


async def test_receive_pipe():
    """Bytes written to the raw write end arrive via PipeReceiveStream."""
    read_fd, write_fd = os.pipe()
    async with PipeReceiveStream(read_fd) as recv:
        assert recv.fileno() == read_fd
        os.write(write_fd, b"123")
        received = await recv.receive_some(8)
        assert received == b"123"

        # Only the write end is closed by hand; the stream owns the read end
        # and closes it when the async-with block exits.
        os.close(write_fd)


async def test_pipes_combined():
Expand Down Expand Up @@ -90,8 +86,8 @@ async def test_async_with():
async with w, r:
pass

assert w._closed
assert r._closed
assert w.fileno() == -1
assert r.fileno() == -1

with pytest.raises(OSError) as excinfo:
os.close(w.fileno())
Expand Down

0 comments on commit f048ea5

Please sign in to comment.