Skip to content

Commit

Permalink
spawn: permit using IOBuffer at stdout
Browse files Browse the repository at this point in the history
People expect to use this (the docs even almost even suggested it at
some point), so it is better to make it work as expected (and better
than they can emulate) than to criticize their choices.

Also fix a few regressions and handling mistakes in setup_stdios:
 - #44500 tried to store a Redirectable into a SpawnIO, dropping FileRedirect
 - CmdRedirect did not allocate a ProcessChain, so it wouldd call
   setup_stdio then call setup_stdios on the result of that, which is
   strongly discouraged as setup_stdio(s) should only be called once
 - BufferStream was missing `check_open` calls before writing, and
   ignored `Base.reseteof` as a possible means of resuming writing after
   `closewrite` sends a shutdown message.

Fixes #39311
Fixes #49234
Closes #49233
Closes #46768
  • Loading branch information
vtjnash committed Dec 8, 2023
1 parent 9723de5 commit 28bbf92
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 51 deletions.
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ New library features
write the output to a stream rather than returning a string ([#48625]).
* `sizehint!(s, n)` now supports an optional `shrink` argument to disable shrinking ([#51929]).
* New function `Docs.hasdoc(module, symbol)` tells whether a name has a docstring ([#52139]).
* Passing an IOBuffer as a stdout argument for Process spawn now works as
expected, synchronized with `wait` or `success`, so a `Base.BufferStream` is
no longer required there for correctness to avoid data-races ([#TBD]).

Standard library changes
------------------------
Expand Down
10 changes: 10 additions & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,16 @@ public
@locals,
@propagate_inbounds,

# IO
# types
BufferStream,
IOServer,
OS_HANDLE,
PipeEndpoint,
TTY,
# functions
reseteof,

# misc
notnothing,
runtests,
Expand Down
8 changes: 8 additions & 0 deletions base/io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ end

lock(::IO) = nothing
unlock(::IO) = nothing

"""
reseteof(io)
Clear the EOF flag from IO so that further reads (and possibly writes) are
again allowed. Note that it may immediately get re-set, if the underlying
stream object is at EOF and cannot be resumed.
"""
reseteof(x::IO) = nothing

const SZ_UNBUFFERED_IO = 65536
Expand Down
121 changes: 74 additions & 47 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ mutable struct Process <: AbstractPipe
in::IO
out::IO
err::IO
syncd::Vector{Task}
exitcode::Int64
termsignal::Int32
exitnotify::ThreadSynchronizer
function Process(cmd::Cmd, handle::Ptr{Cvoid})
this = new(cmd, handle, devnull, devnull, devnull,
function Process(cmd::Cmd, handle::Ptr{Cvoid}, syncd::Vector{Task})
this = new(cmd, handle, devnull, devnull, devnull, syncd,
typemin(fieldtype(Process, :exitcode)),
typemin(fieldtype(Process, :termsignal)),
ThreadSynchronizer())
Expand All @@ -35,6 +36,15 @@ end
pipe_reader(p::ProcessChain) = p.out
pipe_writer(p::ProcessChain) = p.in

# a lightweight pair of a child OS_HANDLE and associated Task that will
# complete only after all content has been read from it for synchronizing
# state without the kernel to aide
struct SyncCloseFD
fd
t::Task
end
rawhandle(io::SyncCloseFD) = rawhandle(io.fd)

# release ownership of the libuv handle
function uvfinalize(proc::Process)
if proc.handle != C_NULL
Expand Down Expand Up @@ -74,8 +84,8 @@ function _uv_hook_close(proc::Process)
nothing
end

const SpawnIO = Union{IO, RawFD, OS_HANDLE}
const SpawnIOs = Vector{SpawnIO} # convenience name for readability
const SpawnIO = Union{IO, RawFD, OS_HANDLE, SyncCloseFD} # internal copy of Redirectable, removing FileRedirect and adding SyncCloseFD
const SpawnIOs = Memory{SpawnIO} # convenience name for readability (used for dispatch also to clearly distinguish from Vector{Redirectable})

function as_cpumask(cpus::Vector{UInt16})
n = max(Int(maximum(cpus)), Int(ccall(:uv_cpumask_size, Cint, ())))
Expand All @@ -100,6 +110,7 @@ end
error("invalid spawn handle $h from $io")
end
for io in stdio]
syncd = Task[io.t for io in stdio if io isa SyncCloseFD]
handle = Libc.malloc(_sizeof_uv_process)
disassociate_julia_struct(handle)
(; exec, flags, env, dir) = cmd
Expand All @@ -117,7 +128,7 @@ end
cpumask === nothing ? 0 : length(cpumask),
@cfunction(uv_return_spawn, Cvoid, (Ptr{Cvoid}, Int64, Int32)))
if err == 0
pp = Process(cmd, handle)
pp = Process(cmd, handle, syncd)
associate_julia_struct(handle, pp)
else
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), handle) # will call free on handle eventually
Expand All @@ -130,23 +141,24 @@ end
return pp
end

_spawn(cmds::AbstractCmd) = _spawn(cmds, SpawnIO[])
_spawn(cmds::AbstractCmd) = _spawn(cmds, SpawnIOs())

# optimization: we can spawn `Cmd` directly without allocating the ProcessChain
function _spawn(cmd::Cmd, stdios::SpawnIOs)
isempty(cmd.exec) && throw(ArgumentError("cannot spawn empty command"))
function _spawn(cmd::AbstractCmd, stdios::Vector{Redirectable})
pp = setup_stdios(stdios) do stdios
return _spawn_primitive(cmd.exec[1], cmd, stdios)
return _spawn(cmd, stdios)
end
return pp
end

# optimization: we can spawn `Cmd` directly without allocating the ProcessChain
function _spawn(cmd::Cmd, stdios::SpawnIOs)
isempty(cmd.exec) && throw(ArgumentError("cannot spawn empty command"))
return _spawn_primitive(cmd.exec[1], cmd, stdios)
end

# assume that having a ProcessChain means that the stdio are setup
function _spawn(cmds::AbstractCmd, stdios::SpawnIOs)
pp = setup_stdios(stdios) do stdios
return _spawn(cmds, stdios, ProcessChain())
end
return pp
return _spawn(cmds, stdios, ProcessChain())
end

# helper function for making a copy of a SpawnIOs, with replacement
Expand Down Expand Up @@ -212,7 +224,7 @@ end


# open the child end of each element of `stdios`, and initialize the parent end
function setup_stdios(f, stdios::SpawnIOs)
function setup_stdios(f, stdios::Vector{Redirectable})
nstdio = length(stdios)
open_io = SpawnIOs(undef, nstdio)
close_io = falses(nstdio)
Expand Down Expand Up @@ -295,25 +307,29 @@ function setup_stdio(stdio::IO, child_readable::Bool)
child = child_readable ? rd : wr
try
let in = (child_readable ? parent : stdio),
out = (child_readable ? stdio : parent)
@async try
out = (child_readable ? stdio : parent),
t = @async try
write(in, out)
catch ex
@warn "Process I/O error" exception=(ex, catch_backtrace())
rethrow()
finally
close(parent)
child_readable || closewrite(stdio)
if !child_readable && applicable(closewrite, stdio)
closewrite(stdio)
end
end
return (SyncCloseFD(child, t), true)
end
catch
close_pipe_sync(child)
rethrow()
end
return (child, true)
end

close_stdio(stdio::OS_HANDLE) = close_pipe_sync(stdio)
close_stdio(stdio) = close(stdio)
close_stdio(stdio::OS_HANDLE) = close_pipe_sync(stdio)
close_stdio(stdio::SyncCloseFD) = close_stdio(stdio.fd)

# INTERNAL
# pad out stdio to have at least three elements,
Expand All @@ -325,19 +341,19 @@ close_stdio(stdio) = close(stdio)
# - An Filesystem.File or IOStream object to redirect the output to
# - A FileRedirect, containing a string specifying a filename to be opened for the child

spawn_opts_swallow(stdios::StdIOSet) = SpawnIO[stdios...]
spawn_opts_inherit(stdios::StdIOSet) = SpawnIO[stdios...]
spawn_opts_swallow(stdios::StdIOSet) = Redirectable[stdios...]
spawn_opts_inherit(stdios::StdIOSet) = Redirectable[stdios...]
spawn_opts_swallow(in::Redirectable=devnull, out::Redirectable=devnull, err::Redirectable=devnull) =
SpawnIO[in, out, err]
Redirectable[in, out, err]
# pass original descriptors to child processes by default, because we might
# have already exhausted and closed the libuv object for our standard streams.
# ref issue #8529
spawn_opts_inherit(in::Redirectable=RawFD(0), out::Redirectable=RawFD(1), err::Redirectable=RawFD(2)) =
SpawnIO[in, out, err]
Redirectable[in, out, err]

function eachline(cmd::AbstractCmd; keep::Bool=false)
out = PipeEndpoint()
processes = _spawn(cmd, SpawnIO[devnull, out, stderr])
processes = _spawn(cmd, Redirectable[devnull, out, stderr])
# if the user consumes all the data, also check process exit status for success
ondone = () -> (success(processes) || pipeline_error(processes); nothing)
return EachLine(out, keep=keep, ondone=ondone)::EachLine
Expand Down Expand Up @@ -385,20 +401,20 @@ function open(cmds::AbstractCmd, stdio::Redirectable=devnull; write::Bool=false,
stdio === devnull || throw(ArgumentError("no stream can be specified for `stdio` in read-write mode"))
in = PipeEndpoint()
out = PipeEndpoint()
processes = _spawn(cmds, SpawnIO[in, out, stderr])
processes = _spawn(cmds, Redirectable[in, out, stderr])
processes.in = in
processes.out = out
elseif read
out = PipeEndpoint()
processes = _spawn(cmds, SpawnIO[stdio, out, stderr])
processes = _spawn(cmds, Redirectable[stdio, out, stderr])
processes.out = out
elseif write
in = PipeEndpoint()
processes = _spawn(cmds, SpawnIO[in, stdio, stderr])
processes = _spawn(cmds, Redirectable[in, stdio, stderr])
processes.in = in
else
stdio === devnull || throw(ArgumentError("no stream can be specified for `stdio` in no-access mode"))
processes = _spawn(cmds, SpawnIO[devnull, devnull, stderr])
processes = _spawn(cmds, Redirectable[devnull, devnull, stderr])
end
return processes
end
Expand All @@ -415,12 +431,18 @@ function open(f::Function, cmds::AbstractCmd, args...; kwargs...)
P = open(cmds, args...; kwargs...)
function waitkill(P::Union{Process,ProcessChain})
close(P)
# 0.1 seconds after we hope it dies (from closing stdio),
# we kill the process with SIGTERM (15)
local t = Timer(0.1) do t
# shortly after we hope it starts cleanup and dies (from closing
# stdio), we kill the process with SIGTERM (15) so that we can proceed
# with throwing the error and hope it will exit soon from that
local t = Timer(2) do t
process_running(P) && kill(P)
end
wait(P)
# pass false to indicate that we do not care about data-races on the
# Julia stdio objects after this point, since we already know this is
# an error path and the state of them is fairly unpredictable anyways
# in that case. Since we closed P some of those should come crumbling
# down already, and we don't want to throw that error here either.
wait(P, false)
close(t)
end
ret = try
Expand Down Expand Up @@ -650,26 +672,31 @@ function process_status(s::Process)
error("process status error")
end

function wait(x::Process)
process_exited(x) && return
iolock_begin()
function wait(x::Process, syncd::Bool=true)
if !process_exited(x)
preserve_handle(x)
lock(x.exitnotify)
iolock_end()
try
wait(x.exitnotify)
finally
unlock(x.exitnotify)
unpreserve_handle(x)
iolock_begin()
if !process_exited(x)
preserve_handle(x)
lock(x.exitnotify)
iolock_end()
try
wait(x.exitnotify)
finally
unlock(x.exitnotify)
unpreserve_handle(x)
end
else
iolock_end()
end
else
iolock_end()
end
# and make sure all sync'd Tasks are complete too
syncd && for t in x.syncd
wait(t)
end
nothing
end

wait(x::ProcessChain) = foreach(wait, x.processes)
wait(x::ProcessChain, syncd::Bool=true) = foreach(p -> wait(p, syncd), x.processes)

show(io::IO, p::Process) = print(io, "Process(", p.cmd, ", ", process_status(p), ")")

Expand Down
12 changes: 11 additions & 1 deletion base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1489,7 +1489,7 @@ closewrite(s::BufferStream) = close(s)
function close(s::BufferStream)
lock(s.cond) do
s.status = StatusClosed
notify(s.cond)
notify(s.cond) # aka flush
nothing
end
end
Expand Down Expand Up @@ -1549,6 +1549,7 @@ stop_reading(s::BufferStream) = nothing
write(s::BufferStream, b::UInt8) = write(s, Ref{UInt8}(b))
function unsafe_write(s::BufferStream, p::Ptr{UInt8}, nb::UInt)
nwrite = lock(s.cond) do
check_open(s)
rv = unsafe_write(s.buffer, p, nb)
s.buffer_writes || notify(s.cond)
rv
Expand All @@ -1569,9 +1570,18 @@ end
buffer_writes(s::BufferStream, bufsize=0) = (s.buffer_writes = true; s)
function flush(s::BufferStream)
lock(s.cond) do
check_open(s)
notify(s.cond)
nothing
end
end

skip(s::BufferStream, n) = skip(s.buffer, n)

function reseteof(x::BufferStream)
lock(s.cond) do
s.status = StatusOpen
nothing
end
nothing
end
12 changes: 9 additions & 3 deletions src/support/ios.c
Original file line number Diff line number Diff line change
Expand Up @@ -602,12 +602,12 @@ int ios_eof(ios_t *s)
{
if (s->state == bst_rd && s->bpos < s->size)
return 0;
if (s->_eof)
return 1;
if (s->bm == bm_mem)
return (s->_eof ? 1 : 0);
return 0;
if (s->fd == -1)
return 1;
if (s->_eof)
return 1;
return 0;
/*
if (_fd_available(s->fd))
Expand All @@ -617,6 +617,12 @@ int ios_eof(ios_t *s)
*/
}

void ios_reseteof(ios_t *s)
{
if (s->bm != bm_mem && s->fd != -1)
s->_eof = 0;
}

int ios_eof_blocking(ios_t *s)
{
if (s->state == bst_rd && s->bpos < s->size)
Expand Down

0 comments on commit 28bbf92

Please sign in to comment.