Skip to content

Commit

Permalink
add stream shutdown and support half-duplex operation
Browse files Browse the repository at this point in the history
Fixes one of the issues mentioned in #24526
  • Loading branch information
vtjnash committed Jul 28, 2021
1 parent f7f46af commit 6a98d0b
Show file tree
Hide file tree
Showing 14 changed files with 240 additions and 111 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Standard library changes
overflow in most cases. The new function `checked_length` is now available, which will try to use checked
arithmetic to error if the result may be wrapping. Or use a package such as SaferIntegers.jl when
constructing the range. ([#40382])
* TCP socket objects now expose `shutdown` functionality and support half-open mode usage ([#40783]).

#### InteractiveUtils
* A new macro `@time_imports` for reporting any time spent importing packages and their dependencies ([#41612])
Expand Down
1 change: 1 addition & 0 deletions base/coreio.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ write(::DevNull, ::UInt8) = 1
unsafe_write(::DevNull, ::Ptr{UInt8}, n::UInt)::Int = n
close(::DevNull) = nothing
wait_close(::DevNull) = wait()
bytesavailable(io::DevNull) = 0

let CoreIO = Union{Core.CoreSTDOUT, Core.CoreSTDERR}
global write(io::CoreIO, x::UInt8) = Core.write(io, x)
Expand Down
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,7 @@ export

# I/O and events
close,
shutdown,
countlines,
eachline,
readeach,
Expand Down
124 changes: 72 additions & 52 deletions base/io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,49 @@ function isopen end
Close an I/O stream. Performs a [`flush`](@ref) first.
"""
function close end

"""
shutdown(stream)
Shutdown the write half of a full-duplex I/O stream. Performs a [`flush`](@ref)
first. Notify the other end that no more data will be written to the underlying
file. This is not supported by all IO types.
# Examples
```jldoctest
julia> io = Base.BufferStream(); # this never blocks, so we can read and write on the same Task
julia> write(io, "request");
julia> # calling `read(io)` here would block forever
julia> shutdown(io);
julia> read(io, String)
"request"
"""
function shutdown end

"""
flush(stream)
Commit all currently buffered writes to the given stream.
"""
function flush end
function wait_readnb end
function wait_close end

"""
bytesavailable(io)
Return the number of bytes available for reading before a read from this stream or buffer will block.
# Examples
```jldoctest
julia> io = IOBuffer("JuliaLang is a GitHub organization");
julia> bytesavailable(io)
34
```
"""
function bytesavailable end

"""
Expand All @@ -81,7 +121,7 @@ function readavailable end
"""
isreadable(io) -> Bool
Return `true` if the specified IO object is readable (if that can be determined).
Return `false` if the specified IO object is not readable.
# Examples
```jldoctest
Expand All @@ -99,12 +139,12 @@ true
julia> rm("myfile.txt")
```
"""
function isreadable end
isreadable(io::IO) = isopen(io)

"""
iswritable(io) -> Bool
Return `true` if the specified IO object is writable (if that can be determined).
Return `false` if the specified IO object is not writable.
# Examples
```jldoctest
Expand All @@ -122,10 +162,23 @@ false
julia> rm("myfile.txt")
```
"""
function iswritable end
function copy end
iswritable(io::IO) = isopen(io)

"""
eof(stream) -> Bool
Test whether an I/O stream is at end-of-file. If the stream is not yet exhausted, this
function will block to wait for more data if necessary, and then return `false`. Therefore
it is always safe to read one byte after seeing `eof` return `false`. `eof` will return
`false` as long as buffered data is still available, even if the remote end of a connection
is closed.
"""
function eof end

function copy end
function wait_readnb end
function wait_close end

"""
read(io::IO, T)
Expand Down Expand Up @@ -357,65 +410,37 @@ end
function pipe_reader end
function pipe_writer end

for f in (:flush, :shutdown, :iswritable)
@eval $(f)(io::AbstractPipe) = $(f)(pipe_writer(io)::IO)
end
write(io::AbstractPipe, byte::UInt8) = write(pipe_writer(io)::IO, byte)
write(to::IO, from::AbstractPipe) = write(to, pipe_reader(from))
unsafe_write(io::AbstractPipe, p::Ptr{UInt8}, nb::UInt) = unsafe_write(pipe_writer(io)::IO, p, nb)::Union{Int,UInt}
buffer_writes(io::AbstractPipe, args...) = buffer_writes(pipe_writer(io)::IO, args...)
flush(io::AbstractPipe) = flush(pipe_writer(io)::IO)

for f in (
# peek/mark interface
:mark, :unmark, :reset, :ismarked,
# Simple reader functions
:read, :readavailable, :bytesavailable, :reseteof, :isreadable)
@eval $(f)(io::AbstractPipe) = $(f)(pipe_reader(io)::IO)
end
read(io::AbstractPipe, byte::Type{UInt8}) = read(pipe_reader(io)::IO, byte)::UInt8
unsafe_read(io::AbstractPipe, p::Ptr{UInt8}, nb::UInt) = unsafe_read(pipe_reader(io)::IO, p, nb)
read(io::AbstractPipe) = read(pipe_reader(io)::IO)
readuntil(io::AbstractPipe, arg::UInt8; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
readuntil(io::AbstractPipe, arg::AbstractChar; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
readuntil(io::AbstractPipe, arg::AbstractString; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
readuntil(io::AbstractPipe, arg::AbstractVector; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
readuntil_vector!(io::AbstractPipe, target::AbstractVector, keep::Bool, out) = readuntil_vector!(pipe_reader(io)::IO, target, keep, out)
readbytes!(io::AbstractPipe, target::AbstractVector{UInt8}, n=length(target)) = readbytes!(pipe_reader(io)::IO, target, n)

for f in (
# peek/mark interface
:mark, :unmark, :reset, :ismarked,
# Simple reader functions
:readavailable, :isreadable)
@eval $(f)(io::AbstractPipe) = $(f)(pipe_reader(io)::IO)
end
peek(io::AbstractPipe, ::Type{T}) where {T} = peek(pipe_reader(io)::IO, T)::T
wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io)::IO, nb)
eof(io::AbstractPipe) = eof(pipe_reader(io)::IO)::Bool

iswritable(io::AbstractPipe) = iswritable(pipe_writer(io)::IO)
isopen(io::AbstractPipe) = isopen(pipe_writer(io)::IO) || isopen(pipe_reader(io)::IO)
close(io::AbstractPipe) = (close(pipe_writer(io)::IO); close(pipe_reader(io)::IO))
wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io)::IO, nb)
wait_close(io::AbstractPipe) = (wait_close(pipe_writer(io)::IO); wait_close(pipe_reader(io)::IO))

"""
bytesavailable(io)
Return the number of bytes available for reading before a read from this stream or buffer will block.
# Examples
```jldoctest
julia> io = IOBuffer("JuliaLang is a GitHub organization");
julia> bytesavailable(io)
34
```
"""
bytesavailable(io::AbstractPipe) = bytesavailable(pipe_reader(io)::IO)
bytesavailable(io::DevNull) = 0

"""
eof(stream) -> Bool
Test whether an I/O stream is at end-of-file. If the stream is not yet exhausted, this
function will block to wait for more data if necessary, and then return `false`. Therefore
it is always safe to read one byte after seeing `eof` return `false`. `eof` will return
`false` as long as buffered data is still available, even if the remote end of a connection
is closed.
"""
eof(io::AbstractPipe) = eof(pipe_reader(io)::IO)::Bool
reseteof(io::AbstractPipe) = reseteof(pipe_reader(io)::IO)


# Exception-safe wrappers (io = open(); try f(io) finally close(io))

Expand Down Expand Up @@ -1119,11 +1144,6 @@ ismarked(io::IO) = io.mark >= 0
# Make sure all IO streams support flush, even if only as a no-op,
# to make it easier to write generic I/O code.

"""
flush(stream)
Commit all currently buffered writes to the given stream.
"""
flush(io::IO) = nothing

"""
Expand Down
6 changes: 6 additions & 0 deletions base/iobuffer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,12 @@ end

eof(io::GenericIOBuffer) = (io.ptr-1 == io.size)

function shutdown(io::GenericIOBuffer)
io.writable = false
# OR throw(_UVError("shutdown", UV_ENOTSOCK))
nothing
end

@noinline function close(io::GenericIOBuffer{T}) where T
io.readable = false
io.writable = false
Expand Down
1 change: 1 addition & 0 deletions base/libuv.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ end
function uv_alloc_buf end
function uv_readcb end
function uv_writecb_task end
function uv_shutdowncb_task end
function uv_return_spawn end
function uv_asynccb end
function uv_timercb end
Expand Down
1 change: 1 addition & 0 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ function setup_stdio(stdio::Union{IOBuffer, BufferStream}, child_readable::Bool)
@warn "Process error" exception=(ex, catch_backtrace())
finally
close(parent)
child_readable || shutdown(stdio)
end
end
catch ex
Expand Down
Loading

0 comments on commit 6a98d0b

Please sign in to comment.