Skip to content

Commit

Permalink
refactor IO types to be less brittle, more flat, and to fix #12829 and
Browse files Browse the repository at this point in the history
  • Loading branch information
vtjnash committed Sep 13, 2015
1 parent 4a2298d commit 43e283d
Show file tree
Hide file tree
Showing 11 changed files with 348 additions and 284 deletions.
5 changes: 2 additions & 3 deletions base/REPL.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ export
StreamREPL

import Base:
AsyncStream,
Display,
display,
writemime,
Expand Down Expand Up @@ -884,15 +883,15 @@ end

outstream(s::StreamREPL) = s.stream

StreamREPL(stream::AsyncStream) = StreamREPL(stream, julia_green, Base.text_colors[:white], Base.answer_color())
StreamREPL(stream::IO) = StreamREPL(stream, julia_green, Base.text_colors[:white], Base.answer_color())

answer_color(r::LineEditREPL) = r.envcolors ? Base.answer_color() : r.answer_color
answer_color(r::StreamREPL) = r.answer_color
input_color(r::LineEditREPL) = r.envcolors ? Base.input_color() : r.input_color
input_color(r::StreamREPL) = r.input_color


function run_repl(stream::AsyncStream)
function run_repl(stream::IO)
repl =
@async begin
repl_channel = Channel(1)
Expand Down
15 changes: 5 additions & 10 deletions base/fs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -231,19 +231,14 @@ function read(f::File, ::Type{UInt8})
return ret%UInt8
end

function read!{T}(f::File, a::Array{T}, nel=length(a))
function read!(f::File, a::Vector{UInt8}, nel=length(a))
if nel < 0 || nel > length(a)
throw(BoundsError())
end
if isbits(T)
nb = nel*sizeof(T)
ret = ccall(:jl_fs_read, Int32, (Int32, Ptr{Void}, Csize_t),
f.handle, a, nb)
uv_error("read",ret)
else
invoke(read, Tuple{IO, Array}, s, a)
end
a
ret = ccall(:jl_fs_read, Int32, (Int32, Ptr{Void}, Csize_t),
f.handle, a, nel)
uv_error("read",ret)
return a
end

nb_available(f::File) = filesize(f) - position(f)
Expand Down
45 changes: 36 additions & 9 deletions base/io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ isreadonly(s) = isreadable(s) && !iswritable(s)

## binary I/O ##

# all subtypes should implement this
write(s::IO, x::UInt8) = error(typeof(s)," does not support byte I/O")

write(io::IO, x) = throw(MethodError(write, (io, x)))
function write(io::IO, xs...)
local written::Int = 0
Expand Down Expand Up @@ -107,9 +104,6 @@ function write(io::IO, s::Symbol)
return write(io, pname, Int(ccall(:strlen, Csize_t, (Ptr{UInt8},), pname)))
end

# all subtypes should implement this
read(s::IO, ::Type{UInt8}) = error(typeof(s)," does not support byte I/O")

read(s::IO, ::Type{Int8}) = reinterpret(Int8, read(s,UInt8))

function read{T <: Integer}(s::IO, ::Type{T})
Expand All @@ -131,9 +125,20 @@ read{T}(s::IO, t::Type{T}, d1::Integer, dims::Integer...) =

read{T}(s::IO, ::Type{T}, dims::Dims) = read!(s, Array(T, dims))

function read!(s::IO, a::Vector{UInt8})

This comment has been minimized.

Copy link
@tkelman

tkelman Oct 3, 2015

Contributor

this should really be returning a after the loop, yeah?

This comment has been minimized.

Copy link
@vtjnash

vtjnash Oct 3, 2015

Author Member

agreed

This comment has been minimized.

Copy link
@tkelman

tkelman Oct 3, 2015

Contributor
for i in 1:length(a)
a[i] = read(s, UInt8)
end
end

function read!{T}(s::IO, a::Array{T})
for i in eachindex(a)
a[i] = read(s, T)
if isbits(T)
nb::Int = length(a) * sizeof(T)
read!(s, reinterpret(UInt8, a, (nb,)))
else
for i in eachindex(a)
a[i] = read(s, T)
end
end
return a
end
Expand Down Expand Up @@ -219,7 +224,7 @@ function readuntil(s::IO, t::AbstractString)
return takebuf_string(out)
end


readline() = readline(STDIN)
readline(s::IO) = readuntil(s, '\n')
readchomp(x) = chomp!(readall(x))

Expand Down Expand Up @@ -308,3 +313,25 @@ ismarked(io::IO) = io.mark >= 0

lock(::IO) = nothing
unlock(::IO) = nothing
reseteof(x::IO) = nothing

const SZ_UNBUFFERED_IO = 65536
buffer_writes(x::IO, bufsize=SZ_UNBUFFERED_IO) = nothing

function isopen end
function close end
function flush end
function wait_connected end
function wait_readnb end
function wait_readbyte end
function wait_close end
function nb_available end
function readavailable end
function isreadable end
function iswritable end
function copy end
function eof end

# all subtypes should implement this
read(s::IO, ::Type{UInt8}) = error(typeof(s)," does not support byte I/O")
write(s::IO, x::UInt8) = error(typeof(s)," does not support byte I/O")
1 change: 1 addition & 0 deletions base/iobuffer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ show(io::IO, b::AbstractIOBuffer) = print(io, "IOBuffer(data=UInt8[...], ",
"ptr=", b.ptr, ", ",
"mark=", b.mark, ")")

read!(from::AbstractIOBuffer, a::Vector{UInt8}) = read_sub(from, a, 1, length(a))
read!(from::AbstractIOBuffer, a::Array) = read_sub(from, a, 1, length(a))

function read_sub{T}(from::AbstractIOBuffer, a::AbstractArray{T}, offs, nel)
Expand Down
13 changes: 4 additions & 9 deletions base/iostream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,10 @@ function read{T<:Union{UInt16, Int16, UInt32, Int32, UInt64, Int64}}(s::IOStream
ccall(:jl_ios_get_nbyte_int, UInt64, (Ptr{Void}, Csize_t), s.ios, sizeof(T)) % T
end

function read!{T}(s::IOStream, a::Array{T})
if isbits(T)
nb = length(a)*sizeof(T)
if ccall(:ios_readall, UInt,
(Ptr{Void}, Ptr{Void}, UInt), s.ios, a, nb) < nb
throw(EOFError())
end
else
invoke(read!, Tuple{IO, Array}, s, a)
function read!(s::IOStream, a::Vector{UInt8})
if ccall(:ios_readall, UInt,
(Ptr{Void}, Ptr{Void}, UInt), s.ios, a, sizeof(a)) < sizeof(a)
throw(EOFError())
end
a
end
Expand Down
8 changes: 4 additions & 4 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ type Worker
c_state::Condition # wait for state changes
ct_time::Float64 # creation time

r_stream::AsyncStream
w_stream::AsyncStream
r_stream::IO
w_stream::IO
manager::ClusterManager
config::WorkerConfig

Expand Down Expand Up @@ -836,9 +836,9 @@ function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket)
message_handler_loop(r_stream, w_stream)
end

process_messages(r_stream::AsyncStream, w_stream::AsyncStream) = @schedule message_handler_loop(r_stream, w_stream)
process_messages(r_stream::IO, w_stream::IO) = @schedule message_handler_loop(r_stream, w_stream)

function message_handler_loop(r_stream::AsyncStream, w_stream::AsyncStream)
function message_handler_loop(r_stream::IO, w_stream::IO)
global PGRP
global cluster_manager

Expand Down
49 changes: 28 additions & 21 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ immutable FileRedirect
end
end

immutable DevNullStream <: AsyncStream end
immutable DevNullStream <: IO end
const DevNull = DevNullStream()
isreadable(::DevNullStream) = false
iswritable(::DevNullStream) = true
Expand All @@ -96,17 +96,24 @@ write{T<:DevNullStream}(::T, args...) = 0
close(::DevNullStream) = nothing
flush(::DevNullStream) = nothing
copy(::DevNullStream) = DevNull
wait_connected(::DevNullStream) = nothing
wait_readnb(::DevNullStream) = wait()
wait_readbyte(::DevNullStream) = wait()
wait_close(::DevNullStream) = wait()
eof(::DevNullStream) = true

uvhandle(::DevNullStream) = C_NULL
uvtype(::DevNullStream) = UV_STREAM

uvhandle(x::Ptr) = x
uvtype(::Ptr) = UV_STREAM
uvtype(::DevNullStream) = UV_STREAM

# Not actually a pointer, but that's how we pass it through the C API so it's fine
uvhandle(x::RawFD) = convert(Ptr{Void}, x.fd % UInt)
uvtype(x::RawFD) = UV_RAW_FD

typealias Redirectable Union{AsyncStream, FS.File, FileRedirect, DevNullStream, IOStream, RawFD}
typealias Redirectable Union{IO, FileRedirect, RawFD}
typealias StdIOSet NTuple{3, Union{Redirectable, Ptr{Void}}} # XXX: remove Ptr{Void} once libuv is refactored to use upstream release

immutable CmdRedirect <: AbstractCmd
cmd::AbstractCmd
Expand Down Expand Up @@ -197,30 +204,30 @@ pipeline(src::Union{Redirectable,AbstractString}, cmd::AbstractCmd) = pipeline(c

pipeline(a, b, c, d...) = pipeline(pipeline(a,b), c, d...)

typealias RawOrBoxedHandle Union{UVHandle,AsyncStream,Redirectable,IOStream}
typealias StdIOSet NTuple{3,RawOrBoxedHandle}

type Process <: AbstractPipe
cmd::Cmd
handle::Ptr{Void}
in::AsyncStream
out::AsyncStream
err::AsyncStream
in::IO
out::IO
err::IO
exitcode::Int64
termsignal::Int32
exitcb::Callback
exitnotify::Condition
closecb::Callback
closenotify::Condition
function Process(cmd::Cmd, handle::Ptr{Void}, in::RawOrBoxedHandle, out::RawOrBoxedHandle, err::RawOrBoxedHandle)
if !isa(in, AsyncStream) || in === DevNull
in=DevNull
function Process(cmd::Cmd, handle::Ptr{Void},
in::Union{Redirectable, Ptr{Void}},
out::Union{Redirectable, Ptr{Void}},
err::Union{Redirectable, Ptr{Void}})
if !isa(in, IO)
in = DevNull
end
if !isa(out, AsyncStream) || out === DevNull
out=DevNull
if !isa(out, IO)
out = DevNull
end
if !isa(err, AsyncStream) || err === DevNull
err=DevNull
if !isa(err, IO)
err = DevNull
end
this = new(cmd, handle, in, out, err,
typemin(fieldtype(Process, :exitcode)),
Expand Down Expand Up @@ -431,7 +438,7 @@ end
# | | \ The function to be called once the uv handle is closed
# | \ The function to be called once the process exits
# \ A set of up to 256 stdio instructions, where each entry can be either:
# | - An AsyncStream to be passed to the child
# | - An IO to be passed to the child
# | - DevNull to pass /dev/null
# | - An FS.File object to redirect the output to
# \ - An ASCIIString specifying a filename to be opened
Expand Down Expand Up @@ -464,7 +471,7 @@ end
eachline(cmd::AbstractCmd) = eachline(cmd, DevNull)

# return a Process object to read-to/write-from the pipeline
function open(cmds::AbstractCmd, mode::AbstractString="r", other::AsyncStream=DevNull)
function open(cmds::AbstractCmd, mode::AbstractString="r", other::Redirectable=DevNull)
if mode == "r"
in = other
out = io = Pipe()
Expand Down Expand Up @@ -502,18 +509,18 @@ function readandwrite(cmds::AbstractCmd)
(out, in, processes)
end

function readbytes(cmd::AbstractCmd, stdin::AsyncStream=DevNull)
function readbytes(cmd::AbstractCmd, stdin::Redirectable=DevNull)
out, procs = open(cmd, "r", stdin)
bytes = readbytes(out)
!success(procs) && pipeline_error(procs)
return bytes
end

function readall(cmd::AbstractCmd, stdin::AsyncStream=DevNull)
function readall(cmd::AbstractCmd, stdin::Redirectable=DevNull)
return bytestring(readbytes(cmd, stdin))
end

function writeall(cmd::AbstractCmd, stdin::AbstractString, stdout::AsyncStream=DevNull)
function writeall(cmd::AbstractCmd, stdin::AbstractString, stdout::Redirectable=DevNull)
open(cmd, "w", stdout) do io
write(io, stdin)
end
Expand Down
24 changes: 7 additions & 17 deletions base/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,7 @@ end

## SOCKETS ##

abstract Socket <: AsyncStream

type TCPSocket <: Socket
type TCPSocket <: LibuvStream
handle::Ptr{Void}
status::Int
line_buffered::Bool
Expand Down Expand Up @@ -294,10 +292,7 @@ function TCPSocket()
this
end

lock(s::TCPSocket) = lock(s.lock)
unlock(s::TCPSocket) = unlock(s.lock)

type TCPServer <: UVServer
type TCPServer <: LibuvServer
handle::Ptr{Void}
status::Int
ccb::Callback
Expand Down Expand Up @@ -328,13 +323,8 @@ function TCPServer()
this
end

isreadable(io::TCPSocket) = true
iswritable(io::TCPSocket) = true

show(io::IO,sock::TCPSocket) = print(io,"TCPSocket(",uv_status_string(sock),", ",
nb_available(sock.buffer)," bytes waiting)")

show(io::IO,sock::TCPServer) = print(io,"TCPServer(",uv_status_string(sock),")")
isreadable(io::TCPSocket) = isopen(io) || nb_available(io) > 0
iswritable(io::TCPSocket) = isopen(io) && io.status != StatusClosing

## VARIOUS METHODS TO BE MOVED TO BETTER LOCATION

Expand Down Expand Up @@ -365,7 +355,7 @@ _bind(sock::TCPServer, host::IPv6, port::UInt16) = ccall(:jl_tcp_bind6, Int32, (

# UDP

type UDPSocket <: Socket
type UDPSocket <: LibuvStream
handle::Ptr{Void}
status::Int
recvnotify::Condition
Expand Down Expand Up @@ -694,7 +684,7 @@ end

##

listen(sock::UVServer; backlog::Integer=BACKLOG_DEFAULT) = (uv_error("listen",_listen(sock;backlog=backlog)); sock)
listen(sock::LibuvServer; backlog::Integer=BACKLOG_DEFAULT) = (uv_error("listen",_listen(sock;backlog=backlog)); sock)

function listen(addr; backlog::Integer=BACKLOG_DEFAULT)
sock = TCPServer()
Expand All @@ -706,7 +696,7 @@ listen(port::Integer; backlog::Integer=BACKLOG_DEFAULT) = listen(IPv4(UInt32(0))
listen(host::IPAddr, port::Integer; backlog::Integer=BACKLOG_DEFAULT) = listen(InetAddr(host,port);backlog=backlog)

listen(cb::Callback,args...; backlog::Integer=BACKLOG_DEFAULT) = (sock=listen(args...;backlog=backlog);sock.ccb=cb;sock)
listen(cb::Callback,sock::Socket; backlog::Integer=BACKLOG_DEFAULT) = (sock.ccb=cb;listen(sock;backlog=backlog))
listen(cb::Callback,sock::Union{TCPSocket,UDPSocket}; backlog::Integer=BACKLOG_DEFAULT) = (sock.ccb=cb;listen(sock;backlog=backlog))

##

Expand Down
Loading

0 comments on commit 43e283d

Please sign in to comment.