Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tcpsocket supports a send buffer #6876

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions base/iobuffer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,22 @@ function readbytes!(io::IOBuffer, b::Array{Uint8}, nb=length(b))
read_sub(io, b, 1, nr)
return nr
end
readbytes(io::IOBuffer) = read!(io, Array(Uint8, nb_available(io)))
readbytes(io::IOBuffer, nb) = read!(io, Array(Uint8, min(nb, nb_available(io))))
function readbytes(io::IOBuffer, nb=nb_available(io))
nbavail = nb_available(io)
nb = min(nb, nbavail)
if !io.seekable && io.readable && (nb == nbavail)
(io.ptr != 1) && compact(io)
data = io.data
maxsize = (io.maxsize == typemax(Int) ? 0 : io.maxsize)
io.data = Array(Uint8,maxsize)
io.ptr = 1
io.size = 0
resize!(data, nb)
return data
else
read!(io, Array(Uint8, nb))
end
end

function search(buf::IOBuffer, delim)
p = pointer(buf.data, buf.ptr)
Expand Down
37 changes: 17 additions & 20 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ type Worker
host::ByteString
port::Uint16
socket::TcpSocket
sendbuf::IOBuffer
del_msgs::Array{Any,1}
add_msgs::Array{Any,1}
id::Int
Expand All @@ -100,8 +99,12 @@ type Worker
manage::Function
config::Dict

Worker(host::String, port::Integer, sock::TcpSocket, id::Int) =
new(bytestring(host), uint16(port), sock, IOBuffer(), {}, {}, id, false)
function Worker(host::String, port::Integer, sock::TcpSocket, id::Int)
w = new(bytestring(host), uint16(port), sock, {}, {}, id, false)
set_buffer_writes(sock, true)
make_lockable(sock)
w
end
end
Worker(host::String, port::Integer, sock::TcpSocket) =
Worker(host, port, sock, 0)
Expand Down Expand Up @@ -157,26 +160,21 @@ function flush_gc_msgs(w::Worker)
end
end

#TODO: Move to different Thread
function enq_send_req(sock::TcpSocket,buf,now::Bool)
arr=takebuf_array(buf)
write(sock,arr)
#TODO implement "now"
end

function send_msg_(w::Worker, kind, args, now::Bool)
#println("Sending msg $kind")
buf = w.sendbuf
serialize(buf, kind)
sock = w.socket
unlock = lock_w(sock; track_recursive=false)

serialize(sock, kind)
for arg in args
serialize(buf, arg)
serialize(sock, arg)
end

if !now && w.gcflag
flush_gc_msgs(w)
else
enq_send_req(w.socket,buf,now)
flush(sock)
end
unlock && unlock_w(sock)
end

function flush_gc_msgs()
Expand Down Expand Up @@ -347,7 +345,7 @@ end
function worker_id_from_socket(s)
w = get(map_sock_wrkr, s, nothing)
if isa(w,Worker)
if is(s, w.socket) || is(s, w.sendbuf)
if is(s, w.socket)
return w.id
end
end
Expand All @@ -365,7 +363,6 @@ function register_worker(pg, w)
map_pid_wrkr[w.id] = w
if isa(w, Worker)
map_sock_wrkr[w.socket] = w
map_sock_wrkr[w.sendbuf] = w
end
end

Expand All @@ -375,7 +372,6 @@ function deregister_worker(pg, pid)
w = pop!(map_pid_wrkr, pid, nothing)
if isa(w, Worker)
pop!(map_sock_wrkr, w.socket)
pop!(map_sock_wrkr, w.sendbuf)

# Notify the cluster manager of this workers death
if myid() == 1
Expand Down Expand Up @@ -674,7 +670,6 @@ end

function remotecall(w::Worker, f, args...)
rr = RemoteRef(w)
#println("$(myid()) asking for $rr")
send_msg(w, :call, rr2id(rr), f, args)
rr
end
Expand Down Expand Up @@ -872,6 +867,8 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately
put!(lookup_ref(oid), val)
elseif is(msg, :identify_socket)
otherid = deserialize(sock)
assert(myid() != 1) # master process never receives a :identify_socket and
# we expect to have recd and processed a :join_pgrp by now
register_worker(Worker("", 0, sock, otherid))
elseif is(msg, :join_pgrp)
# first connection; get process group info from client
Expand Down
5 changes: 5 additions & 0 deletions base/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -244,17 +244,22 @@ type TcpSocket <: Socket
status::Int
line_buffered::Bool
buffer::IOBuffer
sendbuf::IOBuffer
buffer_writes::Bool # If true, write's are collected. Written when there are
# enough bytes or on an explicit flush
readcb::Callback
readnotify::Condition
ccb::Callback
connectnotify::Condition
closecb::Callback
closenotify::Condition
write_lock
TcpSocket(handle) = new(
handle,
StatusUninit,
true,
PipeBuffer(),
PipeBuffer(), false,
false,Condition(),
false,Condition(),
false,Condition())
Expand Down
136 changes: 117 additions & 19 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,20 @@ type Pipe <: AsyncStream
status::Int
buffer::IOBuffer
line_buffered::Bool
sendbuf::IOBuffer
buffer_writes::Bool
readcb::Callback
readnotify::Condition
ccb::Callback
connectnotify::Condition
closecb::Callback
closenotify::Condition
write_lock
Pipe(handle) = new(
handle,
StatusUninit,
PipeBuffer(),
true,
PipeBuffer(), true,
PipeBuffer(), false,
false,Condition(),
false,Condition(),
false,Condition())
Expand Down Expand Up @@ -172,15 +175,19 @@ type TTY <: AsyncStream
status::Int
line_buffered::Bool
buffer::IOBuffer
sendbuf::IOBuffer
buffer_writes::Bool
readcb::Callback
readnotify::Condition
closecb::Callback
closenotify::Condition
write_lock
TTY(handle) = new(
handle,
StatusUninit,
true,
PipeBuffer(),
PipeBuffer(), false,
false,Condition(),
false,Condition())
end
Expand Down Expand Up @@ -750,43 +757,92 @@ function _uv_hook_writecb(s::AsyncStream, req::Ptr{Void}, status::Int32)
nothing
end

const SENDBUF_SZ=1048576
function buffer_send(s::AsyncStream, nb)
if !s.buffer_writes
return (false, false)
else
totb = nb_available(s.sendbuf) + nb
if totb < SENDBUF_SZ
return (true, false)
elseif nb > SENDBUF_SZ
flush(s)
return (false, false)
else
return (true, true)
end
end
end


function write(s::AsyncStream, b::Uint8)
@uv_write 1 ccall(:jl_putc_copy, Int32, (Uint8, Ptr{Void}, Ptr{Void}, Ptr{Void}), b, handle(s), uvw, uv_jl_writecb_task::Ptr{Void})
ct = current_task()
uv_req_set_data(uvw,ct)
ct.state = :waiting
stream_wait(ct)
(do_buffering, do_flushing) = buffer_send(s, 1)
if do_buffering
write(s.sendbuf, b)
do_flushing && flush(s)
else
@uv_write 1 ccall(:jl_putc_copy, Int32, (Uint8, Ptr{Void}, Ptr{Void}, Ptr{Void}), b, handle(s), uvw, uv_jl_writecb_task::Ptr{Void})
ct = current_task()
uv_req_set_data(uvw,ct)
ct.state = :waiting
stream_wait(ct)
end
return 1
end
function write(s::AsyncStream, c::Char)
@uv_write utf8sizeof(c) ccall(:jl_pututf8_copy, Int32, (Ptr{Void},Uint32, Ptr{Void}, Ptr{Void}), handle(s), c, uvw, uv_jl_writecb_task::Ptr{Void})
ct = current_task()
uv_req_set_data(uvw,ct)
ct.state = :waiting
stream_wait(ct)
(do_buffering, do_flushing) = buffer_send(s, utf8sizeof(c))
if do_buffering
write(s.sendbuf, c)
do_flushing && flush(s)
else
@uv_write utf8sizeof(c) ccall(:jl_pututf8_copy, Int32, (Ptr{Void},Uint32, Ptr{Void}, Ptr{Void}), handle(s), c, uvw, uv_jl_writecb_task::Ptr{Void})
ct = current_task()
uv_req_set_data(uvw,ct)
ct.state = :waiting
stream_wait(ct)
end
return utf8sizeof(c)
end
function write{T}(s::AsyncStream, a::Array{T})
if isbits(T)
n = uint(length(a)*sizeof(T))
@uv_write n ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), a, n, uvw, uv_jl_writecb_task::Ptr{Void})
ct = current_task()
uv_req_set_data(uvw,ct)
ct.state = :waiting
stream_wait(ct)
return int(length(a)*sizeof(T))
(do_buffering, do_flushing) = buffer_send(s, n)
if do_buffering
write(s.sendbuf, a)
do_flushing && flush(s)
else
write_array(s, pointer(a), n)
end
return int(n)
else
check_open(s)
invoke(write,(IO,Array),s,a)
end
end
function write(s::AsyncStream, p::Ptr, nb::Integer)
(do_buffering, do_flushing) = buffer_send(s, nb)
if do_buffering
write(s.sendbuf, p, nb)
do_flushing && flush(s)
else
write_array(s, p, nb)
end
return nb
end
function write_array(s::AsyncStream, p::Ptr, nb::Integer)
@uv_write nb ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), p, nb, uvw, uv_jl_writecb_task::Ptr{Void})
ct = current_task()
uv_req_set_data(uvw,ct)
ct.state = :waiting
stream_wait(ct)
return int(nb)
end

function flush(s::AsyncStream)
nb = nb_available(s.sendbuf)
if nb > 0
a = readbytes(s.sendbuf, nb)
write_array(s, pointer(a), nb)
end
end

function _uv_hook_writecb_task(s::AsyncStream,req::Ptr{Void},status::Int32)
Expand Down Expand Up @@ -939,3 +995,45 @@ for (x,writable,unix_fd,c_symbol) in ((:STDIN,false,0,:jl_uv_stdin),(:STDOUT,tru
end
end
end

set_buffer_writes(s::AsyncStream, on::Bool) = (oldf = s.buffer_writes; s.buffer_writes = on; oldf)

# recursive locking is OK
# returns bool stating if a corresponding unlock is required or not based on kw arg track_recursive
function lock_w(s::AsyncStream; track_recursive=true)
tid = object_id(current_task())
if isready(s.write_lock)
(ltid, cnt) = fetch(s.write_lock)
if ltid != tid
put!(s.write_lock, (tid, 1))
return true
else
if track_recursive
take!(s.write_lock)
put!(s.write_lock, (tid, cnt+1))
return true
else
return false
end
end
else
put!(s.write_lock, (tid, 1))
return true
end
end

function unlock_w(s::AsyncStream; test_lock=false)
if test_lock && !isready(s.write_lock)
error("No lock present to be unlocked")
end

(ltid, cnt) = take!(s.write_lock)
tid = object_id(current_task())
if ltid != tid
error("Current task doesn't own the lock.")
elseif cnt > 1
put!(s.write_lock, (tid, cnt-1))
end
end

make_lockable(s::AsyncStream) = (s.write_lock = RemoteRef())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently using a RemoteRef for locking. Wondering if I should change to use libuv mutexes. Local remote refs in workers suffer from the fact that they cannot be initialized till the worker id is known. Also libuv mutexes should be faster and more lightweight.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just realized that libuv mutexes are recursive and Julia tasks would show up as being part of the same thread. May not be usable.