diff --git a/stdlib/Distributed/src/cluster.jl b/stdlib/Distributed/src/cluster.jl index df859f10aaa6c..3caf002b3cd30 100644 --- a/stdlib/Distributed/src/cluster.jl +++ b/stdlib/Distributed/src/cluster.jl @@ -248,7 +248,8 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std print(out, '\n') flush(out) - disable_nagle(sock) + Sockets.nagle(sock, false) + Sockets.quickack(sock, true) if ccall(:jl_running_on_valgrind,Cint,()) != 0 println(out, "PID = $(getpid())") @@ -1180,18 +1181,6 @@ function interrupt(pids::AbstractVector=workers()) end end - -function disable_nagle(sock) - # disable nagle on all OSes - ccall(:uv_tcp_nodelay, Cint, (Ptr{Cvoid}, Cint), sock.handle, 1) - @static if Sys.islinux() - # tcp_quickack is a linux only option - if ccall(:jl_tcp_quickack, Cint, (Ptr{Cvoid}, Cint), sock.handle, 1) < 0 - @warn "Networking unoptimized ( Error enabling TCP_QUICKACK : $(Libc.strerror(Libc.errno())) )" maxlog=1 - end - end -end - wp_bind_addr(p::LocalProcess) = p.bind_addr wp_bind_addr(p) = p.config.bind_addr diff --git a/stdlib/Distributed/src/precompile.jl b/stdlib/Distributed/src/precompile.jl index 8d7ebebc6d013..06eb22c91d587 100644 --- a/stdlib/Distributed/src/precompile.jl +++ b/stdlib/Distributed/src/precompile.jl @@ -36,7 +36,8 @@ precompile(Tuple{typeof(Distributed.launch), Distributed.LocalManager, Base.Dict precompile(Tuple{typeof(Distributed.start_worker), Base.PipeEndpoint, String}) precompile(Tuple{typeof(Distributed.socket_reuse_port)}) precompile(Tuple{typeof(Distributed.flush_gc_msgs)}) -precompile(Tuple{typeof(Distributed.disable_nagle), Sockets.TCPServer}) +precompile(Tuple{typeof(Sockets.nagle), Sockets.TCPServer, Bool}) +precompile(Tuple{typeof(Sockets.quickack), Sockets.TCPServer, Bool}) precompile(Tuple{typeof(Distributed.next_tunnel_port)}) precompile(Tuple{typeof(Base._delete!), Base.Dict{Int64, Union{Distributed.Worker, Distributed.LocalProcess}}, Int64}) precompile(Tuple{typeof(Distributed.send_msg_), Distributed.Worker, Distributed.MsgHeader, Distributed.JoinPGRPMsg, Bool}) @@ -85,7 +86,8 @@ precompile(Tuple{typeof(Distributed.process_hdr), Sockets.TCPSocket, Bool}) precompile(Tuple{typeof(Distributed.deserialize_msg), Distributed.ClusterSerializer{Sockets.TCPSocket}}) precompile(Tuple{typeof(Distributed.null_id), Distributed.RRID}) precompile(Tuple{typeof(Distributed.deliver_result), Sockets.TCPSocket, Symbol, Distributed.RRID, Distributed.RemoteException}) -precompile(Tuple{typeof(Distributed.disable_nagle), Sockets.TCPSocket}) +precompile(Tuple{typeof(Sockets.nagle), Sockets.TCPSocket, Bool}) +precompile(Tuple{typeof(Sockets.quickack), Sockets.TCPSocket, Bool}) precompile(Tuple{typeof(Distributed.message_handler_loop), Sockets.TCPSocket, Sockets.TCPSocket, Bool}) precompile(Tuple{typeof(Distributed.process_tcp_streams), Sockets.TCPSocket, Sockets.TCPSocket, Bool}) precompile(Tuple{Type{Distributed.JoinPGRPMsg}, Int64, Array{Union{Tuple{Any, Int64}, Tuple{Tuple{}, Any, Bool}}, 1}, Symbol, Bool}) @@ -126,7 +128,6 @@ precompile(Tuple{typeof(Distributed.deregister_worker), Distributed.ProcessGroup precompile(Tuple{typeof(Distributed.process_hdr), Sockets.TCPSocket, Bool}) precompile(Tuple{typeof(Distributed.null_id), Distributed.RRID}) precompile(Tuple{typeof(Distributed.deliver_result), Sockets.TCPSocket, Symbol, Distributed.RRID, Distributed.RemoteException}) -precompile(Tuple{typeof(Distributed.disable_nagle), Sockets.TCPSocket}) precompile(Tuple{typeof(Distributed.message_handler_loop), Sockets.TCPSocket, Sockets.TCPSocket, Bool}) precompile(Tuple{typeof(Distributed.process_tcp_streams), Sockets.TCPSocket, Sockets.TCPSocket, Bool}) precompile(Tuple{typeof(Serialization.deserialize), Distributed.ClusterSerializer{Sockets.TCPSocket}, Type{Union}}) diff --git a/stdlib/Distributed/src/process_messages.jl b/stdlib/Distributed/src/process_messages.jl index dba6791162d3d..7361d4d057e65 100644 --- a/stdlib/Distributed/src/process_messages.jl +++ b/stdlib/Distributed/src/process_messages.jl @@ -131,10 +131,12 @@ function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bo end function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool) - disable_nagle(r_stream) + Sockets.nagle(r_stream, false) + Sockets.quickack(r_stream, true) wait_connected(r_stream) if r_stream != w_stream - disable_nagle(w_stream) + Sockets.nagle(w_stream, false) + Sockets.quickack(w_stream, true) wait_connected(w_stream) end message_handler_loop(r_stream, w_stream, incoming) diff --git a/stdlib/Sockets/docs/src/index.md b/stdlib/Sockets/docs/src/index.md index f993e83480eaa..90acd104182f2 100644 --- a/stdlib/Sockets/docs/src/index.md +++ b/stdlib/Sockets/docs/src/index.md @@ -30,6 +30,8 @@ Sockets.send Sockets.recv Sockets.recvfrom Sockets.setopt +Sockets.nagle +Sockets.quickack ``` ```@meta diff --git a/stdlib/Sockets/src/Sockets.jl b/stdlib/Sockets/src/Sockets.jl index 8f55edcf6b533..0c6e867b20639 100644 --- a/stdlib/Sockets/src/Sockets.jl +++ b/stdlib/Sockets/src/Sockets.jl @@ -471,6 +471,31 @@ function connect(sock::LibuvStream, args...) return sock end +""" + nagle(socket::Union{TCPServer, TCPSocket}, enable::Bool) + +Enables or disables Nagle's algorithm on a given TCP server or socket. +""" +function nagle(sock::Union{TCPServer, TCPSocket}, enable::Bool) + # disable or enable Nagle's algorithm on all OSes + ccall(:uv_tcp_nodelay, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(!enable)) +end + +""" + quickack(socket::Union{TCPServer, TCPSocket}, enable::Bool) + +On Linux systems, the TCP_QUICKACK is disabled or enabled on `socket`. +""" +function quickack(sock::Union{TCPServer, TCPSocket}, enable::Bool) + @static if Sys.islinux() + # tcp_quickack is a linux only option + if ccall(:jl_tcp_quickack, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(enable)) < 0 + @warn "Networking unoptimized ( Error enabling TCP_QUICKACK : $(Libc.strerror(Libc.errno())) )" maxlog=1 + end + end +end + + ## const BACKLOG_DEFAULT = 511