Skip to content

Commit

Permalink
add stream shutdown and support half-duplex operation
Browse files Browse the repository at this point in the history
Fixes one of the issues mentioned in #24526
  • Loading branch information
vtjnash committed May 11, 2021
1 parent 9bceffd commit 200de0b
Show file tree
Hide file tree
Showing 14 changed files with 228 additions and 112 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ New library features
Standard library changes
------------------------

* TCP socket objects now expose `shutdown` functionality and support half-open mode usage ([#TBD]).
* `count` and `findall` now accept an `AbstractChar` argument to search for a character in a string ([#38675]).
* `range` now supports the `range(start, stop)` and `range(start, stop, length)` methods ([#39228]).
* `range` now supports `start` as an optional keyword argument ([#38041]).
Expand Down
3 changes: 2 additions & 1 deletion base/coreio.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ flush(::DevNull) = nothing
wait_readnb(::DevNull) = wait()
wait_close(::DevNull) = wait()
eof(::DevNull) = true
bytesavailable(io::DevNull) = 0

let CoreIO = Union{Core.CoreSTDOUT, Core.CoreSTDERR}
global write, unsafe_write
write(io::CoreIO, x::UInt8) = Core.write(io, x)
unsafe_write(io::CoreIO, x::Ptr{UInt8}, nb::UInt) = Core.unsafe_write(io, x, nb)
end

isopen
stdin = devnull
stdout = Core.stdout
stderr = Core.stderr
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,7 @@ export

# I/O and events
close,
shutdown,
countlines,
eachline,
readeach,
Expand Down
111 changes: 59 additions & 52 deletions base/io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,36 @@ function isopen end
Close an I/O stream. Performs a [`flush`](@ref) first.
"""
function close end

"""
shutdown(stream)
Shutdown an I/O stream. Performs a [`flush`](@ref) first. Notify the other end that no
more data can ever be written to the underlying file. This is not supported by all IO
types.
"""
function shutdown end

"""
flush(stream)
Commit all currently buffered writes to the given stream.
"""
function flush end
function wait_readnb end
function wait_close end

"""
bytesavailable(io)
Return the number of bytes available for reading before a read from this stream or buffer will block.
# Examples
```jldoctest
julia> io = IOBuffer("JuliaLang is a GitHub organization");
julia> bytesavailable(io)
34
```
"""
function bytesavailable end

"""
Expand All @@ -81,7 +108,7 @@ function readavailable end
"""
isreadable(io) -> Bool
Return `true` if the specified IO object is readable (if that can be determined).
Return `false` if the specified IO object is not readable.
# Examples
```jldoctest
Expand All @@ -99,12 +126,12 @@ true
julia> rm("myfile.txt")
```
"""
function isreadable end
isreadable(io::IO) = isopen(io)

"""
iswritable(io) -> Bool
Return `true` if the specified IO object is writable (if that can be determined).
Return `false` if the specified IO object is not writable.
# Examples
```jldoctest
Expand All @@ -122,10 +149,23 @@ false
julia> rm("myfile.txt")
```
"""
function iswritable end
function copy end
iswritable(io::IO) = isopen(io)

"""
eof(stream) -> Bool
Test whether an I/O stream is at end-of-file. If the stream is not yet exhausted, this
function will block to wait for more data if necessary, and then return `false`. Therefore
it is always safe to read one byte after seeing `eof` return `false`. `eof` will return
`false` as long as buffered data is still available, even if the remote end of a connection
is closed.
"""
function eof end

function copy end
function wait_readnb end
function wait_close end

"""
read(io::IO, T)
Expand Down Expand Up @@ -357,65 +397,37 @@ end
function pipe_reader end
function pipe_writer end

for f in (:flush, :shutdown, :iswritable)
@eval $(f)(io::AbstractPipe) = $(f)(pipe_writer(io)::IO)
end
write(io::AbstractPipe, byte::UInt8) = write(pipe_writer(io)::IO, byte)
write(to::IO, from::AbstractPipe) = write(to, pipe_reader(from))
unsafe_write(io::AbstractPipe, p::Ptr{UInt8}, nb::UInt) = unsafe_write(pipe_writer(io)::IO, p, nb)::Union{Int,UInt}
buffer_writes(io::AbstractPipe, args...) = buffer_writes(pipe_writer(io)::IO, args...)
flush(io::AbstractPipe) = flush(pipe_writer(io)::IO)

for f in (
# peek/mark interface
:mark, :unmark, :reset, :ismarked,
# Simple reader functions
:read, :readavailable, :bytesavailable, :reseteof, :isreadable)
@eval $(f)(io::AbstractPipe) = $(f)(pipe_reader(io)::IO)
end
read(io::AbstractPipe, byte::Type{UInt8}) = read(pipe_reader(io)::IO, byte)::UInt8
unsafe_read(io::AbstractPipe, p::Ptr{UInt8}, nb::UInt) = unsafe_read(pipe_reader(io)::IO, p, nb)
read(io::AbstractPipe) = read(pipe_reader(io)::IO)
readuntil(io::AbstractPipe, arg::UInt8; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
readuntil(io::AbstractPipe, arg::AbstractChar; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
readuntil(io::AbstractPipe, arg::AbstractString; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
readuntil(io::AbstractPipe, arg::AbstractVector; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...)
readuntil_vector!(io::AbstractPipe, target::AbstractVector, keep::Bool, out) = readuntil_vector!(pipe_reader(io)::IO, target, keep, out)
readbytes!(io::AbstractPipe, target::AbstractVector{UInt8}, n=length(target)) = readbytes!(pipe_reader(io)::IO, target, n)

for f in (
# peek/mark interface
:mark, :unmark, :reset, :ismarked,
# Simple reader functions
:readavailable, :isreadable)
@eval $(f)(io::AbstractPipe) = $(f)(pipe_reader(io)::IO)
end
peek(io::AbstractPipe, ::Type{T}) where {T} = peek(pipe_reader(io)::IO, T)::T
wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io)::IO, nb)
eof(io::AbstractPipe) = eof(pipe_reader(io)::IO)::Bool

iswritable(io::AbstractPipe) = iswritable(pipe_writer(io)::IO)
isopen(io::AbstractPipe) = isopen(pipe_writer(io)::IO) || isopen(pipe_reader(io)::IO)
close(io::AbstractPipe) = (close(pipe_writer(io)::IO); close(pipe_reader(io)::IO))
wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io)::IO, nb)
wait_close(io::AbstractPipe) = (wait_close(pipe_writer(io)::IO); wait_close(pipe_reader(io)::IO))

"""
bytesavailable(io)
Return the number of bytes available for reading before a read from this stream or buffer will block.
# Examples
```jldoctest
julia> io = IOBuffer("JuliaLang is a GitHub organization");
julia> bytesavailable(io)
34
```
"""
bytesavailable(io::AbstractPipe) = bytesavailable(pipe_reader(io)::IO)
bytesavailable(io::DevNull) = 0

"""
eof(stream) -> Bool
Test whether an I/O stream is at end-of-file. If the stream is not yet exhausted, this
function will block to wait for more data if necessary, and then return `false`. Therefore
it is always safe to read one byte after seeing `eof` return `false`. `eof` will return
`false` as long as buffered data is still available, even if the remote end of a connection
is closed.
"""
eof(io::AbstractPipe) = eof(pipe_reader(io)::IO)::Bool
reseteof(io::AbstractPipe) = reseteof(pipe_reader(io)::IO)


# Exception-safe wrappers (io = open(); try f(io) finally close(io))

Expand Down Expand Up @@ -1118,11 +1130,6 @@ ismarked(io::IO) = io.mark >= 0
# Make sure all IO streams support flush, even if only as a no-op,
# to make it easier to write generic I/O code.

"""
flush(stream)
Commit all currently buffered writes to the given stream.
"""
flush(io::IO) = nothing

"""
Expand Down
6 changes: 6 additions & 0 deletions base/iobuffer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,12 @@ end

eof(io::GenericIOBuffer) = (io.ptr-1 == io.size)

function shutdown(io::GenericIOBuffer)
io.writable = false
# OR throw(_UVError("shutdown", UV_ENOTSOCK))
nothing
end

@noinline function close(io::GenericIOBuffer{T}) where T
io.readable = false
io.writable = false
Expand Down
1 change: 1 addition & 0 deletions base/libuv.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ end
function uv_alloc_buf end
function uv_readcb end
function uv_writecb_task end
function uv_shutdowncb_task end
function uv_return_spawn end
function uv_asynccb end
function uv_timercb end
Expand Down
1 change: 1 addition & 0 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ function setup_stdio(stdio::Union{IOBuffer, BufferStream}, child_readable::Bool)
@warn "Process error" exception=(ex, catch_backtrace())
finally
close(parent)
child_readable || shutdown(stdio)
end
end
catch ex
Expand Down
Loading

0 comments on commit 200de0b

Please sign in to comment.