From 2fdc182b79f906b7370e19c266f2119f1f58eb14 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Thu, 4 Aug 2016 17:52:35 -0400 Subject: [PATCH] make Libuv types RAII with better error handling and bump libuv version for bugfixes --- base/process.jl | 9 +- base/socket.jl | 115 +++++++-------- base/stream.jl | 131 ++++++++---------- .../md5 | 1 + .../sha512 | 1 + .../md5 | 1 - .../sha512 | 1 - deps/libuv.version | 2 +- test/spawn.jl | 17 +++ 9 files changed, 137 insertions(+), 141 deletions(-) create mode 100644 deps/checksums/libuv-28743d6091531340cfe316de2b2d385fe1778ff5.tar.gz/md5 create mode 100644 deps/checksums/libuv-28743d6091531340cfe316de2b2d385fe1778ff5.tar.gz/sha512 delete mode 100644 deps/checksums/libuv-cb6d0f875a5b8ca30cba45c0c1ef7442c87c1e68.tar.gz/md5 delete mode 100644 deps/checksums/libuv-cb6d0f875a5b8ca30cba45c0c1ef7442c87c1e68.tar.gz/sha512 diff --git a/base/process.jl b/base/process.jl index 88d51cb4f1634..22c39e4527f64 100644 --- a/base/process.jl +++ b/base/process.jl @@ -403,22 +403,19 @@ end function setup_stdio(stdio::PipeEndpoint, readable::Bool) closeafter = false - if stdio.handle == C_NULL - io = Libc.malloc(_sizeof_uv_named_pipe) + if stdio.status == StatusUninit if readable link_pipe(io, false, stdio, true) else link_pipe(stdio, true, io, false) end closeafter = true - else - io = stdio.handle end - return (io, closeafter) + return (stdio.handle, closeafter) end function setup_stdio(stdio::Pipe, readable::Bool) - if stdio.in.handle == C_NULL && stdio.out.handle == C_NULL + if stdio.in.status == StatusUninit && stdio.out.status == StatusUninit link_pipe(stdio) end io = readable ? stdio.out : stdio.in diff --git a/base/socket.jl b/base/socket.jl index 8365f219fe711..d037173ea62b5 100644 --- a/base/socket.jl +++ b/base/socket.jl @@ -255,33 +255,30 @@ type TCPSocket <: LibuvStream lock::ReentrantLock throttle::Int - TCPSocket(handle) = new( - handle, - StatusUninit, - true, - PipeBuffer(), - false, Condition(), - false, Condition(), - false, Condition(), - nothing, - ReentrantLock(), - DEFAULT_READ_BUFFER_SZ - ) + function TCPSocket(handle::Ptr{Void}, status) + tcp = new( + handle, + status, + true, + PipeBuffer(), + false, Condition(), + false, Condition(), + false, Condition(), + nothing, + ReentrantLock(), + DEFAULT_READ_BUFFER_SZ) + associate_julia_struct(tcp.handle, tcp) + finalizer(tcp, uvfinalize) + return tcp + end end function TCPSocket() - this = TCPSocket(Libc.malloc(_sizeof_uv_tcp)) - associate_julia_struct(this.handle,this) - finalizer(this,uvfinalize) - err = ccall(:uv_tcp_init,Cint,(Ptr{Void},Ptr{Void}), - eventloop(),this.handle) - if err != 0 - #TODO: this codepath is not currently tested - Libc.free(this.handle) - this.handle = C_NULL - throw(UVError("failed to create tcp socket",err)) - end - this.status = StatusInit - return this + tcp = TCPSocket(Libc.malloc(_sizeof_uv_tcp), StatusUninit) + err = ccall(:uv_tcp_init, Cint, (Ptr{Void}, Ptr{Void}), + eventloop(), tcp.handle) + uv_error("failed to create tcp socket", err) + tcp.status = StatusInit + return tcp end type TCPServer <: LibuvServer @@ -292,27 +289,24 @@ type TCPServer <: LibuvServer closecb::Callback closenotify::Condition - TCPServer(handle) = new( - handle, - StatusUninit, - false, Condition(), - false, Condition() - ) + function TCPServer(handle::Ptr{Void}, status) + tcp = new( + handle, + status, + false, Condition(), + false, Condition()) + associate_julia_struct(tcp.handle, tcp) + finalizer(tcp, uvfinalize) + return tcp + end end function TCPServer() - this = TCPServer(Libc.malloc(_sizeof_uv_tcp)) - associate_julia_struct(this.handle, this) - finalizer(this,uvfinalize) - err = ccall(:uv_tcp_init,Cint,(Ptr{Void},Ptr{Void}), - eventloop(),this.handle) - if err != 0 - #TODO: this codepath is not currently tested - Libc.free(this.handle) - this.handle = C_NULL - throw(UVError("failed to create tcp server",err)) - end - this.status = StatusInit - return this + tcp = TCPServer(Libc.malloc(_sizeof_uv_tcp), StatusUninit) + err = ccall(:uv_tcp_init, Cint, (Ptr{Void}, Ptr{Void}), + eventloop(), tcp.handle) + uv_error("failed to create tcp server", err) + tcp.status = StatusInit + return tcp end isreadable(io::TCPSocket) = isopen(io) || nb_available(io) > 0 @@ -344,26 +338,23 @@ type UDPSocket <: LibuvStream sendnotify::Condition closenotify::Condition - UDPSocket(handle::Ptr) = new( - handle, - StatusUninit, - Condition(), - Condition(), - Condition() - ) + function UDPSocket(handle::Ptr{Void}, status) + udp = new( + handle, + status, + Condition(), + Condition(), + Condition()) + associate_julia_struct(udp.handle, udp) + finalizer(udp, uvfinalize) + return udp + end end function UDPSocket() - this = UDPSocket(Libc.malloc(_sizeof_uv_udp)) - associate_julia_struct(this.handle, this) - err = ccall(:uv_udp_init,Cint,(Ptr{Void},Ptr{Void}), - eventloop(),this.handle) - finalizer(this, uvfinalize) - if err != 0 - #TODO: this codepath is not currently tested - Libc.free(this.handle) - this.handle = C_NULL - throw(UVError("failed to create udp socket",err)) - end + this = UDPSocket(Libc.malloc(_sizeof_uv_udp), StatusUninit) + err = ccall(:uv_udp_init, Cint, (Ptr{Void}, Ptr{Void}), + eventloop(), this.handle) + uv_error("failed to create udp socket", err) this.status = StatusInit return this end diff --git a/base/stream.jl b/base/stream.jl index 733bba8d450ce..2c7ad60244ccd 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -113,16 +113,21 @@ type PipeEndpoint <: LibuvStream lock::ReentrantLock throttle::Int - PipeEndpoint(handle::Ptr{Void} = C_NULL) = new( - handle, - StatusUninit, - PipeBuffer(), - true, - false,Condition(), - false,Condition(), - false,Condition(), - nothing, ReentrantLock(), - DEFAULT_READ_BUFFER_SZ) + PipeEndpoint() = PipeEndpoint(Libc.malloc(_sizeof_uv_named_pipe), StatusUninit) + function PipeEndpoint(handle::Ptr{Void}, status) + p = new(handle, + status, + PipeBuffer(), + true, + false,Condition(), + false,Condition(), + false,Condition(), + nothing, ReentrantLock(), + DEFAULT_READ_BUFFER_SZ) + associate_julia_struct(handle, p) + finalizer(p, uvfinalize) + return p + end end type PipeServer <: LibuvServer @@ -132,26 +137,22 @@ type PipeServer <: LibuvServer connectnotify::Condition closecb::Callback closenotify::Condition - PipeServer(handle) = new( - handle, - StatusUninit, - false,Condition(), - false,Condition()) + function PipeServer(handle::Ptr{Void}, status) + p = new(handle, + status, + false,Condition(), + false,Condition()) + associate_julia_struct(p.handle, p) + finalizer(p, uvfinalize) + return p + end end typealias LibuvPipe Union{PipeEndpoint, PipeServer} function PipeServer() - handle = Libc.malloc(_sizeof_uv_named_pipe) - try - ret = PipeServer(handle) - associate_julia_struct(ret.handle,ret) - finalizer(ret,uvfinalize) - return init_pipe!(ret;readable=true) - catch - Libc.free(handle) - rethrow() - end + p = PipeServer(Libc.malloc(_sizeof_uv_named_pipe), StatusUninit) + return init_pipe!(p; readable=true) end type TTY <: LibuvStream @@ -167,16 +168,19 @@ type TTY <: LibuvStream lock::ReentrantLock throttle::Int @static if is_windows(); ispty::Bool; end - function TTY(handle) + TTY() = TTY(Libc.malloc(_sizeof_uv_tty), StatusUninit) + function TTY(handle::Ptr{Void}, status) tty = new( handle, - StatusUninit, + status, true, PipeBuffer(), false,Condition(), false,Condition(), nothing, ReentrantLock(), DEFAULT_READ_BUFFER_SZ) + associate_julia_struct(handle, tty) + finalizer(tty, uvfinalize) @static if is_windows() tty.ispty = ccall(:jl_ispty, Cint, (Ptr{Void},), handle) != 0 end @@ -185,16 +189,15 @@ type TTY <: LibuvStream end function TTY(fd::RawFD; readable::Bool = false) - handle = Libc.malloc(_sizeof_uv_tty) - ret = TTY(handle) - associate_julia_struct(handle,ret) - finalizer(ret,uvfinalize) + tty = TTY() # This needs to go after associate_julia_struct so that there # is no garbage in the ->data field - uv_error("TTY",ccall(:uv_tty_init,Int32,(Ptr{Void},Ptr{Void},Int32,Int32),eventloop(),handle,fd.fd,readable)) - ret.status = StatusOpen - ret.line_buffered = false - return ret + err = ccall(:uv_tty_init, Int32, (Ptr{Void}, Ptr{Void}, Int32, Int32), + eventloop(), tty.handle, fd.fd, readable) + uv_error("TTY", err) + tty.status = StatusOpen + tty.line_buffered = false + return tty end show(io::IO,stream::LibuvServer) = print(io, typeof(stream), "(", uv_status_string(stream), ")") @@ -230,18 +233,15 @@ function init_stdio(handle::Ptr{Void}) # return File(RawFD(ccall(:jl_uv_file_handle,Int32,(Ptr{Void},),handle))) else if t == UV_TTY - ret = TTY(handle) + ret = TTY(handle, StatusOpen) elseif t == UV_TCP - ret = TCPSocket(handle) + ret = TCPSocket(handle, StatusOpen) elseif t == UV_NAMED_PIPE - ret = PipeEndpoint(handle) + ret = PipeEndpoint(handle, StatusOpen) else throw(ArgumentError("invalid stdio type: $t")) end - ret.status = StatusOpen ret.line_buffered = false - associate_julia_struct(ret.handle, ret) - finalizer(ret, uvfinalize) return ret end end @@ -322,7 +322,9 @@ function wait_close(x::Union{LibuvStream, LibuvServer}) end function close(stream::Union{LibuvStream, LibuvServer}) - if isopen(stream) + if stream.status == StatusInit + ccall(:jl_forceclose_uv, Void, (Ptr{Void},), stream.handle) + elseif isopen(stream) if stream.status != StatusClosing ccall(:jl_close_uv, Void, (Ptr{Void},), stream.handle) stream.status = StatusClosing @@ -337,11 +339,13 @@ end function uvfinalize(uv::Union{LibuvStream, LibuvServer}) if uv.handle != C_NULL disassociate_julia_struct(uv.handle) # not going to call the usual close hooks - if uv.status != StatusUninit && uv.status != StatusInit + if uv.status != StatusUninit close(uv) - uv.handle = C_NULL - uv.status = StatusClosed + else + Libc.free(uv.handle) end + uv.status = StatusClosed + uv.handle = C_NULL end nothing end @@ -571,24 +575,23 @@ function init_pipe!(pipe::LibuvPipe; if pipe.status != StatusUninit error("pipe is already initialized") end - if pipe.handle == C_NULL - malloc_julia_pipe!(pipe) - end - uv_error("init_pipe",ccall(:jl_init_pipe, Cint, + err = ccall(:jl_init_pipe, Cint, (Ptr{Void}, Int32, Int32, Int32), - pipe.handle, writable, readable, julia_only)) + pipe.handle, writable, readable, julia_only) + uv_error( + if readable && writable + "init_pipe(ipc)" + elseif readable + "init_pipe(read)" + elseif writable + "init_pipe(write)" + else + "init_pipe(none)" + end, err) pipe.status = StatusInit return pipe end -function malloc_julia_pipe!(x::LibuvPipe) - assert(x.handle == C_NULL) - x.handle = Libc.malloc(_sizeof_uv_named_pipe) - associate_julia_struct(x.handle, x) - finalizer(x, uvfinalize) - nothing -end - function _link_pipe(read_end::Ptr{Void}, write_end::Ptr{Void}) uv_error("pipe_link", ccall(:uv_pipe_link, Int32, (Ptr{Void}, Ptr{Void}), read_end, write_end)) @@ -620,9 +623,6 @@ end function link_pipe(read_end::PipeEndpoint, readable_julia_only::Bool, write_end::Ptr{Void}, writable_julia_only::Bool) - if read_end.handle == C_NULL - malloc_julia_pipe!(read_end) - end init_pipe!(read_end; readable = true, writable = false, julia_only = readable_julia_only) uv_error("init_pipe", @@ -634,9 +634,6 @@ end function link_pipe(read_end::Ptr{Void}, readable_julia_only::Bool, write_end::PipeEndpoint, writable_julia_only::Bool) - if write_end.handle == C_NULL - malloc_julia_pipe!(write_end) - end uv_error("init_pipe", ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), read_end, 0, 1, readable_julia_only)) init_pipe!(write_end; @@ -648,12 +645,6 @@ end function link_pipe(read_end::PipeEndpoint, readable_julia_only::Bool, write_end::PipeEndpoint, writable_julia_only::Bool) - if write_end.handle == C_NULL - malloc_julia_pipe!(write_end) - end - if read_end.handle == C_NULL - malloc_julia_pipe!(read_end) - end init_pipe!(read_end; readable = true, writable = false, julia_only = readable_julia_only) init_pipe!(write_end; diff --git a/deps/checksums/libuv-28743d6091531340cfe316de2b2d385fe1778ff5.tar.gz/md5 b/deps/checksums/libuv-28743d6091531340cfe316de2b2d385fe1778ff5.tar.gz/md5 new file mode 100644 index 0000000000000..e42a20ffd5a35 --- /dev/null +++ b/deps/checksums/libuv-28743d6091531340cfe316de2b2d385fe1778ff5.tar.gz/md5 @@ -0,0 +1 @@ +cc07a8ef026fa42eeaddf7b6e074096e diff --git a/deps/checksums/libuv-28743d6091531340cfe316de2b2d385fe1778ff5.tar.gz/sha512 b/deps/checksums/libuv-28743d6091531340cfe316de2b2d385fe1778ff5.tar.gz/sha512 new file mode 100644 index 0000000000000..bec2d8d44203f --- /dev/null +++ b/deps/checksums/libuv-28743d6091531340cfe316de2b2d385fe1778ff5.tar.gz/sha512 @@ -0,0 +1 @@ +e128cec9548ff2f52a78743203d3f06dceef3cf7cbeb3a7a5f7453b132576833f82688eed52cf74c035f57828deac079cd845ad47c8cd6197a8e0bebf3586fc2 diff --git a/deps/checksums/libuv-cb6d0f875a5b8ca30cba45c0c1ef7442c87c1e68.tar.gz/md5 b/deps/checksums/libuv-cb6d0f875a5b8ca30cba45c0c1ef7442c87c1e68.tar.gz/md5 deleted file mode 100644 index 793f153e26212..0000000000000 --- a/deps/checksums/libuv-cb6d0f875a5b8ca30cba45c0c1ef7442c87c1e68.tar.gz/md5 +++ /dev/null @@ -1 +0,0 @@ -c6a019d79d20eabc39619a04961c9a3b diff --git a/deps/checksums/libuv-cb6d0f875a5b8ca30cba45c0c1ef7442c87c1e68.tar.gz/sha512 b/deps/checksums/libuv-cb6d0f875a5b8ca30cba45c0c1ef7442c87c1e68.tar.gz/sha512 deleted file mode 100644 index f3f1ec9dca483..0000000000000 --- a/deps/checksums/libuv-cb6d0f875a5b8ca30cba45c0c1ef7442c87c1e68.tar.gz/sha512 +++ /dev/null @@ -1 +0,0 @@ -478ab473244b01bef344892a75e09fef50da8fb1a7212e0257c53f3223de4fde5f6bd449eef34bc1f025481c7d9f854002acb6eb203b447a50a34bae4ad9dee4 diff --git a/deps/libuv.version b/deps/libuv.version index 51f79989c5fc3..478ebca66589a 100644 --- a/deps/libuv.version +++ b/deps/libuv.version @@ -1,2 +1,2 @@ LIBUV_BRANCH=julia-uv1.9.0 -LIBUV_SHA1=cb6d0f875a5b8ca30cba45c0c1ef7442c87c1e68 +LIBUV_SHA1=28743d6091531340cfe316de2b2d385fe1778ff5 diff --git a/test/spawn.jl b/test/spawn.jl index 0040630cf2016..2d2e362aa6fa1 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -387,3 +387,20 @@ end @test_throws ArgumentError reduce(&, Base.AbstractCmd[]) @test_throws ArgumentError reduce(&, Base.Cmd[]) @test reduce(&, [`$echo abc`, `$echo def`, `$echo hij`]) == `$echo abc` & `$echo def` & `$echo hij` + +# test for proper handling of FD exhaustion +let ps = Pipe[] + try + for i = 1:100_000 + p = Pipe() + Base.link_pipe(p) + push!(ps, p) + end + @test false + catch ex + for p in ps + close(p) + end + @test (ex::Base.UVError).code == Base.UV_EMFILE + end +end