Skip to content

Commit

Permalink
cleanup UV objects, especially status codes. fixes #1923. fixes #3676
Browse files Browse the repository at this point in the history
  • Loading branch information
vtjnash committed Jul 11, 2013
1 parent 416d857 commit ff7ac46
Show file tree
Hide file tree
Showing 7 changed files with 404 additions and 364 deletions.
4 changes: 4 additions & 0 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ function ls(args...)
warn_once("ls() is deprecated, use readdir() instead. If you are at the repl prompt, consider `;ls`.")
deprecated_ls(args...)
end
function start_timer(timer::TimeoutAsyncWork, timeout::Int, repeat::Int)
warn_once("start_timer now expects arguments in units of seconds. you may need to update your code")
start_timer(timer, timeout, repeat)
end

# Redirection Operators
@deprecate |(a::AbstractCmd,b::AbstractCmd) (a|>b)
Expand Down
2 changes: 1 addition & 1 deletion base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ macro cmd(str)
:(cmd_gen($(shell_parse(str))))
end

wait_close(x) = if x.open; wait(x.closenotify); end
wait_close(x) = if uv_isopen(x) wait(x.closenotify); end

wait_exit(x::Process) = if !process_exited(x); wait(x.exitnotify); end
wait_exit(x::ProcessChain) = for p in x.processes; wait_exit(p); end
Expand Down
157 changes: 88 additions & 69 deletions base/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ abstract Socket <: AsyncStream

type TcpSocket <: Socket
handle::Ptr{Void}
open::Bool
status::Int
line_buffered::Bool
buffer::IOBuffer
readcb::Callback
Expand All @@ -245,38 +245,52 @@ type TcpSocket <: Socket
connectnotify::Condition
closecb::Callback
closenotify::Condition
TcpSocket(handle,open)=new(handle,open,true,PipeBuffer(),false,
Condition(),false,Condition(),false,Condition())

function TcpSocket()
this = TcpSocket(C_NULL,false)
this.handle = ccall(:jl_make_tcp,Ptr{Void},(Ptr{Void},Any),
eventloop(),this)
if (this.handle == C_NULL)
throw(UVError("Failed to create socket"))
end
this
TcpSocket(handle) = new(
handle,
StatusUninit,
true,
PipeBuffer(),
false,Condition(),
false,Condition(),
false,Condition())
end
function TcpSocket()
this = TcpSocket(c_malloc(_sizeof_uv_tcp))
if 0 != ccall(:uv_tcp_init,Cint,(Ptr{Void},Ptr{Void}),
eventloop(),this.handle)
c_free(this.handle)
this.handle = C_NULL
error(UVError("Failed to create tcp socket"))
end
associate_julia_struct(this.handle, this)
this.status = StatusInit
this
end

type TcpServer <: UVServer
handle::Ptr{Void}
open::Bool
status::Int
ccb::Callback
connectnotify::Condition
closecb::Callback
closenotify::Condition
TcpServer(handle,open) = new(handle,open,false,Condition(),false,Condition())

function TcpServer()
this = TcpServer(C_NULL,false)
this.handle = ccall(:jl_make_tcp,Ptr{Void},(Ptr{Void},Any),
eventloop(),this)
if (this.handle == C_NULL)
throw(UVError("Failed to create socket server"))
end
this
TcpServer(handle) = new(
handle,
StatusUninit,
false,Condition(),
false,Condition())
end
function TcpServer()
this = TcpServer(c_malloc(_sizeof_uv_tcp))
if 0 != ccall(:uv_tcp_init,Cint,(Ptr{Void},Ptr{Void}),
eventloop(),this.handle)
c_free(this.handle)
this.handle = C_NULL
error(UVError("Failed to create tcp server"))
end
associate_julia_struct(this.handle, this)
this.status = StatusInit
this
end

#type UdpSocket <: Socket
Expand All @@ -293,19 +307,13 @@ end
#end


show(io::IO,sock::TcpSocket) = print(io,"TcpSocket(",sock.open?"connected,":
"disconnected,",nb_available(sock.buffer),
" bytes waiting)")
show(io::IO,sock::TcpSocket) = print(io,"TcpSocket(",uv_status_string(sock),", ",
nb_available(sock.buffer)," bytes waiting)")

show(io::IO,sock::TcpServer) = print(io,"TcpServer(",sock.open?"listening)":
"not listening)")
show(io::IO,sock::TcpServer) = print(io,"TcpServer(",uv_status_string(sock),")")

#show(io::IO,sock::UdpSocket) = print(io,"UdpSocket(",sock.open?"connected,":
# "disconnected,",nb_available(sock.buffer),
# " bytes waiting)")

_jl_tcp_init(loop::Ptr{Void}) = ccall(:jl_tcp_init,Ptr{Void},(Ptr{Void},),loop)
_jl_udp_init(loop::Ptr{Void}) = ccall(:jl_udp_init,Ptr{Void},(Ptr{Void},),loop)
#show(io::IO,sock::UdpSocket) = print(io,"UdpSocket(",uv_status_string(sock),", ",
# nb_available(sock.buffer)," bytes waiting)")

## VARIOUS METHODS TO BE MOVED TO BETTER LOCATION

Expand All @@ -316,8 +324,7 @@ _jl_sockaddr_from_addrinfo(addrinfo::Ptr{Void}) =
_jl_sockaddr_set_port(ptr::Ptr{Void},port::Uint16) =
ccall(:jl_sockaddr_set_port,Void,(Ptr{Void},Uint16),ptr,port)

accept(server::TcpServer) = accept(server, TcpSocket())

accept(server::UVServer) = accept(server, TcpSocket())

##

Expand All @@ -328,28 +335,38 @@ const UV_SUCCESS = 0
const UV_EADDRINUSE = 5

function bind(sock::TcpServer, host::IPv4, port::Uint16)
err = ccall(:jl_tcp_bind, Int32, (Ptr{Void}, Uint16, Uint32),
assert(sock.status == StatusInit)
if 0 != ccall(:jl_tcp_bind, Int32, (Ptr{Void}, Uint16, Uint32),
sock.handle, hton(port), hton(host.host))
if err == -1 && _uv_lasterror() != UV_EADDRINUSE
throw(UVError("bind"))
if _uv_lasterror() != UV_EADDRINUSE
error(UVError("bind"))
else
return false
end
end
err != -1
sock.status = StatusOpen
true
end

function bind(sock::TcpServer, host::IPv6, port::Uint16)
err = ccall(:jl_tcp_bind6, Int32, (Ptr{Void}, Uint16, Ptr{Uint128}),
assert(sock.status == StatusInit)
if 0 != ccall(:jl_tcp_bind6, Int32, (Ptr{Void}, Uint16, Ptr{Uint128}),
sock.handle, hton(port), &hton(host.host))
if(err == -1 && _uv_lasterror() != UV_EADDRINUSE)
throw(UVError("bind"))
if _uv_lasterror() != UV_EADDRINUSE
error(UVError("bind"))
else
return false
end
end
err != -1
sock.status = StatusOpen
true
end

callback_dict = ObjectIdDict()

function _uv_hook_getaddrinfo(cb::Function, addrinfo::Ptr{Void}, status::Int32)
delete!(callback_dict,cb)
if status!=0 || addrinfo == C_NULL
if status != 0 || addrinfo == C_NULL
cb(UVError("getaddrinfo callback"))
return
end
Expand All @@ -370,55 +387,60 @@ function _uv_hook_getaddrinfo(cb::Function, addrinfo::Ptr{Void}, status::Int32)
ccall(:uv_freeaddrinfo,Void,(Ptr{Void},),freeaddrinfo)
end

jl_getaddrinfo(loop::Ptr{Void}, host::ByteString, service::Ptr{Void},
cb::Function) =
ccall(:jl_getaddrinfo, Int32, (Ptr{Void}, Ptr{Uint8}, Ptr{Uint8}, Any),
loop, host, service, cb)

function getaddrinfo(cb::Function,host::ASCIIString)
callback_dict[cb] = cb
jl_getaddrinfo(eventloop(),host,C_NULL,cb)
ccall(:jl_getaddrinfo, Int32, (Ptr{Void}, Ptr{Uint8}, Ptr{Uint8}, Any),
eventloop(), host, C_NULL, cb)
end

function getaddrinfo(host::ASCIIString)
c = Condition()
getaddrinfo(host) do IP
notify(c,IP)
end
ip = wait(c)
if isa(ip, Exception)
error(ip)
notify(c,IP)
end
return ip::IpAddr
return wait(c)::IpAddr
end

##

function connect(cb::Function, sock::TcpSocket, host::IPv4, port::Uint16)
assert(sock.status == StatusInit)
sock.ccb = cb
uv_error("connect",ccall(:jl_tcp4_connect,Int32,(Ptr{Void},Uint32,Uint16),
sock.handle,hton(host.host),hton(port)) == -1)
sock.status = StatusConnecting
nothing
end

function connect(sock::TcpSocket, host::IPv4, port::Uint16)
function connect(sock::TcpSocket, host::IPv4, port::Uint16)
assert(sock.status == StatusInit)
uv_error("connect",ccall(:jl_tcp4_connect,Int32,(Ptr{Void},Uint32,Uint16),
sock.handle,hton(host.host),hton(port)) == -1)
sock.status = StatusConnecting
wait_connected(sock)
nothing
end

function connect(cb::Function, sock::TcpSocket, host::IPv6, port::Uint16)
assert(sock.status == StatusInit)
sock.ccb = cb
uv_error("connect",ccall(:jl_tcp6_connect,Int32,(Ptr{Void},Ptr{Uint128},Uint16),
sock.handle,&hton(host.host),hton(port)) == -1)
sock.status = StatusConnecting
nothing
end

function connect(sock::TcpSocket, host::IPv6, port::Uint16)
function connect(sock::TcpSocket, host::IPv6, port::Uint16)
assert(sock.status == StatusInit)
uv_error("connect",ccall(:jl_tcp6_connect,Int32,(Ptr{Void},Ptr{Uint128},Uint16),
sock.handle,&hton(host.host),hton(port)) == -1)
sock.status = StatusConnecting
wait_connected(sock)
nothing
end

function connect(sock::TcpSocket, host::ASCIIString, port::Integer)
assert(sock.status == StatusInit)
ipaddr = getaddrinfo(host)
connect(sock,ipaddr,port)
end
Expand All @@ -427,11 +449,12 @@ end
connect(sock::TcpSocket, port::Integer) = connect(sock,IPv4(127,0,0,1),port)
connect(port::Integer) = connect(IPv4(127,0,0,1),port)

function default_connectcb(sock,status)
end
default_connectcb(sock,status) = nothing

function connect(cb::Function, sock::TcpSocket, host::ASCIIString, port)
sock.status = StatusConnecting
getaddrinfo(host) do ipaddr
sock.status = StatusInit
connect(cb,sock,ipaddr,port)
end
end
Expand All @@ -444,7 +467,7 @@ for (args,forward_args) in (((:(addr::InetAddr),), (:(addr.host),:(addr.port))),
@eval begin
connect(sock::Socket,$(args...)) = connect(sock,$(forward_args...))
connect(cb::Function,sock::Socket,$(args...)) =
connect(cb,sock,$(forward_args...))
connect(cb,sock,$(forward_args...))
function connect($(args...))
sock = TcpSocket()
sock.ccb = default_connectcb
Expand Down Expand Up @@ -476,19 +499,16 @@ listen(cb::Callback,sock::Socket; backlog::Integer=BACKLOG_DEFAULT) = (sock.ccb=

##

_jl_tcp_accept(server::Ptr{Void},client::Ptr{Void}) =
ccall(:uv_accept,Int32,(Ptr{Void},Ptr{Void}),server,client)
function accept_nonblock(server::TcpServer,client::TcpSocket)
err = _jl_tcp_accept(server.handle,client.handle)
err = ccall(:uv_accept,Int32,(Ptr{Void},Ptr{Void}),server.handle,client.handle)
if err == 0
client.open = true
client.status = StatusOpen
end
err
end
function accept_nonblock(server::TcpServer)
client = TcpSocket()
uv_error("accept",_jl_tcp_accept(server.handle,client.handle) == -1)
client.open = true
uv_error("accept", accept_nonblock(server, client) == -1)
client
end

Expand All @@ -504,7 +524,6 @@ function open_any_tcp_port(cb::Callback,default_port)
end
err = _uv_lasterror()
system = _uv_lastsystemerror()
sock.open = true #need to make close() work
close(sock)
if (err != UV_SUCCESS && err != UV_EADDRINUSE)
throw(UVError("open_any_tcp_port",err,system))
Expand Down
Loading

0 comments on commit ff7ac46

Please sign in to comment.