From e62a7db7504e554c0ea440cb3d7113e076361eaa Mon Sep 17 00:00:00 2001 From: Jeff Bezanson Date: Mon, 24 Apr 2017 23:01:42 -0400 Subject: [PATCH] simpler and faster message (de)serialize also separate `serialize_msg` from `serialize` --- base/associative.jl | 7 +++- base/distributed/Distributed.jl | 2 +- base/distributed/messages.jl | 55 +++++++++++++--------------- base/distributed/process_messages.jl | 2 +- base/distributed/remotecall.jl | 2 +- base/precompile.jl | 3 -- base/serialize.jl | 2 +- base/stream.jl | 10 ++++- 8 files changed, 44 insertions(+), 39 deletions(-) diff --git a/base/associative.jl b/base/associative.jl index f46378b56d266..8c276ee8dc078 100644 --- a/base/associative.jl +++ b/base/associative.jl @@ -441,7 +441,12 @@ function delete!(t::ObjectIdDict, key::ANY) t end -empty!(t::ObjectIdDict) = (t.ht = Vector{Any}(length(t.ht)); t.ndel = 0; t) +function empty!(t::ObjectIdDict) + resize!(t.ht, 32) + ccall(:memset, Ptr{Void}, (Ptr{Void}, Cint, Csize_t), t.ht, 0, sizeof(t.ht)) + t.ndel = 0 + return t +end _oidd_nextind(a, i) = reinterpret(Int,ccall(:jl_eqtable_nextind, Csize_t, (Any, Csize_t), a, i)) diff --git a/base/distributed/Distributed.jl b/base/distributed/Distributed.jl index 36e4bcba53fc9..73270b0672b3b 100644 --- a/base/distributed/Distributed.jl +++ b/base/distributed/Distributed.jl @@ -10,7 +10,7 @@ import Base: getindex, wait, put!, take!, fetch, isready, push!, length, using Base: Process, Semaphore, JLOptions, AnyDict, buffer_writes, wait_connected, VERSION_STRING, sync_begin, sync_add, sync_end, async_run_thunk, binding_module, notify_error, atexit, julia_exename, julia_cmd, - AsyncGenerator, display_error, acquire, release + AsyncGenerator, display_error, acquire, release, invokelatest # NOTE: clusterserialize.jl imports additional symbols from Base.Serializer for use diff --git a/base/distributed/messages.jl b/base/distributed/messages.jl index aee31cec8bf50..17a447a154d1f 100644 --- a/base/distributed/messages.jl +++ b/base/distributed/messages.jl @@ -2,10 +2,8 @@ abstract type AbstractMsg end -let REF_ID::Int = 1 - global next_ref_id - next_ref_id() = (id = REF_ID; REF_ID += 1; id) -end +const REF_ID = Ref(1) +next_ref_id() = (id = REF_ID[]; REF_ID[] = id+1; id) struct RRID whence::Int @@ -80,34 +78,32 @@ end # of approximately 10%. Can be removed once module Serializer # has been suitably improved. -# replace CallMsg{Mode} with specific invocations -const msgtypes = filter!(x->x!=CallMsg, subtypes(AbstractMsg)) -push!(msgtypes, CallMsg{:call}, CallMsg{:call_fetch}) +const msgtypes = Any[CallWaitMsg, IdentifySocketAckMsg, IdentifySocketMsg, + JoinCompleteMsg, JoinPGRPMsg, RemoteDoMsg, ResultMsg, + CallMsg{:call}, CallMsg{:call_fetch}] for (idx, tname) in enumerate(msgtypes) - nflds = length(fieldnames(tname)) - @eval begin - function serialize(s::AbstractSerializer, o::$tname) - write(s.io, UInt8($idx)) - for fld in fieldnames($tname) - serialize(s, getfield(o, fld)) - end - end - - function deserialize_msg(s::AbstractSerializer, ::Type{$tname}) - data=Array{Any,1}($nflds) - for i in 1:$nflds - data[i] = deserialize(s) - end - return $tname(data...) - end + exprs = Any[ :(serialize(s, o.$fld)) for fld in fieldnames(tname) ] + @eval function serialize_msg(s::AbstractSerializer, o::$tname) + write(s.io, UInt8($idx)) + $(exprs...) + return nothing end end -function deserialize_msg(s::AbstractSerializer) - idx = read(s.io, UInt8) - t = msgtypes[idx] - return eval(current_module(), Expr(:body, Expr(:return, Expr(:call, deserialize_msg, QuoteNode(s), QuoteNode(t))))) +let msg_cases = :(assert(false)) + for i = length(msgtypes):-1:1 + mti = msgtypes[i] + msg_cases = :(if idx == $i + return $(Expr(:call, QuoteNode(mti), fill(:(deserialize(s)), nfields(mti))...)) + else + $msg_cases + end) + end + @eval function deserialize_msg(s::AbstractSerializer) + idx = read(s.io, UInt8) + $msg_cases + end end function send_msg_unknown(s::IO, header, msg) @@ -171,8 +167,7 @@ function serialize_hdr_raw(io, hdr) end function deserialize_hdr_raw(io) - data = Array{Int,1}(4) - read!(io, data) + data = read(io, Ref{NTuple{4,Int}}())[] return MsgHeader(RRID(data[1], data[2]), RRID(data[3], data[4])) end @@ -183,7 +178,7 @@ function send_msg_(w::Worker, header, msg, now::Bool) try reset_state(w.w_serializer) serialize_hdr_raw(io, header) - eval(current_module(), Expr(:body, Expr(:return, Expr(:call, serialize, QuoteNode(w.w_serializer), QuoteNode(msg))))) # io is wrapped in w_serializer + invokelatest(serialize_msg, w.w_serializer, msg) # io is wrapped in w_serializer write(io, MSG_BOUNDARY) if !now && w.gcflag diff --git a/base/distributed/process_messages.jl b/base/distributed/process_messages.jl index 121467676a64f..1eba6eafd496d 100644 --- a/base/distributed/process_messages.jl +++ b/base/distributed/process_messages.jl @@ -158,7 +158,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) # println("header: ", header) try - msg = deserialize_msg(serializer) + msg = invokelatest(deserialize_msg, serializer) catch e # Deserialization error; discard bytes in stream until boundary found boundary_idx = 1 diff --git a/base/distributed/remotecall.jl b/base/distributed/remotecall.jl index 67d4476d0cc18..e03f1d19a8d9d 100644 --- a/base/distributed/remotecall.jl +++ b/base/distributed/remotecall.jl @@ -152,7 +152,7 @@ function lookup_ref(pg, rrid, f) rv = get(pg.refs, rrid, false) if rv === false # first we've heard of this ref - rv = RemoteValue(eval(Main, Expr(:body, Expr(:return, Expr(:call, f))))) + rv = RemoteValue(invokelatest(f)) pg.refs[rrid] = rv push!(rv.clientset, rrid.whence) end diff --git a/base/precompile.jl b/base/precompile.jl index 2342279959196..8bc84db37f499 100644 --- a/base/precompile.jl +++ b/base/precompile.jl @@ -1405,7 +1405,6 @@ precompile(Tuple{typeof(Base.close), Base.TCPSocket}) precompile(Tuple{typeof(Base.write), Base.TCPSocket, Array{UInt8, 1}}) precompile(Tuple{typeof(Base.wait_readnb), Base.PipeEndpoint, Int64}) precompile(Tuple{typeof(Base.eof), Base.PipeEndpoint}) -precompile(Tuple{typeof(Base.Distributed.deserialize_msg), Base.Distributed.ClusterSerializer{Base.TCPSocket}, Type{Base.Distributed.JoinPGRPMsg}}) precompile(Tuple{typeof(Base.unsafe_read), Base.TCPSocket, Base.RefValue{Int32}, Int64}) precompile(Tuple{typeof(Base.unsafe_read), Base.TCPSocket, Base.RefValue{Int64}, Int64}) precompile(Tuple{typeof(Base.read!), Base.TCPSocket, Array{UInt8, 1}}) @@ -1479,7 +1478,6 @@ precompile(Tuple{typeof(Base.notify), Base.Condition, Base.Distributed.ProcessEx precompile(Tuple{typeof(Base.pop!), Base.Dict{Int64, Union{Base.Distributed.Worker, Base.Distributed.LocalProcess}}, Int64, Void}) precompile(Tuple{typeof(Base.Distributed.deregister_worker), Base.Distributed.ProcessGroup, Int64}) precompile(Tuple{typeof(Base.Distributed.process_hdr), Base.TCPSocket, Bool}) -precompile(Tuple{typeof(Base.Distributed.deserialize_msg), Base.Distributed.ClusterSerializer{Base.TCPSocket}}) precompile(Tuple{typeof(Base.Distributed.null_id), Base.Distributed.RRID}) precompile(Tuple{typeof(Base.Distributed.deliver_result), Base.TCPSocket, Symbol, Base.Distributed.RRID, Base.Distributed.RemoteException}) precompile(Tuple{typeof(Base.Distributed.disable_nagle), Base.TCPSocket}) @@ -1535,7 +1533,6 @@ precompile(Tuple{typeof(Base.Serializer.serialize), Base.Distributed.ClusterSeri precompile(Tuple{typeof(Base.unsafe_write), Base.TCPSocket, Base.RefValue{UInt8}, Int64}) precompile(Tuple{typeof(Base.Serializer.serialize), Base.Distributed.ClusterSerializer{Base.TCPSocket}, Int64}) precompile(Tuple{typeof(Base.write), Base.TCPSocket, Array{UInt8, 1}}) -precompile(Tuple{typeof(Base.Distributed.deserialize_msg), Base.Distributed.ClusterSerializer{Base.TCPSocket}, Type{Base.Distributed.JoinCompleteMsg}}) precompile(Tuple{typeof(Base.unsafe_read), Base.TCPSocket, Base.RefValue{Int32}, Int64}) precompile(Tuple{typeof(Base.unsafe_read), Base.TCPSocket, Base.RefValue{Int64}, Int64}) precompile(Tuple{typeof(Base.read!), Base.TCPSocket, Array{UInt8, 1}}) diff --git a/base/serialize.jl b/base/serialize.jl index 55df21d7c201e..4b309489226c9 100644 --- a/base/serialize.jl +++ b/base/serialize.jl @@ -157,7 +157,7 @@ end function reset_state(s::AbstractSerializer) s.counter = 0 - s.table = ObjectIdDict() + empty!(s.table) empty!(s.pending_refs) s end diff --git a/base/stream.jl b/base/stream.jl index 4c0d931e72282..4bd1f4c070d88 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -863,7 +863,15 @@ buffer_writes(s::LibuvStream, bufsize) = (s.sendbuf=PipeBuffer(bufsize); s) ## low-level calls to libuv ## -write(s::LibuvStream, b::UInt8) = write(s, Ref{UInt8}(b)) +function write(s::LibuvStream, b::UInt8) + if !isnull(s.sendbuf) + buf = get(s.sendbuf) + if nb_available(buf) + 1 < buf.maxsize + return write(buf, b) + end + end + return write(s, Ref{UInt8}(b)) +end function uv_writecb_task(req::Ptr{Void}, status::Cint) d = uv_req_data(req)