Skip to content

Commit

Permalink
simpler and faster message (de)serialize
Browse files Browse the repository at this point in the history
also separate `serialize_msg` from `serialize`
  • Loading branch information
JeffBezanson committed Apr 27, 2017
1 parent 8fcbd74 commit e62a7db
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 39 deletions.
7 changes: 6 additions & 1 deletion base/associative.jl
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,12 @@ function delete!(t::ObjectIdDict, key::ANY)
t
end

empty!(t::ObjectIdDict) = (t.ht = Vector{Any}(length(t.ht)); t.ndel = 0; t)
function empty!(t::ObjectIdDict)
resize!(t.ht, 32)
ccall(:memset, Ptr{Void}, (Ptr{Void}, Cint, Csize_t), t.ht, 0, sizeof(t.ht))
t.ndel = 0
return t
end

_oidd_nextind(a, i) = reinterpret(Int,ccall(:jl_eqtable_nextind, Csize_t, (Any, Csize_t), a, i))

Expand Down
2 changes: 1 addition & 1 deletion base/distributed/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length,
using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, wait_connected,
VERSION_STRING, sync_begin, sync_add, sync_end, async_run_thunk,
binding_module, notify_error, atexit, julia_exename, julia_cmd,
AsyncGenerator, display_error, acquire, release
AsyncGenerator, display_error, acquire, release, invokelatest

# NOTE: clusterserialize.jl imports additional symbols from Base.Serializer for use

Expand Down
55 changes: 25 additions & 30 deletions base/distributed/messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

abstract type AbstractMsg end

let REF_ID::Int = 1
global next_ref_id
next_ref_id() = (id = REF_ID; REF_ID += 1; id)
end
const REF_ID = Ref(1)
next_ref_id() = (id = REF_ID[]; REF_ID[] = id+1; id)

struct RRID
whence::Int
Expand Down Expand Up @@ -80,34 +78,32 @@ end
# of approximately 10%. Can be removed once module Serializer
# has been suitably improved.

# replace CallMsg{Mode} with specific invocations
const msgtypes = filter!(x->x!=CallMsg, subtypes(AbstractMsg))
push!(msgtypes, CallMsg{:call}, CallMsg{:call_fetch})
const msgtypes = Any[CallWaitMsg, IdentifySocketAckMsg, IdentifySocketMsg,
JoinCompleteMsg, JoinPGRPMsg, RemoteDoMsg, ResultMsg,
CallMsg{:call}, CallMsg{:call_fetch}]

for (idx, tname) in enumerate(msgtypes)
nflds = length(fieldnames(tname))
@eval begin
function serialize(s::AbstractSerializer, o::$tname)
write(s.io, UInt8($idx))
for fld in fieldnames($tname)
serialize(s, getfield(o, fld))
end
end

function deserialize_msg(s::AbstractSerializer, ::Type{$tname})
data=Array{Any,1}($nflds)
for i in 1:$nflds
data[i] = deserialize(s)
end
return $tname(data...)
end
exprs = Any[ :(serialize(s, o.$fld)) for fld in fieldnames(tname) ]
@eval function serialize_msg(s::AbstractSerializer, o::$tname)
write(s.io, UInt8($idx))
$(exprs...)
return nothing
end
end

function deserialize_msg(s::AbstractSerializer)
idx = read(s.io, UInt8)
t = msgtypes[idx]
return eval(current_module(), Expr(:body, Expr(:return, Expr(:call, deserialize_msg, QuoteNode(s), QuoteNode(t)))))
let msg_cases = :(assert(false))
for i = length(msgtypes):-1:1
mti = msgtypes[i]
msg_cases = :(if idx == $i
return $(Expr(:call, QuoteNode(mti), fill(:(deserialize(s)), nfields(mti))...))
else
$msg_cases
end)
end
@eval function deserialize_msg(s::AbstractSerializer)
idx = read(s.io, UInt8)
$msg_cases
end
end

function send_msg_unknown(s::IO, header, msg)
Expand Down Expand Up @@ -171,8 +167,7 @@ function serialize_hdr_raw(io, hdr)
end

function deserialize_hdr_raw(io)
data = Array{Int,1}(4)
read!(io, data)
data = read(io, Ref{NTuple{4,Int}}())[]
return MsgHeader(RRID(data[1], data[2]), RRID(data[3], data[4]))
end

Expand All @@ -183,7 +178,7 @@ function send_msg_(w::Worker, header, msg, now::Bool)
try
reset_state(w.w_serializer)
serialize_hdr_raw(io, header)
eval(current_module(), Expr(:body, Expr(:return, Expr(:call, serialize, QuoteNode(w.w_serializer), QuoteNode(msg))))) # io is wrapped in w_serializer
invokelatest(serialize_msg, w.w_serializer, msg) # io is wrapped in w_serializer
write(io, MSG_BOUNDARY)

if !now && w.gcflag
Expand Down
2 changes: 1 addition & 1 deletion base/distributed/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
# println("header: ", header)

try
msg = deserialize_msg(serializer)
msg = invokelatest(deserialize_msg, serializer)
catch e
# Deserialization error; discard bytes in stream until boundary found
boundary_idx = 1
Expand Down
2 changes: 1 addition & 1 deletion base/distributed/remotecall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ function lookup_ref(pg, rrid, f)
rv = get(pg.refs, rrid, false)
if rv === false
# first we've heard of this ref
rv = RemoteValue(eval(Main, Expr(:body, Expr(:return, Expr(:call, f)))))
rv = RemoteValue(invokelatest(f))
pg.refs[rrid] = rv
push!(rv.clientset, rrid.whence)
end
Expand Down
3 changes: 0 additions & 3 deletions base/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1405,7 +1405,6 @@ precompile(Tuple{typeof(Base.close), Base.TCPSocket})
precompile(Tuple{typeof(Base.write), Base.TCPSocket, Array{UInt8, 1}})
precompile(Tuple{typeof(Base.wait_readnb), Base.PipeEndpoint, Int64})
precompile(Tuple{typeof(Base.eof), Base.PipeEndpoint})
precompile(Tuple{typeof(Base.Distributed.deserialize_msg), Base.Distributed.ClusterSerializer{Base.TCPSocket}, Type{Base.Distributed.JoinPGRPMsg}})
precompile(Tuple{typeof(Base.unsafe_read), Base.TCPSocket, Base.RefValue{Int32}, Int64})
precompile(Tuple{typeof(Base.unsafe_read), Base.TCPSocket, Base.RefValue{Int64}, Int64})
precompile(Tuple{typeof(Base.read!), Base.TCPSocket, Array{UInt8, 1}})
Expand Down Expand Up @@ -1479,7 +1478,6 @@ precompile(Tuple{typeof(Base.notify), Base.Condition, Base.Distributed.ProcessEx
precompile(Tuple{typeof(Base.pop!), Base.Dict{Int64, Union{Base.Distributed.Worker, Base.Distributed.LocalProcess}}, Int64, Void})
precompile(Tuple{typeof(Base.Distributed.deregister_worker), Base.Distributed.ProcessGroup, Int64})
precompile(Tuple{typeof(Base.Distributed.process_hdr), Base.TCPSocket, Bool})
precompile(Tuple{typeof(Base.Distributed.deserialize_msg), Base.Distributed.ClusterSerializer{Base.TCPSocket}})
precompile(Tuple{typeof(Base.Distributed.null_id), Base.Distributed.RRID})
precompile(Tuple{typeof(Base.Distributed.deliver_result), Base.TCPSocket, Symbol, Base.Distributed.RRID, Base.Distributed.RemoteException})
precompile(Tuple{typeof(Base.Distributed.disable_nagle), Base.TCPSocket})
Expand Down Expand Up @@ -1535,7 +1533,6 @@ precompile(Tuple{typeof(Base.Serializer.serialize), Base.Distributed.ClusterSeri
precompile(Tuple{typeof(Base.unsafe_write), Base.TCPSocket, Base.RefValue{UInt8}, Int64})
precompile(Tuple{typeof(Base.Serializer.serialize), Base.Distributed.ClusterSerializer{Base.TCPSocket}, Int64})
precompile(Tuple{typeof(Base.write), Base.TCPSocket, Array{UInt8, 1}})
precompile(Tuple{typeof(Base.Distributed.deserialize_msg), Base.Distributed.ClusterSerializer{Base.TCPSocket}, Type{Base.Distributed.JoinCompleteMsg}})
precompile(Tuple{typeof(Base.unsafe_read), Base.TCPSocket, Base.RefValue{Int32}, Int64})
precompile(Tuple{typeof(Base.unsafe_read), Base.TCPSocket, Base.RefValue{Int64}, Int64})
precompile(Tuple{typeof(Base.read!), Base.TCPSocket, Array{UInt8, 1}})
Expand Down
2 changes: 1 addition & 1 deletion base/serialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ end

function reset_state(s::AbstractSerializer)
s.counter = 0
s.table = ObjectIdDict()
empty!(s.table)
empty!(s.pending_refs)
s
end
Expand Down
10 changes: 9 additions & 1 deletion base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,15 @@ buffer_writes(s::LibuvStream, bufsize) = (s.sendbuf=PipeBuffer(bufsize); s)

## low-level calls to libuv ##

write(s::LibuvStream, b::UInt8) = write(s, Ref{UInt8}(b))
function write(s::LibuvStream, b::UInt8)
if !isnull(s.sendbuf)
buf = get(s.sendbuf)
if nb_available(buf) + 1 < buf.maxsize
return write(buf, b)
end
end
return write(s, Ref{UInt8}(b))
end

function uv_writecb_task(req::Ptr{Void}, status::Cint)
d = uv_req_data(req)
Expand Down

0 comments on commit e62a7db

Please sign in to comment.