diff --git a/net_transport.go b/net_transport.go index 3ac84529029..307dcab6f47 100644 --- a/net_transport.go +++ b/net_transport.go @@ -11,6 +11,7 @@ import ( "sync" "time" + metrics "github.com/armon/go-metrics" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-msgpack/codec" ) @@ -27,6 +28,14 @@ const ( // rpcMaxPipeline controls the maximum number of outstanding // AppendEntries RPC calls. rpcMaxPipeline = 128 + + // connReceiveBufferSize is the size of the buffer we will use for reading RPC requests into + // on followers + connReceiveBufferSize = 256 * 1024 // 256KB + + // connSendBufferSize is the size of the buffer we will use for sending RPC request data from + // the leader to followers. + connSendBufferSize = 256 * 1024 // 256KB ) var ( @@ -344,7 +353,7 @@ func (n *NetworkTransport) getConn(target ServerAddress) (*netConn, error) { target: target, conn: conn, dec: codec.NewDecoder(bufio.NewReader(conn), &codec.MsgpackHandle{}), - w: bufio.NewWriter(conn), + w: bufio.NewWriterSize(conn, connSendBufferSize), } netConn.enc = codec.NewEncoder(netConn.w, &codec.MsgpackHandle{}) @@ -517,7 +526,7 @@ func (n *NetworkTransport) listen() { // closed. func (n *NetworkTransport) handleConn(connCtx context.Context, conn net.Conn) { defer conn.Close() - r := bufio.NewReader(conn) + r := bufio.NewReaderSize(conn, connReceiveBufferSize) w := bufio.NewWriter(conn) dec := codec.NewDecoder(r, &codec.MsgpackHandle{}) enc := codec.NewEncoder(w, &codec.MsgpackHandle{}) @@ -545,12 +554,19 @@ func (n *NetworkTransport) handleConn(connCtx context.Context, conn net.Conn) { // handleCommand is used to decode and dispatch a single command. func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, enc *codec.Encoder) error { + getTypeStart := time.Now() + // Get the rpc type rpcType, err := r.ReadByte() if err != nil { return err } + // measuring the time to get the first byte separately because the heartbeat conn will hang out here + // for a good while waiting for a heartbeat whereas the append entries/rpc conn should not. + metrics.MeasureSince([]string{"raft", "net", "getRPCType"}, getTypeStart) + decodeStart := time.Now() + // Create the RPC object respCh := make(chan RPCResponse, 1) rpc := RPC{ @@ -559,6 +575,7 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en // Decode the command isHeartbeat := false + var labels []metrics.Label switch rpcType { case rpcAppendEntries: var req AppendEntriesRequest @@ -574,13 +591,18 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en isHeartbeat = true } + if isHeartbeat { + labels = []metrics.Label{{Name: "rpcType", Value: "Heartbeat"}} + } else { + labels = []metrics.Label{{Name: "rpcType", Value: "AppendEntries"}} + } case rpcRequestVote: var req RequestVoteRequest if err := dec.Decode(&req); err != nil { return err } rpc.Command = &req - + labels = []metrics.Label{{Name: "rpcType", Value: "RequestVote"}} case rpcInstallSnapshot: var req InstallSnapshotRequest if err := dec.Decode(&req); err != nil { @@ -588,18 +610,22 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en } rpc.Command = &req rpc.Reader = io.LimitReader(r, req.Size) - + labels = []metrics.Label{{Name: "rpcType", Value: "InstallSnapshot"}} case rpcTimeoutNow: var req TimeoutNowRequest if err := dec.Decode(&req); err != nil { return err } rpc.Command = &req - + labels = []metrics.Label{{Name: "rpcType", Value: "TimeoutNow"}} default: return fmt.Errorf("unknown rpc type %d", rpcType) } + metrics.MeasureSinceWithLabels([]string{"raft", "net", "rpcDecode"}, decodeStart, labels) + + processStart := time.Now() + // Check for heartbeat fast-path if isHeartbeat { n.heartbeatFnLock.Lock() @@ -620,8 +646,12 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en // Wait for response RESP: + // we will differentiate the heartbeat fast path from normal RPCs with labels + metrics.MeasureSinceWithLabels([]string{"raft", "net", "rpcEnqueue"}, processStart, labels) + respWaitStart := time.Now() select { case resp := <-respCh: + defer metrics.MeasureSinceWithLabels([]string{"raft", "net", "rpcRespond"}, respWaitStart, labels) // Send the error first respErr := "" if resp.Error != nil {