Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simpler and faster message (de)serialize #21543

Merged
merged 1 commit into from
Apr 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion base/associative.jl
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,12 @@ function delete!(t::ObjectIdDict, key::ANY)
t
end

empty!(t::ObjectIdDict) = (t.ht = Vector{Any}(length(t.ht)); t.ndel = 0; t)
function empty!(t::ObjectIdDict)
resize!(t.ht, 32)
ccall(:memset, Ptr{Void}, (Ptr{Void}, Cint, Csize_t), t.ht, 0, sizeof(t.ht))
t.ndel = 0
return t
end

_oidd_nextind(a, i) = reinterpret(Int,ccall(:jl_eqtable_nextind, Csize_t, (Any, Csize_t), a, i))

Expand Down
2 changes: 1 addition & 1 deletion base/distributed/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length,
using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, wait_connected,
VERSION_STRING, sync_begin, sync_add, sync_end, async_run_thunk,
binding_module, notify_error, atexit, julia_exename, julia_cmd,
AsyncGenerator, display_error, acquire, release
AsyncGenerator, display_error, acquire, release, invokelatest

# NOTE: clusterserialize.jl imports additional symbols from Base.Serializer for use

Expand Down
55 changes: 25 additions & 30 deletions base/distributed/messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

abstract type AbstractMsg end

let REF_ID::Int = 1
global next_ref_id
next_ref_id() = (id = REF_ID; REF_ID += 1; id)
end
const REF_ID = Ref(1)
next_ref_id() = (id = REF_ID[]; REF_ID[] = id+1; id)

struct RRID
whence::Int
Expand Down Expand Up @@ -80,34 +78,32 @@ end
# of approximately 10%. Can be removed once module Serializer
# has been suitably improved.

# replace CallMsg{Mode} with specific invocations
const msgtypes = filter!(x->x!=CallMsg, subtypes(AbstractMsg))
push!(msgtypes, CallMsg{:call}, CallMsg{:call_fetch})
const msgtypes = Any[CallWaitMsg, IdentifySocketAckMsg, IdentifySocketMsg,
JoinCompleteMsg, JoinPGRPMsg, RemoteDoMsg, ResultMsg,
CallMsg{:call}, CallMsg{:call_fetch}]

for (idx, tname) in enumerate(msgtypes)
nflds = length(fieldnames(tname))
@eval begin
function serialize(s::AbstractSerializer, o::$tname)
write(s.io, UInt8($idx))
for fld in fieldnames($tname)
serialize(s, getfield(o, fld))
end
end

function deserialize_msg(s::AbstractSerializer, ::Type{$tname})
data=Array{Any,1}($nflds)
for i in 1:$nflds
data[i] = deserialize(s)
end
return $tname(data...)
end
exprs = Any[ :(serialize(s, o.$fld)) for fld in fieldnames(tname) ]
@eval function serialize_msg(s::AbstractSerializer, o::$tname)
write(s.io, UInt8($idx))
$(exprs...)
return nothing
end
end

function deserialize_msg(s::AbstractSerializer)
idx = read(s.io, UInt8)
t = msgtypes[idx]
return eval(current_module(), Expr(:body, Expr(:return, Expr(:call, deserialize_msg, QuoteNode(s), QuoteNode(t)))))
let msg_cases = :(assert(false))
for i = length(msgtypes):-1:1
mti = msgtypes[i]
msg_cases = :(if idx == $i
return $(Expr(:call, QuoteNode(mti), fill(:(deserialize(s)), nfields(mti))...))
else
$msg_cases
end)
end
@eval function deserialize_msg(s::AbstractSerializer)
idx = read(s.io, UInt8)
$msg_cases
end
end

function send_msg_unknown(s::IO, header, msg)
Expand Down Expand Up @@ -171,8 +167,7 @@ function serialize_hdr_raw(io, hdr)
end

function deserialize_hdr_raw(io)
data = Array{Int,1}(4)
read!(io, data)
data = read(io, Ref{NTuple{4,Int}}())[]
return MsgHeader(RRID(data[1], data[2]), RRID(data[3], data[4]))
end

Expand All @@ -183,7 +178,7 @@ function send_msg_(w::Worker, header, msg, now::Bool)
try
reset_state(w.w_serializer)
serialize_hdr_raw(io, header)
eval(current_module(), Expr(:body, Expr(:return, Expr(:call, serialize, QuoteNode(w.w_serializer), QuoteNode(msg))))) # io is wrapped in w_serializer
invokelatest(serialize_msg, w.w_serializer, msg) # io is wrapped in w_serializer
write(io, MSG_BOUNDARY)

if !now && w.gcflag
Expand Down
2 changes: 1 addition & 1 deletion base/distributed/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
# println("header: ", header)

try
msg = deserialize_msg(serializer)
msg = invokelatest(deserialize_msg, serializer)
catch e
# Deserialization error; discard bytes in stream until boundary found
boundary_idx = 1
Expand Down
2 changes: 1 addition & 1 deletion base/distributed/remotecall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ function lookup_ref(pg, rrid, f)
rv = get(pg.refs, rrid, false)
if rv === false
# first we've heard of this ref
rv = RemoteValue(eval(Main, Expr(:body, Expr(:return, Expr(:call, f)))))
rv = RemoteValue(invokelatest(f))
pg.refs[rrid] = rv
push!(rv.clientset, rrid.whence)
end
Expand Down
3 changes: 0 additions & 3 deletions base/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1405,7 +1405,6 @@ precompile(Tuple{typeof(Base.close), Base.TCPSocket})
precompile(Tuple{typeof(Base.write), Base.TCPSocket, Array{UInt8, 1}})
precompile(Tuple{typeof(Base.wait_readnb), Base.PipeEndpoint, Int64})
precompile(Tuple{typeof(Base.eof), Base.PipeEndpoint})
precompile(Tuple{typeof(Base.Distributed.deserialize_msg), Base.Distributed.ClusterSerializer{Base.TCPSocket}, Type{Base.Distributed.JoinPGRPMsg}})
precompile(Tuple{typeof(Base.unsafe_read), Base.TCPSocket, Base.RefValue{Int32}, Int64})
precompile(Tuple{typeof(Base.unsafe_read), Base.TCPSocket, Base.RefValue{Int64}, Int64})
precompile(Tuple{typeof(Base.read!), Base.TCPSocket, Array{UInt8, 1}})
Expand Down Expand Up @@ -1479,7 +1478,6 @@ precompile(Tuple{typeof(Base.notify), Base.Condition, Base.Distributed.ProcessEx
precompile(Tuple{typeof(Base.pop!), Base.Dict{Int64, Union{Base.Distributed.Worker, Base.Distributed.LocalProcess}}, Int64, Void})
precompile(Tuple{typeof(Base.Distributed.deregister_worker), Base.Distributed.ProcessGroup, Int64})
precompile(Tuple{typeof(Base.Distributed.process_hdr), Base.TCPSocket, Bool})
precompile(Tuple{typeof(Base.Distributed.deserialize_msg), Base.Distributed.ClusterSerializer{Base.TCPSocket}})
precompile(Tuple{typeof(Base.Distributed.null_id), Base.Distributed.RRID})
precompile(Tuple{typeof(Base.Distributed.deliver_result), Base.TCPSocket, Symbol, Base.Distributed.RRID, Base.Distributed.RemoteException})
precompile(Tuple{typeof(Base.Distributed.disable_nagle), Base.TCPSocket})
Expand Down Expand Up @@ -1535,7 +1533,6 @@ precompile(Tuple{typeof(Base.Serializer.serialize), Base.Distributed.ClusterSeri
precompile(Tuple{typeof(Base.unsafe_write), Base.TCPSocket, Base.RefValue{UInt8}, Int64})
precompile(Tuple{typeof(Base.Serializer.serialize), Base.Distributed.ClusterSerializer{Base.TCPSocket}, Int64})
precompile(Tuple{typeof(Base.write), Base.TCPSocket, Array{UInt8, 1}})
precompile(Tuple{typeof(Base.Distributed.deserialize_msg), Base.Distributed.ClusterSerializer{Base.TCPSocket}, Type{Base.Distributed.JoinCompleteMsg}})
precompile(Tuple{typeof(Base.unsafe_read), Base.TCPSocket, Base.RefValue{Int32}, Int64})
precompile(Tuple{typeof(Base.unsafe_read), Base.TCPSocket, Base.RefValue{Int64}, Int64})
precompile(Tuple{typeof(Base.read!), Base.TCPSocket, Array{UInt8, 1}})
Expand Down
2 changes: 1 addition & 1 deletion base/serialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ end

function reset_state(s::AbstractSerializer)
s.counter = 0
s.table = ObjectIdDict()
empty!(s.table)
empty!(s.pending_refs)
s
end
Expand Down
10 changes: 9 additions & 1 deletion base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,15 @@ buffer_writes(s::LibuvStream, bufsize) = (s.sendbuf=PipeBuffer(bufsize); s)

## low-level calls to libuv ##

write(s::LibuvStream, b::UInt8) = write(s, Ref{UInt8}(b))
function write(s::LibuvStream, b::UInt8)
if !isnull(s.sendbuf)
buf = get(s.sendbuf)
if nb_available(buf) + 1 < buf.maxsize
return write(buf, b)
end
end
return write(s, Ref{UInt8}(b))
end

function uv_writecb_task(req::Ptr{Void}, status::Cint)
d = uv_req_data(req)
Expand Down