diff --git a/NEWS.md b/NEWS.md index 6c104252a6633..82eabe62f81b2 100644 --- a/NEWS.md +++ b/NEWS.md @@ -91,6 +91,8 @@ Standard library changes * `current_project()` now searches the parent directories of a Git repository for a `Project.toml` file. This also affects the behavior of the `--project` command line option when using the default `--project=@.` ([#29108]). + * The `spawn` API is now more flexible and supports taking IOBuffer directly as a I/O stream, + converting to a system pipe as needed ([#30278]). #### Dates * New `DateTime(::Date, ::Time)` constructor ([#29754]). diff --git a/base/process.jl b/base/process.jl index a61046d6fdd63..1fb2c7304c151 100644 --- a/base/process.jl +++ b/base/process.jl @@ -135,9 +135,10 @@ const STDOUT_NO = 1 const STDERR_NO = 2 struct FileRedirect - filename::AbstractString + filename::String append::Bool - function FileRedirect(filename, append) + FileRedirect(filename::AbstractString, append::Bool) = FileRedirect(convert(String, filename), append) + function FileRedirect(filename::String, append::Bool) if lowercase(filename) == (@static Sys.iswindows() ? "nul" : "/dev/null") @warn "For portability use devnull instead of a file redirect" maxlog=1 end @@ -145,6 +146,8 @@ struct FileRedirect end end +# setup_stdio ≈ cconvert +# rawhandle ≈ unsafe_convert rawhandle(::DevNull) = C_NULL rawhandle(x::OS_HANDLE) = x if OS_HANDLE !== RawFD @@ -158,19 +161,24 @@ struct CmdRedirect <: AbstractCmd cmd::AbstractCmd handle::Redirectable stream_no::Int + readable::Bool end +CmdRedirect(cmd, handle, stream_no) = CmdRedirect(cmd, handle, stream_no, stream_no == STDIN_NO) function show(io::IO, cr::CmdRedirect) print(io, "pipeline(") show(io, cr.cmd) print(io, ", ") if cr.stream_no == STDOUT_NO - print(io, "stdout=") + print(io, "stdout") elseif cr.stream_no == STDERR_NO - print(io, "stderr=") + print(io, "stderr") elseif cr.stream_no == STDIN_NO - print(io, "stdin=") + print(io, "stdin") + else + print(io, cr.stream_no) end + print(io, cr.readable ? "<" : ">") show(io, cr.handle) print(io, ")") end @@ -294,7 +302,7 @@ run(pipeline(`ls`, "out.txt")) run(pipeline("out.txt", `grep xyz`)) ``` """ -pipeline(a, b, c, d...) = pipeline(pipeline(a,b), c, d...) +pipeline(a, b, c, d...) = pipeline(pipeline(a, b), c, d...) mutable struct Process <: AbstractPipe cmd::Cmd @@ -306,20 +314,8 @@ mutable struct Process <: AbstractPipe termsignal::Int32 exitnotify::Condition closenotify::Condition - function Process(cmd::Cmd, handle::Ptr{Cvoid}, - in::Union{Redirectable, Ptr{Cvoid}}, - out::Union{Redirectable, Ptr{Cvoid}}, - err::Union{Redirectable, Ptr{Cvoid}}) - if !isa(in, IO) - in = devnull - end - if !isa(out, IO) - out = devnull - end - if !isa(err, IO) - err = devnull - end - this = new(cmd, handle, in, out, err, + function Process(cmd::Cmd, handle::Ptr{Cvoid}) + this = new(cmd, handle, devnull, devnull, devnull, typemin(fieldtype(Process, :exitcode)), typemin(fieldtype(Process, :termsignal)), Condition(), Condition()) @@ -330,45 +326,21 @@ end pipe_reader(p::Process) = p.out pipe_writer(p::Process) = p.in -struct ProcessChain <: AbstractPipe +# Represents a whole pipeline of any number of related processes +# so the entire pipeline can be treated as one entity +mutable struct ProcessChain <: AbstractPipe processes::Vector{Process} - in::Redirectable - out::Redirectable - err::Redirectable - ProcessChain(stdios::StdIOSet) = new(Process[], stdios[1], stdios[2], stdios[3]) + in::IO + out::IO + err::IO + function ProcessChain() + return new(Process[], devnull, devnull, devnull) + end end pipe_reader(p::ProcessChain) = p.out pipe_writer(p::ProcessChain) = p.in -function _jl_spawn(file, argv, cmd::Cmd, stdio) - loop = eventloop() - handles = Tuple{Cint, UInt}[ # assuming little-endian layout - let h = rawhandle(io) - h === C_NULL ? (0x00, UInt(0)) : - h isa OS_HANDLE ? (0x02, UInt(cconvert(@static(Sys.iswindows() ? Ptr{Cvoid} : Cint), h))) : - h isa Ptr{Cvoid} ? (0x04, UInt(h)) : - error("invalid spawn handle $h from $io") - end - for io in stdio] - proc = Libc.malloc(_sizeof_uv_process) - disassociate_julia_struct(proc) - error = ccall(:jl_spawn, Int32, - (Cstring, Ptr{Cstring}, Ptr{Cvoid}, Ptr{Cvoid}, - Ptr{Tuple{Cint, UInt}}, Int, - UInt32, Ptr{Cstring}, Cstring, Ptr{Cvoid}), - file, argv, loop, proc, - handles, length(handles), - cmd.flags, - cmd.env === nothing ? C_NULL : cmd.env, - isempty(cmd.dir) ? C_NULL : cmd.dir, - uv_jl_return_spawn::Ptr{Cvoid}) - if error != 0 - ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), proc) - throw(_UVError("could not spawn " * string(cmd), error)) - end - return proc -end - +# release ownership of the libuv handle function uvfinalize(proc::Process) if proc.handle != C_NULL disassociate_julia_struct(proc.handle) @@ -378,6 +350,7 @@ function uvfinalize(proc::Process) nothing end +# called when the process dies function uv_return_spawn(p::Ptr{Cvoid}, exit_status::Int64, termsignal::Int32) data = ccall(:jl_uv_process_data, Ptr{Cvoid}, (Ptr{Cvoid},), p) data == C_NULL && return @@ -390,27 +363,93 @@ function uv_return_spawn(p::Ptr{Cvoid}, exit_status::Int64, termsignal::Int32) nothing end +# called when the libuv handle is destroyed function _uv_hook_close(proc::Process) proc.handle = C_NULL notify(proc.closenotify) + nothing end -function _spawn(redirect::CmdRedirect, stdios::StdIOSet; chain::Union{ProcessChain, Nothing}=nothing) - _spawn(redirect.cmd, - (redirect.stream_no == STDIN_NO ? redirect.handle : stdios[1], - redirect.stream_no == STDOUT_NO ? redirect.handle : stdios[2], - redirect.stream_no == STDERR_NO ? redirect.handle : stdios[3]), - chain=chain) +const SpawnIOs = Vector{Any} # convenience name for readability + +# handle marshalling of `Cmd` arguments from Julia to C +@noinline function _spawn_primitive(file, cmd::Cmd, stdio::SpawnIOs) + loop = eventloop() + iohandles = Tuple{Cint, UInt}[ # assuming little-endian layout + let h = rawhandle(io) + h === C_NULL ? (0x00, UInt(0)) : + h isa OS_HANDLE ? (0x02, UInt(cconvert(@static(Sys.iswindows() ? Ptr{Cvoid} : Cint), h))) : + h isa Ptr{Cvoid} ? (0x04, UInt(h)) : + error("invalid spawn handle $h from $io") + end + for io in stdio] + handle = Libc.malloc(_sizeof_uv_process) + disassociate_julia_struct(handle) # ensure that data field is set to C_NULL + error = ccall(:jl_spawn, Int32, + (Cstring, Ptr{Cstring}, Ptr{Cvoid}, Ptr{Cvoid}, + Ptr{Tuple{Cint, UInt}}, Int, + UInt32, Ptr{Cstring}, Cstring, Ptr{Cvoid}), + file, cmd.exec, loop, handle, + iohandles, length(iohandles), + cmd.flags, + cmd.env === nothing ? C_NULL : cmd.env, + isempty(cmd.dir) ? C_NULL : cmd.dir, + uv_jl_return_spawn::Ptr{Cvoid}) + if error != 0 + ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), handle) # will call free on handle eventually + throw(_UVError("could not spawn " * repr(cmd), error)) + end + pp = Process(cmd, handle) + associate_julia_struct(handle, pp) + return pp +end + +_spawn(cmds::AbstractCmd) = _spawn(cmds, Any[]) + +# optimization: we can spawn `Cmd` directly without allocating the ProcessChain +function _spawn(cmd::Cmd, stdios::SpawnIOs) + isempty(cmd.exec) && throw(ArgumentError("cannot spawn empty command")) + pp = setup_stdios(stdios) do stdios + return _spawn_primitive(cmd.exec[1], cmd, stdios) + end + return pp end -function _spawn(cmds::OrCmds, stdios::StdIOSet; chain::Union{ProcessChain, Nothing}=nothing) - if chain === nothing - chain = ProcessChain(stdios) +# assume that having a ProcessChain means that the stdio are setup +function _spawn(cmds::AbstractCmd, stdios::SpawnIOs) + pp = setup_stdios(stdios) do stdios + return _spawn(cmds, stdios, ProcessChain()) end + return pp +end + +# helper function for making a copy of a SpawnIOs, with replacement +function _stdio_copy(stdios::SpawnIOs, fd::Int, @nospecialize replace) + nio = max(fd, length(stdios)) + new = SpawnIOs(undef, nio) + copyto!(fill!(new, devnull), stdios) + new[fd] = replace + return new +end + +function _spawn(redirect::CmdRedirect, stdios::SpawnIOs, args...) + fdnum = redirect.stream_no + 1 + io, close_io = setup_stdio(redirect.handle, redirect.readable) + try + stdios = _stdio_copy(stdios, fdnum, io) + return _spawn(redirect.cmd, stdios, args...) + finally + close_io && close_stdio(io) + end +end + +function _spawn(cmds::OrCmds, stdios::SpawnIOs, chain::ProcessChain) in_pipe, out_pipe = link_pipe(false, false) try - _spawn(cmds.a, (stdios[1], out_pipe, stdios[3]), chain=chain) - _spawn(cmds.b, (in_pipe, stdios[2], stdios[3]), chain=chain) + stdios_left = _stdio_copy(stdios, 2, out_pipe) + _spawn(cmds.a, stdios_left, chain) + stdios_right = _stdio_copy(stdios, 1, in_pipe) + _spawn(cmds.b, stdios_right, chain) finally close_pipe_sync(out_pipe) close_pipe_sync(in_pipe) @@ -418,14 +457,13 @@ function _spawn(cmds::OrCmds, stdios::StdIOSet; chain::Union{ProcessChain, Nothi return chain end -function _spawn(cmds::ErrOrCmds, stdios::StdIOSet; chain::Union{ProcessChain, Nothing}=nothing) - if chain === nothing - chain = ProcessChain(stdios) - end +function _spawn(cmds::ErrOrCmds, stdios::SpawnIOs, chain::ProcessChain) in_pipe, out_pipe = link_pipe(false, false) try - _spawn(cmds.a, (stdios[1], stdios[2], out_pipe), chain=chain) - _spawn(cmds.b, (in_pipe, stdios[2], stdios[3]), chain=chain) + stdios_left = _stdio_copy(stdios, 3, out_pipe) + _spawn(cmds.a, stdios_left, chain) + stdios_right = _stdio_copy(stdios, 1, in_pipe) + _spawn(cmds.b, stdios_right, chain) finally close_pipe_sync(out_pipe) close_pipe_sync(in_pipe) @@ -433,12 +471,55 @@ function _spawn(cmds::ErrOrCmds, stdios::StdIOSet; chain::Union{ProcessChain, No return chain end +function _spawn(cmds::AndCmds, stdios::SpawnIOs, chain::ProcessChain) + _spawn(cmds.a, stdios, chain) + _spawn(cmds.b, stdios, chain) + return chain +end + +function _spawn(cmd::Cmd, stdios::SpawnIOs, chain::ProcessChain) + isempty(cmd.exec) && throw(ArgumentError("cannot spawn empty command")) + pp = _spawn_primitive(cmd.exec[1], cmd, stdios) + push!(chain.processes, pp) + return chain +end + + +# open the child end of each element of `stdios`, and initialize the parent end +function setup_stdios(f, stdios::SpawnIOs) + nstdio = length(stdios) + open_io = Vector{Any}(undef, nstdio) + close_io = falses(nstdio) + try + for i in 1:nstdio + open_io[i], close_io[i] = setup_stdio(stdios[i], i == 1) + end + pp = f(open_io) + return pp + finally + for i in 1:nstdio + close_io[i] && close_stdio(open_io[i]) + end + end +end + function setup_stdio(stdio::PipeEndpoint, child_readable::Bool) if stdio.status == StatusInit + # if the PipeEndpoint isn't open, set it to the parent end + # and pass the other end to the child rd, wr = link_pipe(!child_readable, child_readable) - open_pipe!(stdio, child_readable ? wr : rd, !child_readable, child_readable) - return (child_readable ? rd : wr, true) + try + open_pipe!(stdio, child_readable ? wr : rd) + catch ex + close_pipe_sync(rd) + close_pipe_sync(wr) + rethrow(ex) + end + child = child_readable ? rd : wr + return (child, true) end + # if it's already open, assume that it's already the child end + # (since we can't do anything else) return (stdio, false) end @@ -471,92 +552,73 @@ function setup_stdio(stdio::FileRedirect, child_readable::Bool) return (io, true) end -function setup_stdio(io, child_readable::Bool) - # if there is no specialization, - # assume that rawhandle is defined for it - return (io, false) -end - -close_stdio(stdio::OS_HANDLE) = close_pipe_sync(stdio) -close_stdio(stdio::Nothing) = nothing -close_stdio(stdio) = close(stdio) - -function setup_stdio(anon::Function, stdio::StdIOSet) - in, close_in = setup_stdio(stdio[1], true) +# incrementally move data between an IOBuffer and a system Pipe +# TODO: probably more efficient (when valid) to use `stdio` directly as the +# PipeEndpoint buffer field in some cases +function setup_stdio(stdio::Union{IOBuffer, BufferStream}, child_readable::Bool) + parent = PipeEndpoint() + rd, wr = link_pipe(!child_readable, child_readable) try - out, close_out = setup_stdio(stdio[2], false) - try - err, close_err = setup_stdio(stdio[3], false) - try - anon((in, out, err)) + open_pipe!(parent, child_readable ? wr : rd) + catch ex + close_pipe_sync(rd) + close_pipe_sync(wr) + rethrow(ex) + end + child = child_readable ? rd : wr + try + let in = (child_readable ? parent : stdio), + out = (child_readable ? stdio : parent) + @async try + write(in, out) + catch ex + @warn "Process error" exception=(ex, catch_backtrace()) finally - close_err && close_stdio(err) + close(parent) end - finally - close_out && close_stdio(out) end - finally - close_in && close_stdio(in) + catch ex + close_pipe_sync(child) + rethrow(ex) end - nothing + return (child, true) end -function _spawn(cmd::Cmd, stdios::StdIOSet; chain::Union{ProcessChain, Nothing}=nothing) - if isempty(cmd.exec) - throw(ArgumentError("cannot spawn empty command")) - end - pp = Process(cmd, C_NULL, stdios[1], stdios[2], stdios[3]) - setup_stdio(stdios) do stdios - handle = _jl_spawn(cmd.exec[1], cmd.exec, cmd, stdios) - associate_julia_struct(handle, pp) - pp.handle = handle - end - if chain !== nothing - push!(chain.processes, pp) - end - return pp +function setup_stdio(io, child_readable::Bool) + # if there is no specialization, + # assume that rawhandle is defined for it + return (io, false) end -function _spawn(cmds::AndCmds, stdios::StdIOSet; chain::Union{ProcessChain, Nothing}=nothing) - if chain === nothing - chain = ProcessChain(stdios) - end - setup_stdio(stdios) do stdios - _spawn(cmds.a, stdios, chain=chain) - _spawn(cmds.b, stdios, chain=chain) - end - return chain -end +close_stdio(stdio::OS_HANDLE) = close_pipe_sync(stdio) +close_stdio(stdio) = close(stdio) # INTERNAL -# returns stdios: -# A set of up to 256 stdio instructions, where each entry can be either: -# | - An IO to be passed to the child -# | - devnull to pass /dev/null -# | - An Filesystem.File object to redirect the output to -# \ - A string specifying a filename to be opened - -spawn_opts_swallow(stdios::StdIOSet) = (stdios,) -spawn_opts_swallow(in::Redirectable=devnull, out::Redirectable=devnull, err::Redirectable=devnull, args...) = - ((in, out, err), args...) -spawn_opts_inherit(stdios::StdIOSet) = (stdios,) +# pad out stdio to have at least three elements, +# passing either `devnull` or the corresponding `stdio` +# A Redirectable can be any of: +# - A system IO handle, to be passed to the child +# - An uninitialized pipe, to be created +# - devnull (to pass /dev/null for 0-2, or to leave undefined for fd > 2) +# - An Filesystem.File or IOStream object to redirect the output to +# - A FileRedirect, containing a string specifying a filename to be opened for the child + +spawn_opts_swallow(stdios::StdIOSet) = Any[stdios...] +spawn_opts_inherit(stdios::StdIOSet) = Any[stdios...] +spawn_opts_swallow(in::Redirectable=devnull, out::Redirectable=devnull, err::Redirectable=devnull) = + Any[in, out, err] # pass original descriptors to child processes by default, because we might # have already exhausted and closed the libuv object for our standard streams. -# this caused issue #8529. -spawn_opts_inherit(in::Redirectable=RawFD(0), out::Redirectable=RawFD(1), err::Redirectable=RawFD(2), args...) = - ((in, out, err), args...) - -_spawn(cmds::AbstractCmd, args...; chain::Union{ProcessChain, Nothing}=nothing) = - _spawn(cmds, spawn_opts_swallow(args...)...; chain=chain) +# ref issue #8529 +spawn_opts_inherit(in::Redirectable=RawFD(0), out::Redirectable=RawFD(1), err::Redirectable=RawFD(2)) = + Any[in, out, err] function eachline(cmd::AbstractCmd; keep::Bool=false) - _stdout = Pipe() - processes = _spawn(cmd, (devnull, _stdout, stderr)) - close(_stdout.in) - out = _stdout.out - # implicitly close after reading lines, since we opened - return EachLine(out, keep=keep, - ondone=()->(close(out); success(processes) || pipeline_error(processes)))::EachLine + out = PipeEndpoint() + processes = _spawn(cmd, Any[devnull, out, stderr]) + # if the user consumes all the data, also check process exit status for success + ondone = () -> (success(processes) || pipeline_error(processes); nothing) + return EachLine(out, keep=keep, ondone=ondone)::EachLine end function open(cmds::AbstractCmd, mode::AbstractString, other::Redirectable=devnull) @@ -573,34 +635,34 @@ end # return a Process object to read-to/write-from the pipeline """ - open(command, stdio=devnull; write::Bool = false, read::Bool = !write) + open(command, other=devnull; write::Bool = false, read::Bool = !write) -Start running `command` asynchronously, and return a `process` object. If `read` is -true, then `process` reads from the process's standard output and `stdio` optionally -specifies the process's standard input stream. If `write` is true, then `process` writes to -the process's standard input and `stdio` optionally specifies the process's standard output +Start running `command` asynchronously, and return a `process::IO` object. If `read` is +true, then reads from the process come from the process's standard output and `other` optionally +specifies the process's standard input stream. If `write` is true, then writes go to +the process's standard input and `other` optionally specifies the process's standard output stream. +The process's standard error stream is connected to the current global `stderr`. """ -function open(cmds::AbstractCmd, other::Redirectable=devnull; write::Bool = false, read::Bool = !write) +function open(cmds::AbstractCmd, other::Redirectable=devnull; write::Bool=false, read::Bool=!write) if read && write - other === devnull || throw(ArgumentError("no other stream can be specified in read-write mode")) - in = Pipe() - out = Pipe() - processes = _spawn(cmds, (in,out,stderr)) - close(in.out) - close(out.in) + other === devnull || throw(ArgumentError("no stream can be specified for `other` in read-write mode")) + in = PipeEndpoint() + out = PipeEndpoint() + processes = _spawn(cmds, Any[in, out, stderr]) + processes.in = in + processes.out = out elseif read - in = other - out = Pipe() - processes = _spawn(cmds, (in,out,stderr)) - close(out.in) + out = PipeEndpoint() + processes = _spawn(cmds, Any[other, out, stderr]) + processes.out = out elseif write - in = Pipe() - out = other - processes = _spawn(cmds, (in,out,stderr)) - close(in.out) + in = PipeEndpoint() + processes = _spawn(cmds, Any[in, other, stderr]) + processes.in = in else - processes = _spawn(cmds) + other === devnull || throw(ArgumentError("no stream can be specified for `other` in no-access mode")) + processes = _spawn(cmds, Any[devnull, devnull, stderr]) end return processes end @@ -660,10 +722,27 @@ Use [`pipeline`](@ref) to control I/O redirection. """ function run(cmds::AbstractCmd, args...; wait::Bool = true) if wait - ps = _spawn(cmds, spawn_opts_inherit(args...)...) + ps = _spawn(cmds, spawn_opts_inherit(args...)) success(ps) || pipeline_error(ps) else - ps = _spawn(cmds, spawn_opts_swallow(args...)...) + stdios = spawn_opts_swallow(args...) + ps = _spawn(cmds, stdios) + # for each stdio input argument, guess whether the user + # passed a `stdio` placeholder object as input, and thus + # might be able to use the return AbstractProcess as an IO object + # (this really only applies to PipeEndpoint, Pipe, TCPSocket, or an AbstractPipe wrapping one of those) + if length(stdios) > 0 + in = stdios[1] + isa(in, IO) && (ps.in = in) + if length(stdios) > 1 + out = stdios[2] + isa(out, IO) && (ps.out = out) + if length(stdios) > 2 + err = stdios[3] + isa(err, IO) && (ps.err = err) + end + end + end end return ps end @@ -681,7 +760,7 @@ function test_success(proc::Process) @assert process_exited(proc) if proc.exitcode < 0 #TODO: this codepath is not currently tested - throw(_UVError("could not start process $(string(proc.cmd))", proc.exitcode)) + throw(_UVError("could not start process " * repr(proc.cmd), proc.exitcode)) end return proc.exitcode == 0 && (proc.termsignal == 0 || proc.termsignal == SIGPIPE) end @@ -788,22 +867,18 @@ process_exited(s::ProcessChain) = process_exited(s.processes) process_signaled(s::Process) = (s.termsignal > 0) -#process_stopped (s::Process) = false #not supported by libuv. Do we need this? -#process_stop_signal(s::Process) = false #not supported by libuv. Do we need this? - function process_status(s::Process) - process_running(s) ? "ProcessRunning" : - process_signaled(s) ? "ProcessSignaled("*string(s.termsignal)*")" : - #process_stopped(s) ? "ProcessStopped("*string(process_stop_signal(s))*")" : - process_exited(s) ? "ProcessExited("*string(s.exitcode)*")" : - error("process status error") + return process_running(s) ? "ProcessRunning" : + process_signaled(s) ? "ProcessSignaled(" * string(s.termsignal) * ")" : + process_exited(s) ? "ProcessExited(" * string(s.exitcode) * ")" : + error("process status error") end ## implementation of `cmd` syntax ## -arg_gen() = String[] +arg_gen() = String[] arg_gen(x::AbstractString) = String[cstr(x)] -arg_gen(cmd::Cmd) = cmd.exec +arg_gen(cmd::Cmd) = cmd.exec function arg_gen(head) if isiterable(typeof(head)) diff --git a/base/stream.jl b/base/stream.jl index d46721c57b5f8..71fe818482026 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -148,6 +148,18 @@ function PipeEndpoint() return pipe end +function PipeEndpoint(fd::OS_HANDLE) + pipe = PipeEndpoint() + err = ccall(:uv_pipe_open, Int32, (Ptr{Cvoid}, OS_HANDLE), pipe.handle, fd) + uv_error("pipe_open", err) + pipe.status = StatusOpen + return pipe +end +if OS_HANDLE != RawFD + PipeEndpoint(fd::RawFD) = PipeEndpoint(Libc._get_osfhandle(fd)) +end + + mutable struct TTY <: LibuvStream handle::Ptr{Cvoid} status::Int @@ -177,16 +189,17 @@ mutable struct TTY <: LibuvStream end end -function TTY(fd::RawFD; readable::Bool = false) +function TTY(fd::OS_HANDLE) tty = TTY(Libc.malloc(_sizeof_uv_tty), StatusUninit) - # This needs to go after associate_julia_struct so that there - # is no garbage in the ->data field - err = ccall(:uv_tty_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, RawFD, Int32), - eventloop(), tty.handle, fd, readable) + err = ccall(:uv_tty_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, OS_HANDLE, Int32), + eventloop(), tty.handle, fd, 0) uv_error("TTY", err) tty.status = StatusOpen return tty end +if OS_HANDLE != RawFD + TTY(fd::RawFD) = TTY(Libc._get_osfhandle(fd)) +end show(io::IO, stream::LibuvServer) = print(io, typeof(stream), "(", _fd(stream), " ", @@ -240,6 +253,64 @@ function init_stdio(handle::Ptr{Cvoid}) end end +""" + open(fd::OS_HANDLE) -> IO + +Take a raw file descriptor wrap it in a Julia-aware IO type, +and take ownership of the fd handle. +Call `open(Libc.dup(fd))` to avoid the ownership capture +of the original handle. + +!!! warn + Do not call this on a handle that's already owned by some + other part of the system. +""" +function open(h::OS_HANDLE) + t = ccall(:uv_guess_handle, Cint, (OS_HANDLE,), h) + if t == UV_FILE + @static if Sys.iswindows() + # TODO: Get ios.c to understand native handles + h = ccall(:_open_osfhandle, RawFD, (WindowsRawSocket, Int32), h, 0) + end + # TODO: Get fdio to work natively with file descriptors instead of integers + return fdio(cconvert(Cint, h)) + elseif t == UV_TTY + return TTY(h) + elseif t == UV_TCP + Sockets = require(PkgId(UUID((0x6462fe0b_24de_5631, 0x8697_dd941f90decc)), "Sockets")) + return Sockets.TCPSocket(h) + elseif t == UV_NAMED_PIPE + pipe = PipeEndpoint(h) + @static if Sys.iswindows() + if ccall(:jl_ispty, Cint, (Ptr{Cvoid},), pipe.handle) != 0 + # replace the Julia `PipeEndpoint` type with a `TTY` type, + # if we detect that this is a cygwin pty object + pipe_handle, pipe_status = pipe.handle, pipe.status + pipe.status = StatusClosed + pipe.handle = C_NULL + return TTY(pipe_handle, pipe_status) + end + end + return pipe + else + throw(ArgumentError("invalid stdio type: $t")) + end +end + +if OS_HANDLE != RawFD + function open(fd::RawFD) + h = Libc.dup(Libc._get_osfhandle(fd)) # make a dup to steal ownership away from msvcrt + try + io = open(h) + ccall(:_close, Cint, (RawFD,), fd) # on success, destroy the old libc handle + return io + catch ex + ccall(:CloseHandle, stdcall, Cint, (OS_HANDLE,), h) # on failure, destroy the new nt handle + rethrow(ex) + end + end +end + function isopen(x::Union{LibuvStream, LibuvServer}) if x.status == StatusUninit || x.status == StatusInit throw(ArgumentError("$x is not initialized")) @@ -575,12 +646,12 @@ show(io::IO, stream::Pipe) = print(io, ## Functions for PipeEndpoint and PipeServer ## -function open_pipe!(p::PipeEndpoint, handle::OS_HANDLE, readable::Bool, writable::Bool) +function open_pipe!(p::PipeEndpoint, handle::OS_HANDLE) if p.status != StatusInit error("pipe is already in use or has been closed") end - err = ccall(:jl_pipe_open, Int32, (Ptr{Cvoid}, OS_HANDLE, Cint, Cint), p.handle, handle, readable, writable) - uv_error("open_pipe", err) + err = ccall(:uv_pipe_open, Int32, (Ptr{Cvoid}, OS_HANDLE), p.handle, handle) + uv_error("pipe_open", err) p.status = StatusOpen return p end @@ -591,13 +662,13 @@ function link_pipe!(read_end::PipeEndpoint, reader_supports_async::Bool, rd, wr = link_pipe(reader_supports_async, writer_supports_async) try try - open_pipe!(read_end, rd, true, false) + open_pipe!(read_end, rd) catch close_pipe_sync(rd) rethrow() end read_end.status = StatusOpen - open_pipe!(write_end, wr, false, true) + open_pipe!(write_end, wr) catch close_pipe_sync(wr) rethrow() @@ -891,6 +962,7 @@ function uv_writecb_task(req::Ptr{Cvoid}, status::Cint) end _fd(x::IOStream) = RawFD(fd(x)) +_fd(x::Union{OS_HANDLE, RawFD}) = x function _fd(x::Union{LibuvStream, LibuvServer}) fd = Ref{OS_HANDLE}(INVALID_OS_HANDLE) diff --git a/src/init.c b/src/init.c index cf3a420d76d98..c3e4ede715b56 100644 --- a/src/init.c +++ b/src/init.c @@ -309,11 +309,6 @@ void *jl_winsock_handle; uv_loop_t *jl_io_loop; -#ifndef _OS_WINDOWS_ -#define UV_STREAM_READABLE 0x20 /* The stream is readable */ -#define UV_STREAM_WRITABLE 0x40 /* The stream is writable */ -#endif - #ifdef _OS_WINDOWS_ int uv_dup(uv_os_fd_t fd, uv_os_fd_t* dupfd) { HANDLE current_process; @@ -368,7 +363,7 @@ static void *init_stdio_handle(const char *stdio, uv_os_fd_t fd, int readable) switch(uv_guess_handle(fd)) { case UV_TTY: handle = malloc(sizeof(uv_tty_t)); - if ((err = uv_tty_init(jl_io_loop, (uv_tty_t*)handle, fd, readable))) { + if ((err = uv_tty_init(jl_io_loop, (uv_tty_t*)handle, fd, 0))) { jl_errorf("error initializing %s in uv_tty_init: %s (%s %d)", stdio, uv_strerror(err), uv_err_name(err), err); } ((uv_tty_t*)handle)->data = NULL; @@ -413,13 +408,6 @@ static void *init_stdio_handle(const char *stdio, uv_os_fd_t fd, int readable) if ((err = uv_pipe_open((uv_pipe_t*)handle, fd))) { jl_errorf("error initializing %s in uv_pipe_open: %s (%s %d)", stdio, uv_strerror(err), uv_err_name(err), err); } -#ifndef _OS_WINDOWS_ - // remove flags set erroneously by libuv: - if (readable) - ((uv_pipe_t*)handle)->flags &= ~UV_STREAM_WRITABLE; - else - ((uv_pipe_t*)handle)->flags &= ~UV_STREAM_READABLE; -#endif ((uv_pipe_t*)handle)->data = NULL; break; case UV_TCP: diff --git a/src/jl_uv.c b/src/jl_uv.c index a98108e0953ce..6eb01267bdf52 100644 --- a/src/jl_uv.c +++ b/src/jl_uv.c @@ -210,24 +210,6 @@ JL_DLLEXPORT int jl_process_events(uv_loop_t *loop) else return 0; } -#ifndef _OS_WINDOWS_ -#define UV_STREAM_READABLE 0x20 /* The stream is readable */ -#define UV_STREAM_WRITABLE 0x40 /* The stream is writable */ -#endif - -JL_DLLEXPORT int jl_pipe_open(uv_pipe_t *pipe, uv_os_fd_t fd, int readable, int writable) -{ - int err = uv_pipe_open(pipe, fd); -#ifndef _OS_WINDOWS_ - // clear flags set erroneously by libuv: - if (!readable) - pipe->flags &= ~UV_STREAM_READABLE; - if (!writable) - pipe->flags &= ~UV_STREAM_WRITABLE; -#endif - return err; -} - static void jl_proc_exit_cleanup(uv_process_t *process, int64_t exit_status, int term_signal) { uv_close((uv_handle_t*)process, (uv_close_cb)&free); diff --git a/stdlib/Sockets/src/Sockets.jl b/stdlib/Sockets/src/Sockets.jl index 6b39ae0a427e2..2965966fdd1c0 100644 --- a/stdlib/Sockets/src/Sockets.jl +++ b/stdlib/Sockets/src/Sockets.jl @@ -29,7 +29,7 @@ import Base: isless, show, print, parse, bind, convert, isreadable, iswritable, using Base: LibuvStream, LibuvServer, PipeEndpoint, @handle_as, uv_error, associate_julia_struct, uvfinalize, notify_error, stream_wait, uv_req_data, uv_req_set_data, preserve_handle, unpreserve_handle, _UVError, IOError, eventloop, StatusUninit, StatusInit, StatusConnecting, StatusOpen, StatusClosing, StatusClosed, StatusActive, - uv_status_string, check_open, wait_connected, + uv_status_string, check_open, wait_connected, OS_HANDLE, RawFD, UV_EINVAL, UV_ENOMEM, UV_ENOBUFS, UV_EAGAIN, UV_ECONNABORTED, UV_EADDRINUSE, UV_EACCES, UV_EADDRNOTAVAIL, UV_EAI_ADDRFAMILY, UV_EAI_AGAIN, UV_EAI_BADFLAGS, UV_EAI_BADHINTS, UV_EAI_CANCELED, UV_EAI_FAIL, @@ -86,6 +86,18 @@ function TCPSocket(; delay=true) return tcp end +function TCPSocket(fd::OS_HANDLE) + tcp = TCPSocket() + err = ccall(:uv_tcp_open, Int32, (Ptr{Cvoid}, OS_HANDLE), pipe.handle, fd) + uv_error("tcp_open", err) + tcp.status = StatusOpen + return tcp +end +if OS_HANDLE != RawFD + TCPSocket(fd::RawFD) = TCPSocket(Libc._get_osfhandle(fd)) +end + + mutable struct TCPServer <: LibuvServer handle::Ptr{Cvoid} status::Int diff --git a/test/spawn.jl b/test/spawn.jl index 201a957ced5da..2840f5ecea030 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -57,16 +57,10 @@ out = read(`$echocmd hello` & `$echocmd world`, String) # Test for SIGPIPE being treated as normal termination (throws an error if broken) Sys.isunix() && run(pipeline(yescmd, `head`, devnull)) -let a, p - a = Base.Condition() - t = @async begin - p = run(pipeline(yescmd,devnull), wait=false) - Base.notify(a,p) - @test !success(p) - end - p = wait(a) +let p = run(pipeline(yescmd, devnull), wait=false) + t = @async !success(p) kill(p) - wait(t) + @test fetch(t) end if valgrind_off @@ -619,6 +613,23 @@ open(`$catcmd`, "r+") do f wait(t) end +let text = "input-test-text" + b = PipeBuffer() + proc = open(Base.CmdRedirect(Base.CmdRedirect(```$(Base.julia_cmd()) -E ' + in14 = Base.open(RawFD(14)) + out15 = Base.open(RawFD(15)) + write(out15, in14)'```, + IOBuffer(text), 14, true), + b, 15, false), "r") + @test read(proc, String) == string(length(text), '\n') + @test success(proc) + @test String(take!(b)) == text +end +@test repr(Base.CmdRedirect(``, devnull, 0, false)) == "pipeline(``, stdin>Base.DevNull())" +@test repr(Base.CmdRedirect(``, devnull, 1, true)) == "pipeline(``, stdout