Skip to content

Commit

Permalink
kvserver: loosely couple raft log truncation
Browse files Browse the repository at this point in the history
In the ReplicasStorage design we stop making any assumptions
regarding what is durable in the state machine when syncing a batch
that commits changes to the raft log. This implies the need to
make raft log truncation more loosely coupled than it is now, since
we can truncate only when certain that the state machine is durable
up to the truncation index.

Current raft log truncation flows through raft and even though the
RaftTruncatedStateKey is not a replicated key, it is coupled in
the sense that the truncation is done below raft when processing
the corresponding log entry (that asked for truncation to be done).

The current setup also has correctness issues wrt maintaining the
raft log size, when passing the delta bytes for a truncation. We
compute the delta at proposal time (to avoid repeating iteration over
the entries in all replicas), but we do not pass the first index
corresponding to the truncation, so gaps or overlaps cannot be
noticed at truncation time.

We do want to continue to have the raft leader guide the truncation
since we do not want either leader or followers to over-truncate,
given our desire to serve snapshots from any replica. In the loosely
coupled approach implemented here, the truncation request that flows
through raft serves as an upper bound on what can be truncated.

The truncation request includes an ExpectedFirstIndex. This is
further propagated using ReplicatedEvalResult.RaftExpectedFirstIndex.
This ExpectedFirstIndex allows one to notice gaps or overlaps when
enacting a sequence of truncations, which results in setting the
Replica.raftLogSizeTrusted to false. The correctness issue with
Replica.raftLogSize is not fully addressed since there are existing
consistency issues when evaluating a TruncateLogRequest (these
are now noted in a code comment).

Below raft, the truncation requests are queued onto a Replica
in pendingLogTruncations. The queueing and dequeuing is managed
by a raftLogTruncator that takes care of merging pending truncation
requests and enacting the truncations when the durability of the
state machine advances.

The pending truncation requests are taken into account in the
raftLogQueue when deciding whether to do another truncation.
Most of the behavior of the raftLogQueue is unchanged.

The new behavior is gated on a LooselyCoupledRaftLogTruncation
cluster version. Additionally, the new behavior can be turned
off using the kv.raft_log.enable_loosely_coupled_truncation.enabled
cluster setting, which is true by default. The latter is expected
to be a safety switch for 1 release after which we expect to
remove it. That removal will also cleanup some duplicated code
(that was non-trivial to refactor and share) between the previous
coupled and new loosely coupled truncation.

Note, this PR is the first of two -- loosely coupled truncation
is turned off via a constant in this PR. The next one will
eliminate the constant and put it under the control of the cluster
setting.

Informs #36262
Informs #16624,#38322

Release note (ops change): The cluster setting
kv.raft_log.loosely_coupled_truncation.enabled can be used to disable
loosely coupled truncation.
  • Loading branch information
sumeerbhola committed Feb 22, 2022
1 parent e1d4587 commit f9dee66
Show file tree
Hide file tree
Showing 25 changed files with 1,920 additions and 96 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-78 set the active cluster version in the format '<major>.<minor>'
version version 21.2-80 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-78</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-80</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
8 changes: 8 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ const (
// ExperimentalMVCCRangeTombstones enables the use of highly experimental MVCC
// range tombstones.
ExperimentalMVCCRangeTombstones
// LooselyCoupledRaftLogTruncation allows the cluster to reduce the coupling
// for raft log truncation, by allowing each replica to treat a truncation
// proposal as an upper bound on what should be truncated.
LooselyCoupledRaftLogTruncation

// *************************************************
// Step (1): Add new versions here.
Expand Down Expand Up @@ -498,6 +502,10 @@ var versionsSingleton = keyedVersions{
Key: ExperimentalMVCCRangeTombstones,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 78},
},
{
Key: LooselyCoupledRaftLogTruncation,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 80},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ go_library(
"queue_helpers_testutil.go",
"raft.go",
"raft_log_queue.go",
"raft_log_truncator.go",
"raft_snapshot_queue.go",
"raft_transport.go",
"raft_truncator_replica.go",
"replica.go",
"replica_application_cmd.go",
"replica_application_cmd_buf.go",
Expand Down Expand Up @@ -164,6 +166,7 @@ go_library(
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/bufalloc",
"//pkg/util/buildutil",
"//pkg/util/circuit",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
Expand Down Expand Up @@ -252,6 +255,7 @@ go_test(
"queue_concurrency_test.go",
"queue_test.go",
"raft_log_queue_test.go",
"raft_log_truncator_test.go",
"raft_test.go",
"raft_transport_test.go",
"raft_transport_unit_test.go",
Expand Down
47 changes: 33 additions & 14 deletions pkg/kv/kvserver/batcheval/cmd_truncate_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
Expand Down Expand Up @@ -83,13 +84,32 @@ func TruncateLog(
return result.Result{}, errors.Wrap(err, "getting term")
}

// Compute the number of bytes freed by this truncation. Note that this will
// only make sense for the leaseholder as we base this off its own first
// index (other replicas may have other first indexes assuming we're not
// still using the legacy truncated state key). In principle, this could be
// off either way, though in practice we don't expect followers to have
// a first index smaller than the leaseholder's (see #34287), and most of
// the time everyone's first index should be the same.
// Compute the number of bytes freed by this truncation. Note that using
// firstIndex only make sense for the leaseholder as we base this off its
// own first index (other replicas may have other first indexes). In
// principle, this could be off either way, though in practice we don't
// expect followers to have a first index smaller than the leaseholder's
// (see #34287), and most of the time everyone's first index should be the
// same.
// Additionally, it is possible that a write-heavy range has multiple in
// flight TruncateLogRequests, and using the firstIndex will result in
// duplicate accounting. The ExpectedFirstIndex, populated for clusters at
// LooselyCoupledRaftLogTruncation, allows us to avoid this problem.
//
// We have an additional source of error not mitigated by
// ExpectedFirstIndex. There is nothing synchronizing firstIndex with the
// state visible in readWriter. The former uses the in-memory state or
// fetches directly from the Engine. The latter uses Engine state from some
// point in time which can fall anywhere in the time interval starting from
// when the readWriter was created up to where we create an MVCCIterator
// below.
// TODO(sumeer): we can eliminate this error as part of addressing
// https://github.com/cockroachdb/cockroach/issues/55461 and
// https://github.com/cockroachdb/cockroach/issues/70974 that discuss taking
// a consistent snapshot of some Replica state and the engine.
if args.ExpectedFirstIndex > firstIndex {
firstIndex = args.ExpectedFirstIndex
}
start := keys.RaftLogKey(rangeID, firstIndex)
end := keys.RaftLogKey(rangeID, args.Index)

Expand All @@ -98,12 +118,8 @@ func TruncateLog(
// downstream of Raft.
//
// Note that any sideloaded payloads that may be removed by this truncation
// don't matter; they're not tracked in the raft log delta.
//
// TODO(tbg): it's difficult to prove that this computation doesn't have
// bugs that let it diverge. It might be easier to compute the stats
// from scratch, stopping when 4mb (defaultRaftLogTruncationThreshold)
// is reached as at that point we'll truncate aggressively anyway.
// are not tracked in the raft log delta. The delta will be adjusted below
// raft.
iter := readWriter.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{UpperBound: end})
defer iter.Close()
// We can pass zero as nowNanos because we're only interested in SysBytes.
Expand All @@ -122,7 +138,10 @@ func TruncateLog(
pd.Replicated.State = &kvserverpb.ReplicaState{
TruncatedState: tState,
}

pd.Replicated.RaftLogDelta = ms.SysBytes
if cArgs.EvalCtx.ClusterSettings().Version.ActiveVersionOrEmpty(ctx).IsActive(
clusterversion.LooselyCoupledRaftLogTruncation) {
pd.Replicated.RaftExpectedFirstIndex = firstIndex
}
return pd, nil
}
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_truncate_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -62,10 +63,12 @@ func TestTruncateLog(t *testing.T) {
firstIndex = 100
)

st := cluster.MakeTestingClusterSettings()
evalCtx := &MockEvalCtx{
Desc: &roachpb.RangeDescriptor{RangeID: rangeID},
Term: term,
FirstIndex: firstIndex,
ClusterSettings: st,
Desc: &roachpb.RangeDescriptor{RangeID: rangeID},
Term: term,
FirstIndex: firstIndex,
}

eng := storage.NewDefaultInMemForTesting()
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,12 @@ func (p *Result) MergeAndDestroy(q Result) error {

if p.Replicated.State.TruncatedState == nil {
p.Replicated.State.TruncatedState = q.Replicated.State.TruncatedState
p.Replicated.RaftExpectedFirstIndex = q.Replicated.RaftExpectedFirstIndex
} else if q.Replicated.State.TruncatedState != nil {
return errors.AssertionFailedf("conflicting TruncatedState")
}
q.Replicated.State.TruncatedState = nil
q.Replicated.RaftExpectedFirstIndex = 0

if q.Replicated.State.GCThreshold != nil {
if p.Replicated.State.GCThreshold == nil {
Expand Down
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,25 @@ message ReplicatedEvalResult {
storage.enginepb.MVCCStats deprecated_delta = 10; // See #18828
storage.enginepb.MVCCStatsDelta delta = 18 [(gogoproto.nullable) = false];
ChangeReplicas change_replicas = 12;

// RaftLogDelta is the delta in bytes caused by truncation of the raft log.
// It is only populated when evaluating a TruncateLogRequest. The inclusive
// index for the truncation is specified in State.TruncatedState. This delta
// is computed under the assumption that the truncation is happening over
// the interval [RaftExpectedFirstIndex, index]. If the actual truncation at
// a replica is over some interval [x, interval] where x !=
// RaftExpectedFirstIndex it is that replica's job to recalculate this delta
// in order to be accurate, or to make note of the fact that its raft log
// size stats may now be inaccurate.
//
// NB: this delta does not include the byte size of sideloaded entries.
// Sideloaded entries are not expected to be common enough that it is worth
// the optimization to calculate the delta once (at the leaseholder).
int64 raft_log_delta = 13;
// RaftExpectedFirstIndex is populated starting at cluster version
// LooselyCoupledRaftLogTruncation. When this is not populated, the replica
// should not delay enacting the truncation.
uint64 raft_expected_first_index = 25;

// MVCCHistoryMutation describes mutations of MVCC history that may violate
// the closed timestamp, timestamp cache, and guarantees that rely on these
Expand Down
Loading

0 comments on commit f9dee66

Please sign in to comment.