Skip to content

Commit

Permalink
Adding some finer grained metrics around RPC processing and increase …
Browse files Browse the repository at this point in the history
…buffer sizes (hashicorp#480)

* Adding some finer grained metrics around RPC processing

* Increasing follower RPC receive buffer size from 4KB to 256KB

* Increasing leaders RPC send buffer size from 4KB to 256KB
  • Loading branch information
mkeeler authored Dec 7, 2021
1 parent e55a8bf commit aa1afe5
Showing 1 changed file with 35 additions and 5 deletions.
40 changes: 35 additions & 5 deletions net_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

metrics "github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-msgpack/codec"
)
Expand All @@ -27,6 +28,14 @@ const (
// rpcMaxPipeline controls the maximum number of outstanding
// AppendEntries RPC calls.
rpcMaxPipeline = 128

// connReceiveBufferSize is the size of the buffer we will use for reading RPC requests into
// on followers
connReceiveBufferSize = 256 * 1024 // 256KB

// connSendBufferSize is the size of the buffer we will use for sending RPC request data from
// the leader to followers.
connSendBufferSize = 256 * 1024 // 256KB
)

var (
Expand Down Expand Up @@ -344,7 +353,7 @@ func (n *NetworkTransport) getConn(target ServerAddress) (*netConn, error) {
target: target,
conn: conn,
dec: codec.NewDecoder(bufio.NewReader(conn), &codec.MsgpackHandle{}),
w: bufio.NewWriter(conn),
w: bufio.NewWriterSize(conn, connSendBufferSize),
}

netConn.enc = codec.NewEncoder(netConn.w, &codec.MsgpackHandle{})
Expand Down Expand Up @@ -517,7 +526,7 @@ func (n *NetworkTransport) listen() {
// closed.
func (n *NetworkTransport) handleConn(connCtx context.Context, conn net.Conn) {
defer conn.Close()
r := bufio.NewReader(conn)
r := bufio.NewReaderSize(conn, connReceiveBufferSize)
w := bufio.NewWriter(conn)
dec := codec.NewDecoder(r, &codec.MsgpackHandle{})
enc := codec.NewEncoder(w, &codec.MsgpackHandle{})
Expand Down Expand Up @@ -545,12 +554,19 @@ func (n *NetworkTransport) handleConn(connCtx context.Context, conn net.Conn) {

// handleCommand is used to decode and dispatch a single command.
func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, enc *codec.Encoder) error {
getTypeStart := time.Now()

// Get the rpc type
rpcType, err := r.ReadByte()
if err != nil {
return err
}

// measuring the time to get the first byte separately because the heartbeat conn will hang out here
// for a good while waiting for a heartbeat whereas the append entries/rpc conn should not.
metrics.MeasureSince([]string{"raft", "net", "getRPCType"}, getTypeStart)
decodeStart := time.Now()

// Create the RPC object
respCh := make(chan RPCResponse, 1)
rpc := RPC{
Expand All @@ -559,6 +575,7 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en

// Decode the command
isHeartbeat := false
var labels []metrics.Label
switch rpcType {
case rpcAppendEntries:
var req AppendEntriesRequest
Expand All @@ -574,32 +591,41 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en
isHeartbeat = true
}

if isHeartbeat {
labels = []metrics.Label{{Name: "rpcType", Value: "Heartbeat"}}
} else {
labels = []metrics.Label{{Name: "rpcType", Value: "AppendEntries"}}
}
case rpcRequestVote:
var req RequestVoteRequest
if err := dec.Decode(&req); err != nil {
return err
}
rpc.Command = &req

labels = []metrics.Label{{Name: "rpcType", Value: "RequestVote"}}
case rpcInstallSnapshot:
var req InstallSnapshotRequest
if err := dec.Decode(&req); err != nil {
return err
}
rpc.Command = &req
rpc.Reader = io.LimitReader(r, req.Size)

labels = []metrics.Label{{Name: "rpcType", Value: "InstallSnapshot"}}
case rpcTimeoutNow:
var req TimeoutNowRequest
if err := dec.Decode(&req); err != nil {
return err
}
rpc.Command = &req

labels = []metrics.Label{{Name: "rpcType", Value: "TimeoutNow"}}
default:
return fmt.Errorf("unknown rpc type %d", rpcType)
}

metrics.MeasureSinceWithLabels([]string{"raft", "net", "rpcDecode"}, decodeStart, labels)

processStart := time.Now()

// Check for heartbeat fast-path
if isHeartbeat {
n.heartbeatFnLock.Lock()
Expand All @@ -620,8 +646,12 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en

// Wait for response
RESP:
// we will differentiate the heartbeat fast path from normal RPCs with labels
metrics.MeasureSinceWithLabels([]string{"raft", "net", "rpcEnqueue"}, processStart, labels)
respWaitStart := time.Now()
select {
case resp := <-respCh:
defer metrics.MeasureSinceWithLabels([]string{"raft", "net", "rpcRespond"}, respWaitStart, labels)
// Send the error first
respErr := ""
if resp.Error != nil {
Expand Down

0 comments on commit aa1afe5

Please sign in to comment.