Skip to content

Commit

Permalink
refactor AbstractPipe in terms of pipe_reader/pipe_writer instead of …
Browse files Browse the repository at this point in the history
….out/.in
  • Loading branch information
vtjnash committed Dec 24, 2015
1 parent 5a9824a commit cf862eb
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 27 deletions.
4 changes: 4 additions & 0 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ type Process <: AbstractPipe
this
end
end
pipe_reader(p::Process) = p.out
pipe_writer(p::Process) = p.in

immutable ProcessChain <: AbstractPipe
processes::Vector{Process}
Expand All @@ -311,6 +313,8 @@ immutable ProcessChain <: AbstractPipe
err::Redirectable
ProcessChain(stdios::StdIOSet) = new(Process[], stdios[1], stdios[2], stdios[3])
end
pipe_reader(p::ProcessChain) = p.out
pipe_writer(p::ProcessChain) = p.in

function _jl_spawn(cmd, argv, loop::Ptr{Void}, pp::Process,
in, out, err)
Expand Down
59 changes: 32 additions & 27 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -529,52 +529,57 @@ end
# (composed of two half-pipes: .in and .out)
##########################################

# allows sharing implementation of wrappers around other IO objects
abstract AbstractPipe <: IO
# allows sharing implementation with Process and ProcessChain
function pipe_reader end
function pipe_writer end

type Pipe <: AbstractPipe
in::PipeEndpoint # writable
out::PipeEndpoint # readable
end
Pipe() = Pipe(PipeEndpoint(), PipeEndpoint())
pipe_reader(p::Pipe) = p.out
pipe_writer(p::Pipe) = p.in

function link_pipe(pipe::Pipe;
julia_only_read = false,
julia_only_write = false)
link_pipe(pipe.out, julia_only_read, pipe.in, julia_only_write);
end

show(io::IO,stream::Pipe) = print(io,
show(io::IO, stream::Pipe) = print(io,
"Pipe(",
uv_status_string(stream.in), " => ",
uv_status_string(stream.out), ", ",
nb_available(stream), " bytes waiting)")

write(io::AbstractPipe, byte::UInt8) = write(io.in, byte)
write(io::AbstractPipe, bytes::Vector{UInt8}) = write(io.in, bytes)
write{T<:AbstractPipe}(io::T, args...) = write(io.in, args...)
write{S<:AbstractPipe}(io::S, a::Array) = write(io.in, a)
buffer_or_write(io::AbstractPipe, p::Ptr, n::Integer) = buffer_or_write(io.in, p, n)
buffer_writes(io::AbstractPipe, args...) = buffer_writes(io.in, args...)
flush(io::AbstractPipe) = flush(io.in)

read(io::AbstractPipe, byte::Type{UInt8}) = read(io.out, byte)
read!(io::AbstractPipe, bytes::Vector{UInt8}) = read!(io.out, bytes)
read{T<:AbstractPipe}(io::T, args...) = read(io.out, args...)
read!{T<:AbstractPipe}(io::T, args...) = read!(io.out, args...)
readuntil{T<:AbstractPipe}(io::T, args...) = readuntil(io.out, args...)
readbytes(io::AbstractPipe) = readbytes(io.out)
readavailable(io::AbstractPipe) = readavailable(io.out)

isreadable(io::AbstractPipe) = isreadable(io.out)
iswritable(io::AbstractPipe) = iswritable(io.in)
isopen(io::AbstractPipe) = isopen(io.in) || isopen(io.out)
close(io::AbstractPipe) = (close(io.in); close(io.out))
wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(io.out, nb)
wait_readbyte(io::AbstractPipe, byte::UInt8) = wait_readbyte(io.out, byte)
wait_close(io::AbstractPipe) = (wait_close(io.in); wait_close(io.out))
nb_available(io::AbstractPipe) = nb_available(io.out)
eof(io::AbstractPipe) = eof(io.out)
write(io::AbstractPipe, byte::UInt8) = write(pipe_writer(io), byte)
write(io::AbstractPipe, bytes::Vector{UInt8}) = write(pipe_writer(io), bytes)
write{T<:AbstractPipe}(io::T, args...) = write(pipe_writer(io), args...)
write{S<:AbstractPipe}(io::S, a::Array) = write(pipe_writer(io), a)
buffer_or_write(io::AbstractPipe, p::Ptr, n::Integer) = buffer_or_write(pipe_writer(io), p, n)
buffer_writes(io::AbstractPipe, args...) = buffer_writes(pipe_writer(io), args...)
flush(io::AbstractPipe) = flush(pipe_writer(io))

read(io::AbstractPipe, byte::Type{UInt8}) = read(pipe_reader(io), byte)
read!(io::AbstractPipe, bytes::Vector{UInt8}) = read!(pipe_reader(io), bytes)
read{T<:AbstractPipe}(io::T, args...) = read(pipe_reader(io), args...)
read!{T<:AbstractPipe}(io::T, args...) = read!(pipe_reader(io), args...)
readuntil{T<:AbstractPipe}(io::T, args...) = readuntil(pipe_reader(io), args...)
readbytes(io::AbstractPipe) = readbytes(pipe_reader(io))
readavailable(io::AbstractPipe) = readavailable(pipe_reader(io))

isreadable(io::AbstractPipe) = isreadable(pipe_reader(io))
iswritable(io::AbstractPipe) = iswritable(pipe_writer(io))
isopen(io::AbstractPipe) = isopen(pipe_writer(io)) || isopen(pipe_reader(io))
close(io::AbstractPipe) = (close(pipe_writer(io)); close(pipe_reader(io)))
wait_readnb(io::AbstractPipe, nb::Int) = wait_readnb(pipe_reader(io), nb)
wait_readbyte(io::AbstractPipe, byte::UInt8) = wait_readbyte(pipe_reader(io), byte)
wait_close(io::AbstractPipe) = (wait_close(pipe_writer(io)); wait_close(pipe_reader(io)))
nb_available(io::AbstractPipe) = nb_available(pipe_reader(io))
eof(io::AbstractPipe) = eof(pipe_reader(io))
reseteof(io::AbstractPipe) = reseteof(pipe_reader(io))

##########################################
# Async Worker
Expand Down

0 comments on commit cf862eb

Please sign in to comment.