From aad52ec5533e2184fbcdbb6f0f608d81b11586c2 Mon Sep 17 00:00:00 2001 From: Alex Arslan Date: Fri, 6 Aug 2021 16:26:42 -0700 Subject: [PATCH] Revert "add stream shutdown and support half-duplex operation (#40783)" (#41808) This reverts commit 6c4f2165619449182693061e07654b399428e0cf. --- NEWS.md | 1 - base/coreio.jl | 1 - base/exports.jl | 1 - base/io.jl | 124 ++++++++----------- base/iobuffer.jl | 6 - base/libuv.jl | 1 - base/process.jl | 1 - base/stream.jl | 132 ++++++--------------- doc/src/base/io-network.md | 1 - stdlib/Distributed/src/process_messages.jl | 4 +- stdlib/Sockets/src/Sockets.jl | 15 ++- stdlib/Sockets/test/runtests.jl | 35 +----- test/iobuffer.jl | 21 ++-- test/spawn.jl | 8 -- 14 files changed, 111 insertions(+), 240 deletions(-) diff --git a/NEWS.md b/NEWS.md index d64511283107dd..84941b377daeaf 100644 --- a/NEWS.md +++ b/NEWS.md @@ -45,7 +45,6 @@ Standard library changes overflow in most cases. The new function `checked_length` is now available, which will try to use checked arithmetic to error if the result may be wrapping. Or use a package such as SaferIntegers.jl when constructing the range. ([#40382]) -* TCP socket objects now expose `shutdown` functionality and support half-open mode usage ([#40783]). #### InteractiveUtils * A new macro `@time_imports` for reporting any time spent importing packages and their dependencies ([#41612]) diff --git a/base/coreio.jl b/base/coreio.jl index d0f8df290b41b7..9ef717383dedd5 100644 --- a/base/coreio.jl +++ b/base/coreio.jl @@ -13,7 +13,6 @@ write(::DevNull, ::UInt8) = 1 unsafe_write(::DevNull, ::Ptr{UInt8}, n::UInt)::Int = n close(::DevNull) = nothing wait_close(::DevNull) = wait() -bytesavailable(io::DevNull) = 0 let CoreIO = Union{Core.CoreSTDOUT, Core.CoreSTDERR} global write(io::CoreIO, x::UInt8) = Core.write(io, x) diff --git a/base/exports.jl b/base/exports.jl index da0019ab469877..0a8e473055b4b2 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -803,7 +803,6 @@ export # I/O and events close, - shutdown, countlines, eachline, readeach, diff --git a/base/io.jl b/base/io.jl index 2eface1ca94581..30a87aa9e1cf33 100644 --- a/base/io.jl +++ b/base/io.jl @@ -60,49 +60,9 @@ function isopen end Close an I/O stream. Performs a [`flush`](@ref) first. """ function close end - -""" - shutdown(stream) - -Shutdown the write half of a full-duplex I/O stream. Performs a [`flush`](@ref) -first. Notify the other end that no more data will be written to the underlying -file. This is not supported by all IO types. - -# Examples -```jldoctest -julia> io = Base.BufferStream(); # this never blocks, so we can read and write on the same Task - -julia> write(io, "request"); - -julia> # calling `read(io)` here would block forever - -julia> shutdown(io); - -julia> read(io, String) -"request" -""" -function shutdown end - -""" - flush(stream) - -Commit all currently buffered writes to the given stream. -""" function flush end - -""" - bytesavailable(io) - -Return the number of bytes available for reading before a read from this stream or buffer will block. - -# Examples -```jldoctest -julia> io = IOBuffer("JuliaLang is a GitHub organization"); - -julia> bytesavailable(io) -34 -``` -""" +function wait_readnb end +function wait_close end function bytesavailable end """ @@ -121,7 +81,7 @@ function readavailable end """ isreadable(io) -> Bool -Return `false` if the specified IO object is not readable. +Return `true` if the specified IO object is readable (if that can be determined). # Examples ```jldoctest @@ -139,12 +99,12 @@ true julia> rm("myfile.txt") ``` """ -isreadable(io::IO) = isopen(io) +function isreadable end """ iswritable(io) -> Bool -Return `false` if the specified IO object is not writable. +Return `true` if the specified IO object is writable (if that can be determined). # Examples ```jldoctest @@ -162,22 +122,9 @@ false julia> rm("myfile.txt") ``` """ -iswritable(io::IO) = isopen(io) - -""" - eof(stream) -> Bool - -Test whether an I/O stream is at end-of-file. If the stream is not yet exhausted, this -function will block to wait for more data if necessary, and then return `false`. Therefore -it is always safe to read one byte after seeing `eof` return `false`. `eof` will return -`false` as long as buffered data is still available, even if the remote end of a connection -is closed. -""" -function eof end - +function iswritable end function copy end -function wait_readnb end -function wait_close end +function eof end """ read(io::IO, T) @@ -410,37 +357,65 @@ end function pipe_reader end function pipe_writer end -for f in (:flush, :shutdown, :iswritable) - @eval $(f)(io::AbstractPipe) = $(f)(pipe_writer(io)::IO) -end write(io::AbstractPipe, byte::UInt8) = write(pipe_writer(io)::IO, byte) write(to::IO, from::AbstractPipe) = write(to, pipe_reader(from)) unsafe_write(io::AbstractPipe, p::Ptr{UInt8}, nb::UInt) = unsafe_write(pipe_writer(io)::IO, p, nb)::Union{Int,UInt} buffer_writes(io::AbstractPipe, args...) = buffer_writes(pipe_writer(io)::IO, args...) +flush(io::AbstractPipe) = flush(pipe_writer(io)::IO) -for f in ( - # peek/mark interface - :mark, :unmark, :reset, :ismarked, - # Simple reader functions - :read, :readavailable, :bytesavailable, :reseteof, :isreadable) - @eval $(f)(io::AbstractPipe) = $(f)(pipe_reader(io)::IO) -end read(io::AbstractPipe, byte::Type{UInt8}) = read(pipe_reader(io)::IO, byte)::UInt8 unsafe_read(io::AbstractPipe, p::Ptr{UInt8}, nb::UInt) = unsafe_read(pipe_reader(io)::IO, p, nb) +read(io::AbstractPipe) = read(pipe_reader(io)::IO) readuntil(io::AbstractPipe, arg::UInt8; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...) readuntil(io::AbstractPipe, arg::AbstractChar; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...) readuntil(io::AbstractPipe, arg::AbstractString; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...) readuntil(io::AbstractPipe, arg::AbstractVector; kw...) = readuntil(pipe_reader(io)::IO, arg; kw...) readuntil_vector!(io::AbstractPipe, target::AbstractVector, keep::Bool, out) = readuntil_vector!(pipe_reader(io)::IO, target, keep, out) readbytes!(io::AbstractPipe, target::AbstractVector{UInt8}, n=length(target)) = readbytes!(pipe_reader(io)::IO, target, n) + +for f in ( + # peek/mark interface + :mark, :unmark, :reset, :ismarked, + # Simple reader functions + :readavailable, :isreadable) + @eval $(f)(io::AbstractPipe) = $(f)(pipe_reader(io)::IO) +end peek(io::AbstractPipe, ::Type{T}) where {T} = peek(pipe_reader(io)::IO, T)::T -wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io)::IO, nb) -eof(io::AbstractPipe) = eof(pipe_reader(io)::IO)::Bool +iswritable(io::AbstractPipe) = iswritable(pipe_writer(io)::IO) isopen(io::AbstractPipe) = isopen(pipe_writer(io)::IO) || isopen(pipe_reader(io)::IO) close(io::AbstractPipe) = (close(pipe_writer(io)::IO); close(pipe_reader(io)::IO)) +wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io)::IO, nb) wait_close(io::AbstractPipe) = (wait_close(pipe_writer(io)::IO); wait_close(pipe_reader(io)::IO)) +""" + bytesavailable(io) + +Return the number of bytes available for reading before a read from this stream or buffer will block. + +# Examples +```jldoctest +julia> io = IOBuffer("JuliaLang is a GitHub organization"); + +julia> bytesavailable(io) +34 +``` +""" +bytesavailable(io::AbstractPipe) = bytesavailable(pipe_reader(io)::IO) +bytesavailable(io::DevNull) = 0 + +""" + eof(stream) -> Bool + +Test whether an I/O stream is at end-of-file. If the stream is not yet exhausted, this +function will block to wait for more data if necessary, and then return `false`. Therefore +it is always safe to read one byte after seeing `eof` return `false`. `eof` will return +`false` as long as buffered data is still available, even if the remote end of a connection +is closed. +""" +eof(io::AbstractPipe) = eof(pipe_reader(io)::IO)::Bool +reseteof(io::AbstractPipe) = reseteof(pipe_reader(io)::IO) + # Exception-safe wrappers (io = open(); try f(io) finally close(io)) @@ -1144,6 +1119,11 @@ ismarked(io::IO) = io.mark >= 0 # Make sure all IO streams support flush, even if only as a no-op, # to make it easier to write generic I/O code. +""" + flush(stream) + +Commit all currently buffered writes to the given stream. +""" flush(io::IO) = nothing """ diff --git a/base/iobuffer.jl b/base/iobuffer.jl index f65ed3f894fe00..e204eca906cbf0 100644 --- a/base/iobuffer.jl +++ b/base/iobuffer.jl @@ -334,12 +334,6 @@ end eof(io::GenericIOBuffer) = (io.ptr-1 == io.size) -function shutdown(io::GenericIOBuffer) - io.writable = false - # OR throw(_UVError("shutdown", UV_ENOTSOCK)) - nothing -end - @noinline function close(io::GenericIOBuffer{T}) where T io.readable = false io.writable = false diff --git a/base/libuv.jl b/base/libuv.jl index c64cbff564b66d..c63045f4b1b68c 100644 --- a/base/libuv.jl +++ b/base/libuv.jl @@ -107,7 +107,6 @@ end function uv_alloc_buf end function uv_readcb end function uv_writecb_task end -function uv_shutdowncb_task end function uv_return_spawn end function uv_asynccb end function uv_timercb end diff --git a/base/process.jl b/base/process.jl index 6be7099f94f358..b3ec79fa1ab4ed 100644 --- a/base/process.jl +++ b/base/process.jl @@ -275,7 +275,6 @@ function setup_stdio(stdio::Union{IOBuffer, BufferStream}, child_readable::Bool) @warn "Process error" exception=(ex, catch_backtrace()) finally close(parent) - child_readable || shutdown(stdio) end end catch ex diff --git a/base/stream.jl b/base/stream.jl index 6e433b771f0d2f..6cbd1d3b86a280 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -109,7 +109,7 @@ function eof(s::LibuvStream) # and that we won't return true if there's a readerror pending (it'll instead get thrown). # This requires some careful ordering here (TODO: atomic loads) bytesavailable(s) > 0 && return false - open = isreadable(s) # must precede readerror check + open = isopen(s) # must precede readerror check s.readerror === nothing || throw(s.readerror) return !open end @@ -270,7 +270,6 @@ show(io::IO, stream::LibuvStream) = print(io, typeof(stream), "(", function isreadable(io::LibuvStream) bytesavailable(io) > 0 && return true isopen(io) || return false - io.status == StatusEOF && return false return ccall(:uv_is_readable, Cint, (Ptr{Cvoid},), io.handle) != 0 end @@ -379,7 +378,7 @@ function isopen(x::Union{LibuvStream, LibuvServer}) if x.status == StatusUninit || x.status == StatusInit throw(ArgumentError("$x is not initialized")) end - return x.status != StatusClosed + return x.status != StatusClosed && x.status != StatusEOF end function check_open(x::Union{LibuvStream, LibuvServer}) @@ -391,13 +390,13 @@ end function wait_readnb(x::LibuvStream, nb::Int) # fast path before iolock acquire bytesavailable(x.buffer) >= nb && return - open = isopen(x) && x.status != StatusEOF # must precede readerror check + open = isopen(x) # must precede readerror check x.readerror === nothing || throw(x.readerror) open || return iolock_begin() # repeat fast path after iolock acquire, before other expensive work bytesavailable(x.buffer) >= nb && (iolock_end(); return) - open = isopen(x) && x.status != StatusEOF + open = isopen(x) x.readerror === nothing || throw(x.readerror) open || (iolock_end(); return) # now do the "real" work @@ -408,7 +407,6 @@ function wait_readnb(x::LibuvStream, nb::Int) while bytesavailable(x.buffer) < nb x.readerror === nothing || throw(x.readerror) isopen(x) || break - x.status != StatusEOF || break x.throttle = max(nb, x.throttle) start_reading(x) # ensure we are reading iolock_end() @@ -433,50 +431,6 @@ function wait_readnb(x::LibuvStream, nb::Int) nothing end -function shutdown(s::LibuvStream) - iolock_begin() - check_open(s) - req = Libc.malloc(_sizeof_uv_shutdown) - uv_req_set_data(req, C_NULL) # in case we get interrupted before arriving at the wait call - err = ccall(:uv_shutdown, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), - req, s, @cfunction(uv_shutdowncb_task, Cvoid, (Ptr{Cvoid}, Cint))) - if err < 0 - Libc.free(req) - uv_error("shutdown", err) - end - ct = current_task() - preserve_handle(ct) - sigatomic_begin() - uv_req_set_data(req, ct) - iolock_end() - status = try - sigatomic_end() - wait()::Cint - finally - # try-finally unwinds the sigatomic level, so need to repeat sigatomic_end - sigatomic_end() - iolock_begin() - ct.queue === nothing || list_deletefirst!(ct.queue, ct) - if uv_req_data(req) != C_NULL - # req is still alive, - # so make sure we won't get spurious notifications later - uv_req_set_data(req, C_NULL) - else - # done with req - Libc.free(req) - end - iolock_end() - unpreserve_handle(ct) - if isopen(s) && (s.status == StatusEOF && !isa(s, TTY)) || ccall(:uv_is_readable, Cint, (Ptr{Cvoid},), s.handle) == 0 - close(s) - end - end - if status < 0 - throw(_UVError("shutdown", status)) - end - nothing -end - function wait_close(x::Union{LibuvStream, LibuvServer}) preserve_handle(x) lock(x.cond) @@ -497,7 +451,7 @@ function close(stream::Union{LibuvStream, LibuvServer}) if stream.status == StatusInit ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle) stream.status = StatusClosing - elseif isopen(stream) + elseif isopen(stream) || stream.status == StatusEOF should_wait = uv_handle_data(stream) != C_NULL if stream.status != StatusClosing ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle) @@ -652,33 +606,35 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}) nrequested = ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf) function readcb_specialized(stream::LibuvStream, nread::Int, nrequested::UInt) lock(stream.cond) - if nread < 0 - if nread == UV_ENOBUFS && nrequested == 0 - # remind the client that stream.buffer is full - notify(stream.cond) - elseif nread == UV_EOF # libuv called uv_stop_reading already - if stream.status != StatusClosing - if stream isa TTY || ccall(:uv_is_writable, Cint, (Ptr{Cvoid},), stream.handle) != 0 - # stream can still be used either by reseteof or write - stream.status = StatusEOF + try + if nread < 0 + if nread == UV_ENOBUFS && nrequested == 0 + # remind the client that stream.buffer is full + notify(stream.cond) + elseif nread == UV_EOF + if isa(stream, TTY) + stream.status = StatusEOF # libuv called uv_stop_reading already notify(stream.cond) - else - # underlying stream is no longer useful: begin finalization + elseif stream.status != StatusClosing + # begin shutdown of the stream ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle) stream.status = StatusClosing end + else + stream.readerror = _UVError("read", nread) + # This is a fatal connection error. Shutdown requests as per the usual + # close function won't work and libuv will fail with an assertion failure + ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream) + stream.status = StatusClosing + notify(stream.cond) end else - stream.readerror = _UVError("read", nread) - # This is a fatal connection error - ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle) - stream.status = StatusClosing + notify_filled(stream.buffer, nread) + notify(stream.cond) end - else - notify_filled(stream.buffer, nread) - notify(stream.cond) + finally + unlock(stream.cond) end - unlock(stream.cond) # Stop background reading when # 1) there's nobody paying attention to the data we are reading @@ -695,7 +651,6 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}) nothing end readcb_specialized(stream_unknown_type, Int(nread), UInt(nrequested)) - nothing end function reseteof(x::TTY) @@ -889,7 +844,6 @@ function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int) while bytesavailable(buf) < nb s.readerror === nothing || throw(s.readerror) isopen(s) || break - s.status != StatusEOF || break iolock_end() wait_readnb(s, nb) iolock_begin() @@ -936,7 +890,6 @@ function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt) while bytesavailable(buf) < nb s.readerror === nothing || throw(s.readerror) isopen(s) || throw(EOFError()) - s.status != StatusEOF || throw(EOFError()) iolock_end() wait_readnb(s, nb) iolock_begin() @@ -993,14 +946,13 @@ function readuntil(x::LibuvStream, c::UInt8; keep::Bool=false) @assert buf.seekable == false if !occursin(c, buf) # fast path checks first x.readerror === nothing || throw(x.readerror) - if isopen(x) && x.status != StatusEOF + if isopen(x) preserve_handle(x) lock(x.cond) try while !occursin(c, x.buffer) x.readerror === nothing || throw(x.readerror) isopen(x) || break - x.status != StatusEOF || break start_reading(x) # ensure we are reading iolock_end() wait(x.cond) @@ -1163,20 +1115,6 @@ function uv_writecb_task(req::Ptr{Cvoid}, status::Cint) nothing end -function uv_shutdowncb_task(req::Ptr{Cvoid}, status::Cint) - d = uv_req_data(req) - if d != C_NULL - uv_req_set_data(req, C_NULL) # let the Task know we got the shutdowncb - t = unsafe_pointer_to_objref(d)::Task - schedule(t, status) - else - # no owner for this req, safe to just free it - Libc.free(req) - end - nothing -end - - _fd(x::IOStream) = RawFD(fd(x)) _fd(x::Union{OS_HANDLE, RawFD}) = x @@ -1467,20 +1405,18 @@ mutable struct BufferStream <: LibuvStream buffer::IOBuffer cond::Threads.Condition readerror::Any + is_open::Bool buffer_writes::Bool lock::ReentrantLock # advisory lock - status::Int - BufferStream() = new(PipeBuffer(), Threads.Condition(), nothing, false, ReentrantLock(), StatusActive) + BufferStream() = new(PipeBuffer(), Threads.Condition(), nothing, true, false, ReentrantLock()) end -isopen(s::BufferStream) = s.status != StatusClosed - -shutdown(s::BufferStream) = close(s) +isopen(s::BufferStream) = s.is_open function close(s::BufferStream) lock(s.cond) do - s.status = StatusClosed + s.is_open = false notify(s.cond) nothing end @@ -1503,8 +1439,8 @@ function unsafe_read(s::BufferStream, a::Ptr{UInt8}, nb::UInt) end bytesavailable(s::BufferStream) = bytesavailable(s.buffer) -isreadable(s::BufferStream) = (isopen(s) || bytesavailable(s) > 0) && s.buffer.readable -iswritable(s::BufferStream) = isopen(s) && s.buffer.writable +isreadable(s::BufferStream) = s.buffer.readable +iswritable(s::BufferStream) = s.buffer.writable function wait_readnb(s::BufferStream, nb::Int) lock(s.cond) do @@ -1514,7 +1450,7 @@ function wait_readnb(s::BufferStream, nb::Int) end end -show(io::IO, s::BufferStream) = print(io, "BufferStream(bytes waiting=", bytesavailable(s.buffer), ", isopen=", isopen(s), ")") +show(io::IO, s::BufferStream) = print(io, "BufferStream() bytes waiting:", bytesavailable(s.buffer), ", isopen:", s.is_open) function readuntil(s::BufferStream, c::UInt8; keep::Bool=false) bytes = lock(s.cond) do diff --git a/doc/src/base/io-network.md b/doc/src/base/io-network.md index 8b2ea3c4c2412f..2d6a462400813d 100644 --- a/doc/src/base/io-network.md +++ b/doc/src/base/io-network.md @@ -13,7 +13,6 @@ Base.take!(::Base.GenericIOBuffer) Base.fdio Base.flush Base.close -Base.shutdown Base.write Base.read Base.read! diff --git a/stdlib/Distributed/src/process_messages.jl b/stdlib/Distributed/src/process_messages.jl index 732b972858dc97..8d5dac5af571e9 100644 --- a/stdlib/Distributed/src/process_messages.jl +++ b/stdlib/Distributed/src/process_messages.jl @@ -230,8 +230,8 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) deregister_worker(wpid) end - close(r_stream) - close(w_stream) + isopen(r_stream) && close(r_stream) + isopen(w_stream) && close(w_stream) if (myid() == 1) && (wpid > 1) if oldstate != W_TERMINATING diff --git a/stdlib/Sockets/src/Sockets.jl b/stdlib/Sockets/src/Sockets.jl index fb46b9255e6f03..6952aa9bd8a0fd 100644 --- a/stdlib/Sockets/src/Sockets.jl +++ b/stdlib/Sockets/src/Sockets.jl @@ -139,6 +139,9 @@ function TCPServer(; delay=true) return tcp end +isreadable(io::TCPSocket) = isopen(io) || bytesavailable(io) > 0 +iswritable(io::TCPSocket) = isopen(io) && io.status != StatusClosing + """ accept(server[, client]) @@ -575,11 +578,11 @@ Enables or disables Nagle's algorithm on a given TCP server or socket. """ function nagle(sock::Union{TCPServer, TCPSocket}, enable::Bool) # disable or enable Nagle's algorithm on all OSes - iolock_begin() - check_open(sock) + Sockets.iolock_begin() + Sockets.check_open(sock) err = ccall(:uv_tcp_nodelay, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(!enable)) # TODO: check err - iolock_end() + Sockets.iolock_end() return err end @@ -589,15 +592,15 @@ end On Linux systems, the TCP_QUICKACK is disabled or enabled on `socket`. """ function quickack(sock::Union{TCPServer, TCPSocket}, enable::Bool) - iolock_begin() - check_open(sock) + Sockets.iolock_begin() + Sockets.check_open(sock) @static if Sys.islinux() # tcp_quickack is a linux only option if ccall(:jl_tcp_quickack, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(enable)) < 0 @warn "Networking unoptimized ( Error enabling TCP_QUICKACK : $(Libc.strerror(Libc.errno())) )" maxlog=1 end end - iolock_end() + Sockets.iolock_end() nothing end diff --git a/stdlib/Sockets/test/runtests.jl b/stdlib/Sockets/test/runtests.jl index 328ea929f4b4ba..5ac88b7216bef3 100644 --- a/stdlib/Sockets/test/runtests.jl +++ b/stdlib/Sockets/test/runtests.jl @@ -551,42 +551,17 @@ end r = @async close(s) @test_throws Base._UVError("connect", Base.UV_ECANCELED) Sockets.wait_connected(s) fetch(r) - close(srv) end end @testset "iswritable" begin let addr = Sockets.InetAddr(ip"127.0.0.1", 4445) srv = listen(addr) - let s = Sockets.TCPSocket() - Sockets.connect!(s, addr) - @test iswritable(s) broken=Sys.iswindows() - close(s) - @test !iswritable(s) - end - let s = Sockets.connect(addr) - @test iswritable(s) - shutdown(s) - @test !iswritable(s) - close(s) - end - close(srv) - srv = listen(addr) - let s = Sockets.connect(addr) - let c = accept(srv) - Base.errormonitor(@async try; write(c, c); finally; close(c); end) - end - @test iswritable(s) - write(s, "hello world\n") - shutdown(s) - @test !iswritable(s) - @test isreadable(s) - @test read(s, String) == "hello world\n" - @test !isreadable(s) - @test !isopen(s) - close(s) - end - close(srv) + s = Sockets.TCPSocket() + Sockets.connect!(s, addr) + @test iswritable(s) + close(s) + @test !iswritable(s) end end diff --git a/test/iobuffer.jl b/test/iobuffer.jl index 5514b6dc03f7fa..80972a7c654484 100644 --- a/test/iobuffer.jl +++ b/test/iobuffer.jl @@ -9,7 +9,7 @@ bufcontents(io::Base.GenericIOBuffer) = unsafe_string(pointer(io.data), io.size) @testset "Read/write empty IOBuffer" begin io = IOBuffer() @test eof(io) - @test_throws EOFError read(io, UInt8) + @test_throws EOFError read(io,UInt8) @test write(io,"abc") === 3 @test isreadable(io) @test iswritable(io) @@ -18,7 +18,7 @@ bufcontents(io::Base.GenericIOBuffer) = unsafe_string(pointer(io.data), io.size) @test position(io) == 3 @test eof(io) seek(io, 0) - @test read(io, UInt8) == convert(UInt8, 'a') + @test read(io,UInt8) == convert(UInt8, 'a') a = Vector{UInt8}(undef, 2) @test read!(io, a) == a @test a == UInt8['b','c'] @@ -34,24 +34,22 @@ bufcontents(io::Base.GenericIOBuffer) = unsafe_string(pointer(io.data), io.size) truncate(io, 10) @test position(io) == 0 @test all(io.data .== 0) - @test write(io, Int16[1, 2, 3, 4, 5, 6]) === 12 + @test write(io,Int16[1,2,3,4,5,6]) === 12 seek(io, 2) truncate(io, 10) @test ioslength(io) == 10 io.readable = false - @test_throws ArgumentError read!(io, UInt8[0]) + @test_throws ArgumentError read!(io,UInt8[0]) truncate(io, 0) @test write(io,"boston\ncambridge\n") > 0 @test String(take!(io)) == "boston\ncambridge\n" @test String(take!(io)) == "" @test write(io, ComplexF64(0)) === 16 @test write(io, Rational{Int64}(1//2)) === 16 - @test shutdown(io) === nothing - @test_throws ArgumentError write(io, UInt8[0]) + close(io) + @test_throws ArgumentError write(io,UInt8[0]) + @test_throws ArgumentError seek(io,0) @test eof(io) - @test close(io) === nothing - @test_throws ArgumentError write(io, UInt8[0]) - @test_throws ArgumentError seek(io, 0) end @testset "Read/write readonly IOBuffer" begin @@ -239,7 +237,7 @@ end @test isreadable(bstream) @test iswritable(bstream) @test bytesavailable(bstream) == 0 - @test sprint(show, bstream) == "BufferStream(bytes waiting=$(bytesavailable(bstream.buffer)), isopen=true)" + @test sprint(show, bstream) == "BufferStream() bytes waiting:$(bytesavailable(bstream.buffer)), isopen:true" a = rand(UInt8,10) write(bstream,a) @test !eof(bstream) @@ -253,10 +251,9 @@ end @test !eof(bstream) read!(bstream,c) @test c == a[3:10] - @test shutdown(bstream) === nothing + @test close(bstream) === nothing @test eof(bstream) @test bytesavailable(bstream) == 0 - @test close(bstream) === nothing flag = Ref{Bool}(false) event = Base.Event() bstream = Base.BufferStream() diff --git a/test/spawn.jl b/test/spawn.jl index 9ec7c6842bedbf..95915a5ad804b2 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -765,15 +765,7 @@ let text = "input-test-text" @test read(proc, String) == string(length(text), '\n') @test success(proc) @test String(take!(b)) == text - - out = Base.BufferStream() - proc = run(catcmd, IOBuffer(text), out, wait=false) - @test proc.out === out - @test read(out, String) == text - @test success(proc) end - - @test repr(Base.CmdRedirect(``, devnull, 0, false)) == "pipeline(``, stdin>Base.DevNull())" @test repr(Base.CmdRedirect(``, devnull, 1, true)) == "pipeline(``, stdout