Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: replace remote proposal tracking with uncommitted log size protection #31408

Merged
merged 4 commits into from
Oct 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

80 changes: 78 additions & 2 deletions pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,28 @@ const (
DefaultTableDescriptorLeaseRenewalTimeout = time.Minute
)

var defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt(
"COCKROACH_RAFT_ELECTION_TIMEOUT_TICKS", 15)
var (
// defaultRaftElectionTimeoutTicks specifies the number of Raft Tick
// invocations that must pass between elections.
defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt(
"COCKROACH_RAFT_ELECTION_TIMEOUT_TICKS", 15)

// defaultRaftLogTruncationThreshold specifies the upper bound that a single
// Range's Raft log can grow to before log truncations are triggered, even
// if that means a snapshot will be required for a straggling follower.
defaultRaftLogTruncationThreshold = envutil.EnvOrDefaultInt64(
"COCKROACH_RAFT_LOG_TRUNCATION_THRESHOLD", 4<<20 /* 4 MB */)

// defaultRaftMaxSizePerMsg specifies the maximum number of Raft log entries
// that a leader will send to followers in a single MsgApp.
defaultRaftMaxSizePerMsg = envutil.EnvOrDefaultInt(
"COCKROACH_RAFT_MAX_SIZE_PER_MSG", 16<<10 /* 16 KB */)

// defaultRaftMaxSizePerMsg specifies how many "inflight" messages a leader
// will send to a follower without hearing a response.
defaultRaftMaxInflightMsgs = envutil.EnvOrDefaultInt(
"COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 64)
)

type lazyHTTPClient struct {
once sync.Once
Expand Down Expand Up @@ -421,6 +441,41 @@ type RaftConfig struct {
// RangeLeaseRaftElectionTimeoutMultiplier specifies what multiple the leader
// lease active duration should be of the raft election timeout.
RangeLeaseRaftElectionTimeoutMultiplier float64

// RaftLogTruncationThreshold controls how large a single Range's Raft log
// can grow. When a Range's Raft log grows above this size, the Range will
// begin performing log truncations.
RaftLogTruncationThreshold int64

// RaftProposalQuota controls the maximum aggregate size of Raft commands
// that a leader is allowed to propose concurrently.
//
// By default, the quota is set to a fraction of the Raft log truncation
// threshold. In doing so, we ensure all replicas have sufficiently up to
// date logs so that when the log gets truncated, the followers do not need
// non-preemptive snapshots. Changing this deserves care. Too low and
// everything comes to a grinding halt, too high and we're not really
// throttling anything (we'll still generate snapshots).
RaftProposalQuota int64

// RaftMaxUncommittedEntriesSize controls how large the uncommitted tail of
// the Raft log can grow. The limit is meant to provide protection against
// unbounded Raft log growth when quorum is lost and entries stop being
// committed but continue to be proposed.
RaftMaxUncommittedEntriesSize uint64

// RaftMaxSizePerMsg controls how many Raft log entries the leader will send to
// followers in a single MsgApp.
RaftMaxSizePerMsg uint64

// RaftMaxInflightMsgs controls how many "inflight" messages Raft will send
// to a follower without hearing a response. The total number of Raft log
// entries is a combination of this setting and RaftMaxSizePerMsg. The
// current default settings provide for up to 1 MB of raft log to be sent
// without acknowledgement. With an average entry size of 1 KB that
// translates to ~1024 commands that might be executed in the handling of a
// single raft.Ready operation.
RaftMaxInflightMsgs int
}

// SetDefaults initializes unset fields.
Expand All @@ -434,6 +489,27 @@ func (cfg *RaftConfig) SetDefaults() {
if cfg.RangeLeaseRaftElectionTimeoutMultiplier == 0 {
cfg.RangeLeaseRaftElectionTimeoutMultiplier = defaultRangeLeaseRaftElectionTimeoutMultiplier
}
if cfg.RaftLogTruncationThreshold == 0 {
cfg.RaftLogTruncationThreshold = defaultRaftLogTruncationThreshold
}
if cfg.RaftProposalQuota == 0 {
// By default, set this to a fraction of RaftLogMaxSize. See the comment
// on the field for the tradeoffs of setting this higher or lower.
cfg.RaftProposalQuota = cfg.RaftLogTruncationThreshold / 4
}
if cfg.RaftMaxUncommittedEntriesSize == 0 {
// By default, set this to twice the RaftProposalQuota. The logic here
// is that the quotaPool should be responsible for throttling proposals
// in all cases except for unbounded Raft re-proposals because it queues
// efficiently instead of dropping proposals on the floor indiscriminately.
cfg.RaftMaxUncommittedEntriesSize = uint64(2 * cfg.RaftProposalQuota)
}
if cfg.RaftMaxSizePerMsg == 0 {
cfg.RaftMaxSizePerMsg = uint64(defaultRaftMaxSizePerMsg)
}
if cfg.RaftMaxInflightMsgs == 0 {
cfg.RaftMaxInflightMsgs = defaultRaftMaxInflightMsgs
}
}

// RaftElectionTimeout returns the raft election timeout, as computed from the
Expand Down
7 changes: 4 additions & 3 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,8 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
sc.RaftTickInterval = 10 * time.Millisecond
// Don't timeout raft leader. We don't want leadership moving.
sc.RaftElectionTimeoutTicks = 1000000
// Reduce the max uncommitted entry size.
sc.RaftMaxUncommittedEntriesSize = 64 << 10 // 64 KB
// Disable leader transfers during leaseholder changes so that we
// can easily create leader-not-leaseholder scenarios.
sc.TestingKnobs.DisableLeaderFollowsLeaseholder = true
Expand Down Expand Up @@ -1233,7 +1235,7 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
// While a majority nodes are down, write some data.
putRes := make(chan *roachpb.Error)
go func() {
putArgs := putArgs([]byte("b"), make([]byte, 8<<10 /* 8 KB */))
putArgs := putArgs([]byte("b"), make([]byte, sc.RaftMaxUncommittedEntriesSize/8))
_, err := client.SendWrapped(context.Background(), propNode, putArgs)
putRes <- err
}()
Expand All @@ -1254,11 +1256,10 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
}

// Check raft log size.
const logSizeLimit = 64 << 10 // 64 KB
curlogSize := leaderRepl.GetRaftLogSize()
logSize := curlogSize - initLogSize
logSizeStr := humanizeutil.IBytes(logSize)
if logSize > logSizeLimit {
if uint64(logSize) > sc.RaftMaxUncommittedEntriesSize {
t.Fatalf("raft log size grew to %s", logSizeStr)
}
t.Logf("raft log size grew to %s", logSizeStr)
Expand Down
8 changes: 2 additions & 6 deletions pkg/storage/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
)
Expand All @@ -49,9 +48,6 @@ const (
raftLogQueueConcurrency = 4
)

// raftLogMaxSize limits the maximum size of the Raft log.
var raftLogMaxSize = envutil.EnvOrDefaultInt64("COCKROACH_RAFT_LOG_MAX_SIZE", 4<<20 /* 4 MB */)

// raftLogQueue manages a queue of replicas slated to have their raft logs
// truncated by removing unneeded entries.
type raftLogQueue struct {
Expand Down Expand Up @@ -118,8 +114,8 @@ func getTruncatableIndexes(ctx context.Context, r *Replica) (uint64, uint64, int
if targetSize > *r.mu.zone.RangeMaxBytes {
targetSize = *r.mu.zone.RangeMaxBytes
}
if targetSize > raftLogMaxSize {
targetSize = raftLogMaxSize
if targetSize > r.store.cfg.RaftLogTruncationThreshold {
targetSize = r.store.cfg.RaftLogTruncationThreshold
}
firstIndex, err := r.raftFirstIndexLocked()
pendingSnapshotIndex := r.mu.pendingSnapshotIndex
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.etcd.io/etcd/raft/raftpb"
"google.golang.org/grpc"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -615,6 +616,7 @@ func (t *RaftTransport) startProcessNewQueue(
// for closing the OutgoingSnapshot.
func (t *RaftTransport) SendSnapshot(
ctx context.Context,
raftCfg *base.RaftConfig,
storePool *StorePool,
header SnapshotRequest_Header,
snap *OutgoingSnapshot,
Expand All @@ -640,5 +642,5 @@ func (t *RaftTransport) SendSnapshot(
log.Warningf(ctx, "failed to close snapshot stream: %s", err)
}
}()
return sendSnapshot(ctx, t.st, stream, storePool, header, snap, newBatch, sent)
return sendSnapshot(ctx, raftCfg, t.st, stream, storePool, header, snap, newBatch, sent)
}
Loading