From cf862eb7c79bec4bee9b16878ebf6ac1dd7a7500 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Thu, 29 Oct 2015 14:28:16 -0400 Subject: [PATCH] refactor AbstractPipe in terms of pipe_reader/pipe_writer instead of .out/.in --- base/process.jl | 4 ++++ base/stream.jl | 59 +++++++++++++++++++++++++++---------------------- 2 files changed, 36 insertions(+), 27 deletions(-) diff --git a/base/process.jl b/base/process.jl index 3e36e07607fc2..cd1858995cd8c 100644 --- a/base/process.jl +++ b/base/process.jl @@ -303,6 +303,8 @@ type Process <: AbstractPipe this end end +pipe_reader(p::Process) = p.out +pipe_writer(p::Process) = p.in immutable ProcessChain <: AbstractPipe processes::Vector{Process} @@ -311,6 +313,8 @@ immutable ProcessChain <: AbstractPipe err::Redirectable ProcessChain(stdios::StdIOSet) = new(Process[], stdios[1], stdios[2], stdios[3]) end +pipe_reader(p::ProcessChain) = p.out +pipe_writer(p::ProcessChain) = p.in function _jl_spawn(cmd, argv, loop::Ptr{Void}, pp::Process, in, out, err) diff --git a/base/stream.jl b/base/stream.jl index e13eb3f979039..846774f559423 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -529,14 +529,18 @@ end # (composed of two half-pipes: .in and .out) ########################################## +# allows sharing implementation of wrappers around other IO objects abstract AbstractPipe <: IO -# allows sharing implementation with Process and ProcessChain +function pipe_reader end +function pipe_writer end type Pipe <: AbstractPipe in::PipeEndpoint # writable out::PipeEndpoint # readable end Pipe() = Pipe(PipeEndpoint(), PipeEndpoint()) +pipe_reader(p::Pipe) = p.out +pipe_writer(p::Pipe) = p.in function link_pipe(pipe::Pipe; julia_only_read = false, @@ -544,37 +548,38 @@ function link_pipe(pipe::Pipe; link_pipe(pipe.out, julia_only_read, pipe.in, julia_only_write); end -show(io::IO,stream::Pipe) = print(io, +show(io::IO, stream::Pipe) = print(io, "Pipe(", uv_status_string(stream.in), " => ", uv_status_string(stream.out), ", ", nb_available(stream), " bytes waiting)") -write(io::AbstractPipe, byte::UInt8) = write(io.in, byte) -write(io::AbstractPipe, bytes::Vector{UInt8}) = write(io.in, bytes) -write{T<:AbstractPipe}(io::T, args...) = write(io.in, args...) -write{S<:AbstractPipe}(io::S, a::Array) = write(io.in, a) -buffer_or_write(io::AbstractPipe, p::Ptr, n::Integer) = buffer_or_write(io.in, p, n) -buffer_writes(io::AbstractPipe, args...) = buffer_writes(io.in, args...) -flush(io::AbstractPipe) = flush(io.in) - -read(io::AbstractPipe, byte::Type{UInt8}) = read(io.out, byte) -read!(io::AbstractPipe, bytes::Vector{UInt8}) = read!(io.out, bytes) -read{T<:AbstractPipe}(io::T, args...) = read(io.out, args...) -read!{T<:AbstractPipe}(io::T, args...) = read!(io.out, args...) -readuntil{T<:AbstractPipe}(io::T, args...) = readuntil(io.out, args...) -readbytes(io::AbstractPipe) = readbytes(io.out) -readavailable(io::AbstractPipe) = readavailable(io.out) - -isreadable(io::AbstractPipe) = isreadable(io.out) -iswritable(io::AbstractPipe) = iswritable(io.in) -isopen(io::AbstractPipe) = isopen(io.in) || isopen(io.out) -close(io::AbstractPipe) = (close(io.in); close(io.out)) -wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(io.out, nb) -wait_readbyte(io::AbstractPipe, byte::UInt8) = wait_readbyte(io.out, byte) -wait_close(io::AbstractPipe) = (wait_close(io.in); wait_close(io.out)) -nb_available(io::AbstractPipe) = nb_available(io.out) -eof(io::AbstractPipe) = eof(io.out) +write(io::AbstractPipe, byte::UInt8) = write(pipe_writer(io), byte) +write(io::AbstractPipe, bytes::Vector{UInt8}) = write(pipe_writer(io), bytes) +write{T<:AbstractPipe}(io::T, args...) = write(pipe_writer(io), args...) +write{S<:AbstractPipe}(io::S, a::Array) = write(pipe_writer(io), a) +buffer_or_write(io::AbstractPipe, p::Ptr, n::Integer) = buffer_or_write(pipe_writer(io), p, n) +buffer_writes(io::AbstractPipe, args...) = buffer_writes(pipe_writer(io), args...) +flush(io::AbstractPipe) = flush(pipe_writer(io)) + +read(io::AbstractPipe, byte::Type{UInt8}) = read(pipe_reader(io), byte) +read!(io::AbstractPipe, bytes::Vector{UInt8}) = read!(pipe_reader(io), bytes) +read{T<:AbstractPipe}(io::T, args...) = read(pipe_reader(io), args...) +read!{T<:AbstractPipe}(io::T, args...) = read!(pipe_reader(io), args...) +readuntil{T<:AbstractPipe}(io::T, args...) = readuntil(pipe_reader(io), args...) +readbytes(io::AbstractPipe) = readbytes(pipe_reader(io)) +readavailable(io::AbstractPipe) = readavailable(pipe_reader(io)) + +isreadable(io::AbstractPipe) = isreadable(pipe_reader(io)) +iswritable(io::AbstractPipe) = iswritable(pipe_writer(io)) +isopen(io::AbstractPipe) = isopen(pipe_writer(io)) || isopen(pipe_reader(io)) +close(io::AbstractPipe) = (close(pipe_writer(io)); close(pipe_reader(io))) +wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io), nb) +wait_readbyte(io::AbstractPipe, byte::UInt8) = wait_readbyte(pipe_reader(io), byte) +wait_close(io::AbstractPipe) = (wait_close(pipe_writer(io)); wait_close(pipe_reader(io))) +nb_available(io::AbstractPipe) = nb_available(pipe_reader(io)) +eof(io::AbstractPipe) = eof(pipe_reader(io)) +reseteof(io::AbstractPipe) = reseteof(pipe_reader(io)) ########################################## # Async Worker