diff --git a/NEWS.md b/NEWS.md index 0dd4d2f53bc01c..283ed186e3bcdb 100644 --- a/NEWS.md +++ b/NEWS.md @@ -72,6 +72,9 @@ New library features write the output to a stream rather than returning a string ([#48625]). * `sizehint!(s, n)` now supports an optional `shrink` argument to disable shrinking ([#51929]). * New function `Docs.hasdoc(module, symbol)` tells whether a name has a docstring ([#52139]). +* Passing an `IOBuffer` as a stdout argument for `Process` spawn now works as + expected, synchronized with `wait` or `success`, so a `Base.BufferStream` is + no longer required there for correctness to avoid data races ([#TBD]). Standard library changes ------------------------ diff --git a/base/exports.jl b/base/exports.jl index 398d828f9cf199..4ec35d1d2b3abd 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -1155,6 +1155,16 @@ public @locals, @propagate_inbounds, +# IO + # types + BufferStream, + IOServer, + OS_HANDLE, + PipeEndpoint, + TTY, + # functions + reseteof, + # misc notnothing, runtests, diff --git a/base/io.jl b/base/io.jl index a2e68882bbc863..e1fd282f07976e 100644 --- a/base/io.jl +++ b/base/io.jl @@ -25,6 +25,14 @@ end lock(::IO) = nothing unlock(::IO) = nothing + +""" + reseteof(io) + +Clear the EOF flag from IO so that further reads (and possibly writes) are +again allowed. Note that it may immediately get re-set, if the underlying +stream object is at EOF and cannot be resumed. 
+""" reseteof(x::IO) = nothing const SZ_UNBUFFERED_IO = 65536 diff --git a/base/process.jl b/base/process.jl index ed51a30ae3cedd..86f042c8e740bc 100644 --- a/base/process.jl +++ b/base/process.jl @@ -6,11 +6,12 @@ mutable struct Process <: AbstractPipe in::IO out::IO err::IO + syncd::Vector{Task} exitcode::Int64 termsignal::Int32 exitnotify::ThreadSynchronizer - function Process(cmd::Cmd, handle::Ptr{Cvoid}) - this = new(cmd, handle, devnull, devnull, devnull, + function Process(cmd::Cmd, handle::Ptr{Cvoid}, syncd::Vector{Task}) + this = new(cmd, handle, devnull, devnull, devnull, syncd, typemin(fieldtype(Process, :exitcode)), typemin(fieldtype(Process, :termsignal)), ThreadSynchronizer()) @@ -35,6 +36,15 @@ end pipe_reader(p::ProcessChain) = p.out pipe_writer(p::ProcessChain) = p.in +# a lightweight pair of a child OS_HANDLE and associated Task that will +# complete only after all content has been read from it for synchronizing +# state without the kernel to aid +struct SyncCloseFD + fd + t::Task +end +rawhandle(io::SyncCloseFD) = rawhandle(io.fd) + # release ownership of the libuv handle function uvfinalize(proc::Process) if proc.handle != C_NULL @@ -74,8 +84,8 @@ function _uv_hook_close(proc::Process) nothing end -const SpawnIO = Union{IO, RawFD, OS_HANDLE} -const SpawnIOs = Vector{SpawnIO} # convenience name for readability +const SpawnIO = Union{IO, RawFD, OS_HANDLE, SyncCloseFD} # internal copy of Redirectable, removing FileRedirect and adding SyncCloseFD +const SpawnIOs = Memory{SpawnIO} # convenience name for readability (used for dispatch also to clearly distinguish from Vector{Redirectable}) function as_cpumask(cpus::Vector{UInt16}) n = max(Int(maximum(cpus)), Int(ccall(:uv_cpumask_size, Cint, ()))) @@ -100,6 +110,7 @@ end error("invalid spawn handle $h from $io") end for io in stdio] + syncd = Task[io.t for io in stdio if io isa SyncCloseFD] handle = Libc.malloc(_sizeof_uv_process) disassociate_julia_struct(handle) (; exec, flags, env, dir) = cmd @@ 
-117,7 +128,7 @@ end cpumask === nothing ? 0 : length(cpumask), @cfunction(uv_return_spawn, Cvoid, (Ptr{Cvoid}, Int64, Int32))) if err == 0 - pp = Process(cmd, handle) + pp = Process(cmd, handle, syncd) associate_julia_struct(handle, pp) else ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), handle) # will call free on handle eventually @@ -130,23 +141,24 @@ end return pp end -_spawn(cmds::AbstractCmd) = _spawn(cmds, SpawnIO[]) +_spawn(cmds::AbstractCmd) = _spawn(cmds, SpawnIOs()) -# optimization: we can spawn `Cmd` directly without allocating the ProcessChain -function _spawn(cmd::Cmd, stdios::SpawnIOs) - isempty(cmd.exec) && throw(ArgumentError("cannot spawn empty command")) +function _spawn(cmd::AbstractCmd, stdios::Vector{Redirectable}) pp = setup_stdios(stdios) do stdios - return _spawn_primitive(cmd.exec[1], cmd, stdios) + return _spawn(cmd, stdios) end return pp end +# optimization: we can spawn `Cmd` directly without allocating the ProcessChain +function _spawn(cmd::Cmd, stdios::SpawnIOs) + isempty(cmd.exec) && throw(ArgumentError("cannot spawn empty command")) + return _spawn_primitive(cmd.exec[1], cmd, stdios) +end + # assume that having a ProcessChain means that the stdio are setup function _spawn(cmds::AbstractCmd, stdios::SpawnIOs) - pp = setup_stdios(stdios) do stdios - return _spawn(cmds, stdios, ProcessChain()) - end - return pp + return _spawn(cmds, stdios, ProcessChain()) end # helper function for making a copy of a SpawnIOs, with replacement @@ -212,7 +224,7 @@ end # open the child end of each element of `stdios`, and initialize the parent end -function setup_stdios(f, stdios::SpawnIOs) +function setup_stdios(f, stdios::Vector{Redirectable}) nstdio = length(stdios) open_io = SpawnIOs(undef, nstdio) close_io = falses(nstdio) @@ -295,25 +307,29 @@ function setup_stdio(stdio::IO, child_readable::Bool) child = child_readable ? rd : wr try let in = (child_readable ? parent : stdio), - out = (child_readable ? 
stdio : parent) - @async try + out = (child_readable ? stdio : parent), + t = @async try write(in, out) catch ex @warn "Process I/O error" exception=(ex, catch_backtrace()) + rethrow() finally close(parent) - child_readable || closewrite(stdio) + if !child_readable && applicable(closewrite, stdio) + closewrite(stdio) + end end + return (SyncCloseFD(child, t), true) end catch close_pipe_sync(child) rethrow() end - return (child, true) end -close_stdio(stdio::OS_HANDLE) = close_pipe_sync(stdio) close_stdio(stdio) = close(stdio) +close_stdio(stdio::OS_HANDLE) = close_pipe_sync(stdio) +close_stdio(stdio::SyncCloseFD) = close_stdio(stdio.fd) # INTERNAL # pad out stdio to have at least three elements, @@ -325,19 +341,19 @@ close_stdio(stdio) = close(stdio) # - An Filesystem.File or IOStream object to redirect the output to # - A FileRedirect, containing a string specifying a filename to be opened for the child -spawn_opts_swallow(stdios::StdIOSet) = SpawnIO[stdios...] -spawn_opts_inherit(stdios::StdIOSet) = SpawnIO[stdios...] +spawn_opts_swallow(stdios::StdIOSet) = Redirectable[stdios...] +spawn_opts_inherit(stdios::StdIOSet) = Redirectable[stdios...] spawn_opts_swallow(in::Redirectable=devnull, out::Redirectable=devnull, err::Redirectable=devnull) = - SpawnIO[in, out, err] + Redirectable[in, out, err] # pass original descriptors to child processes by default, because we might # have already exhausted and closed the libuv object for our standard streams. 
# ref issue #8529 spawn_opts_inherit(in::Redirectable=RawFD(0), out::Redirectable=RawFD(1), err::Redirectable=RawFD(2)) = - SpawnIO[in, out, err] + Redirectable[in, out, err] function eachline(cmd::AbstractCmd; keep::Bool=false) out = PipeEndpoint() - processes = _spawn(cmd, SpawnIO[devnull, out, stderr]) + processes = _spawn(cmd, Redirectable[devnull, out, stderr]) # if the user consumes all the data, also check process exit status for success ondone = () -> (success(processes) || pipeline_error(processes); nothing) return EachLine(out, keep=keep, ondone=ondone)::EachLine @@ -385,20 +401,20 @@ function open(cmds::AbstractCmd, stdio::Redirectable=devnull; write::Bool=false, stdio === devnull || throw(ArgumentError("no stream can be specified for `stdio` in read-write mode")) in = PipeEndpoint() out = PipeEndpoint() - processes = _spawn(cmds, SpawnIO[in, out, stderr]) + processes = _spawn(cmds, Redirectable[in, out, stderr]) processes.in = in processes.out = out elseif read out = PipeEndpoint() - processes = _spawn(cmds, SpawnIO[stdio, out, stderr]) + processes = _spawn(cmds, Redirectable[stdio, out, stderr]) processes.out = out elseif write in = PipeEndpoint() - processes = _spawn(cmds, SpawnIO[in, stdio, stderr]) + processes = _spawn(cmds, Redirectable[in, stdio, stderr]) processes.in = in else stdio === devnull || throw(ArgumentError("no stream can be specified for `stdio` in no-access mode")) - processes = _spawn(cmds, SpawnIO[devnull, devnull, stderr]) + processes = _spawn(cmds, Redirectable[devnull, devnull, stderr]) end return processes end @@ -415,12 +431,18 @@ function open(f::Function, cmds::AbstractCmd, args...; kwargs...) P = open(cmds, args...; kwargs...) 
function waitkill(P::Union{Process,ProcessChain}) close(P) - # 0.1 seconds after we hope it dies (from closing stdio), - # we kill the process with SIGTERM (15) - local t = Timer(0.1) do t + # shortly after we hope it starts cleanup and dies (from closing + # stdio), we kill the process with SIGTERM (15) so that we can proceed + # with throwing the error and hope it will exit soon from that + local t = Timer(2) do t process_running(P) && kill(P) end - wait(P) + # pass false to indicate that we do not care about data-races on the + # Julia stdio objects after this point, since we already know this is + # an error path and the state of them is fairly unpredictable anyways + # in that case. Since we closed P some of those should come crumbling + # down already, and we don't want to throw that error here either. + wait(P, false) close(t) end ret = try @@ -650,26 +672,31 @@ function process_status(s::Process) error("process status error") end -function wait(x::Process) - process_exited(x) && return - iolock_begin() +function wait(x::Process, syncd::Bool=true) if !process_exited(x) - preserve_handle(x) - lock(x.exitnotify) - iolock_end() - try - wait(x.exitnotify) - finally - unlock(x.exitnotify) - unpreserve_handle(x) + iolock_begin() + if !process_exited(x) + preserve_handle(x) + lock(x.exitnotify) + iolock_end() + try + wait(x.exitnotify) + finally + unlock(x.exitnotify) + unpreserve_handle(x) + end + else + iolock_end() end - else - iolock_end() + end + # and make sure all sync'd Tasks are complete too + syncd && for t in x.syncd + wait(t) end nothing end -wait(x::ProcessChain) = foreach(wait, x.processes) +wait(x::ProcessChain, syncd::Bool=true) = foreach(p -> wait(p, syncd), x.processes) show(io::IO, p::Process) = print(io, "Process(", p.cmd, ", ", process_status(p), ")") diff --git a/base/stream.jl b/base/stream.jl index 22af8d59359f38..b79c699391a830 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -1489,7 +1489,7 @@ closewrite(s::BufferStream) = close(s) 
function close(s::BufferStream) lock(s.cond) do s.status = StatusClosed - notify(s.cond) + notify(s.cond) # aka flush nothing end end @@ -1549,6 +1549,7 @@ stop_reading(s::BufferStream) = nothing write(s::BufferStream, b::UInt8) = write(s, Ref{UInt8}(b)) function unsafe_write(s::BufferStream, p::Ptr{UInt8}, nb::UInt) nwrite = lock(s.cond) do + check_open(s) rv = unsafe_write(s.buffer, p, nb) s.buffer_writes || notify(s.cond) rv @@ -1569,9 +1570,18 @@ end buffer_writes(s::BufferStream, bufsize=0) = (s.buffer_writes = true; s) function flush(s::BufferStream) lock(s.cond) do + check_open(s) notify(s.cond) nothing end end skip(s::BufferStream, n) = skip(s.buffer, n) + +function reseteof(x::BufferStream) + lock(x.cond) do + x.status = StatusOpen + nothing + end + nothing +end diff --git a/src/support/ios.c b/src/support/ios.c index 2c20dcb45e4c42..7f70112c82cc0c 100644 --- a/src/support/ios.c +++ b/src/support/ios.c @@ -602,12 +602,12 @@ int ios_eof(ios_t *s) { if (s->state == bst_rd && s->bpos < s->size) return 0; + if (s->_eof) + return 1; if (s->bm == bm_mem) - return (s->_eof ? 1 : 0); + return 0; if (s->fd == -1) return 1; - if (s->_eof) - return 1; return 0; /* if (_fd_available(s->fd)) @@ -617,6 +617,12 @@ int ios_eof(ios_t *s) */ } +void ios_reseteof(ios_t *s) +{ + if (s->bm != bm_mem && s->fd != -1) + s->_eof = 0; +} + int ios_eof_blocking(ios_t *s) { if (s->state == bst_rd && s->bpos < s->size)