diff --git a/pkg/kv/kvserver/observedts/BUILD.bazel b/pkg/kv/kvserver/observedts/BUILD.bazel index b93a95fe849e..06ea892ee7df 100644 --- a/pkg/kv/kvserver/observedts/BUILD.bazel +++ b/pkg/kv/kvserver/observedts/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "observedts", @@ -14,3 +14,16 @@ go_library( "//pkg/util/log", ], ) + +go_test( + name = "observedts_test", + srcs = ["limit_test.go"], + embed = [":observedts"], + deps = [ + "//pkg/kv/kvserver/kvserverpb", + "//pkg/roachpb", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//vendor/github.com/stretchr/testify/require", + ], +) diff --git a/pkg/kv/kvserver/observedts/doc.go b/pkg/kv/kvserver/observedts/doc.go index 45fbfd3b0a6d..75014fd572aa 100644 --- a/pkg/kv/kvserver/observedts/doc.go +++ b/pkg/kv/kvserver/observedts/doc.go @@ -16,7 +16,11 @@ package observedts import "github.com/cockroachdb/cockroach/pkg/roachpb" -// D1 | A transaction's "max timestamp" is the upper bound of its uncertainty +// D1 ———————————————————————————————————————————————— +// +// Transaction.MaxTimestamp +// +// A transaction's "max timestamp" is the upper bound of its uncertainty // interval. The value is set to the transaction's initial timestamp + the // cluster's maximum clock skew. Assuming maximum clock skew bounds are // respected, this maximum timestamp places an upper bound on the commit @@ -24,8 +28,12 @@ import "github.com/cockroachdb/cockroach/pkg/roachpb" // transaction. 
var D1 = roachpb.Transaction{}.MaxTimestamp -// D2 | While reading, if a transaction encounters a value above its read -// timestamp but equal to or below its max timestamp, it triggers a read within +// D2 ———————————————————————————————————————————————— +// +// ReadWithinUncertaintyIntervalError +// +// While reading, if a transaction encounters a value above its read timestamp +// but equal to or below its max timestamp, it triggers a read within // uncertainty interval error. // // This error forces the transaction to increase its read timestamp, either @@ -36,7 +44,11 @@ var D1 = roachpb.Transaction{}.MaxTimestamp // linearizability and avoid stale reads. var D2 = roachpb.ReadWithinUncertaintyIntervalError{} -// D3 | An observed timestamp is a combination of a NodeID and a Timestamp. The +// D3 ———————————————————————————————————————————————— +// +// ObservedTimestamp +// +// An observed timestamp is a combination of a NodeID and a Timestamp. The // timestamp is said to have been "observed" because it was pulled from the // corresponding node's HLC clock. // @@ -46,16 +58,23 @@ var D2 = roachpb.ReadWithinUncertaintyIntervalError{} // given a smaller value. var D3 = roachpb.ObservedTimestamp{} -// D4 | A transaction collects observed timestamps as it visits nodes in the -// cluster when performing reads and writes. +// D4 ———————————————————————————————————————————————— +// +// Transaction.UpdateObservedTimestamp +// +// A transaction collects observed timestamps as it visits nodes in the cluster +// when performing reads and writes. var D4 = (&roachpb.Transaction{}).UpdateObservedTimestamp -// D5 | The observed timestamps are collected in a list on the transaction -// proto. The purpose of this list is to avoid uncertainty related restarts -// which normally occur when reading a value in the near future as per the -// max_timestamp field. 
+// D5 ———————————————————————————————————————————————— // -// ### Meaning: +// Transaction.ObservedTimestamps +// +// The observed timestamps are collected in a list on the transaction proto. The +// purpose of this list is to avoid uncertainty related restarts which normally +// occur when reading a value in the near future as per the max_timestamp field. +// +// Meaning // // Morally speaking, having an entry for a node in this list means that this // node has been visited before, and that no more uncertainty restarts are @@ -73,7 +92,7 @@ var D4 = (&roachpb.Transaction{}).UpdateObservedTimestamp // timestamp that is at least high as our entry in the list for node A, so no // future operation on node A will be uncertain. // -// ### Correctness: +// Correctness // // Thus, expressed properly, we can say that when a node has been read from // successfully before by a transaction, uncertainty for values written by a @@ -90,26 +109,26 @@ var D4 = (&roachpb.Transaction{}).UpdateObservedTimestamp // purposes of uncertainty. // // There are two invariants necessary for this property to hold: -// 1. a leaseholder's clock must always be equal to or greater than the timestamp -// of all writes that it has served. This is trivial to enforce for -// non-transactional writes. It is more complicated for transactional writes -// which may move their commit timestamp forward over their lifetime before -// committing, even after writing intents on remote Ranges. To accommodate -// this situation, transactions ensure that at the time of their commit, any -// leaseholder for a Range that contains one of its intent has an HLC clock -// with an equal or greater timestamp than the transaction's commit timestamp. -// TODO(nvanbenschoten): This is violated by txn refreshes. See #36431. -// 2. a leaseholder's clock must always be equal to or greater than the timestamp -// of all writes that previous leaseholders for its Range have served. 
We -// enforce that when a Replica acquires a lease it bumps its node's clock to a -// time higher than the previous leaseholder's clock when it stopped serving -// writes. This is accomplished cooperatively for lease transfers and through -// a statis period before lease expiration for lease acquisitions. It then -// follows by induction that, in conjunction with the previous invariant, this -// invariant holds for all leaseholders, given that a Range's initial -// leaseholder assumes responsibility for an empty range with no writes. -// -// ### Usage: +// 1. a leaseholder's clock must always be equal to or greater than the timestamp +// of all writes that it has served. This is trivial to enforce for +// non-transactional writes. It is more complicated for transactional writes +// which may move their commit timestamp forward over their lifetime before +// committing, even after writing intents on remote Ranges. To accommodate +// this situation, transactions ensure that at the time of their commit, any +// leaseholder for a Range that contains one of its intents has an HLC clock +// with an equal or greater timestamp than the transaction's commit timestamp. +// TODO(nvanbenschoten): This is violated by txn refreshes. See #36431. +// 2. a leaseholder's clock must always be equal to or greater than the timestamp +// of all writes that previous leaseholders for its Range have served. We +// enforce that when a Replica acquires a lease it bumps its node's clock to a +// time higher than the previous leaseholder's clock when it stopped serving +// writes. This is accomplished cooperatively for lease transfers and through +// a stasis period before lease expiration for lease acquisitions. It then +// follows by induction that, in conjunction with the previous invariant, this +// invariant holds for all leaseholders, given that a Range's initial +// leaseholder assumes responsibility for an empty range with no writes.
+// +// Usage // // The property ensures that when this list holds a corresponding entry for the // node who owns the lease that the current request is executing under, we can @@ -133,14 +152,34 @@ var D4 = (&roachpb.Transaction{}).UpdateObservedTimestamp // the list. In particular, if `read_timestamp` is taken from that node's clock, // we may add that to the list, which eliminates read uncertainty for reads on // that node. +// +// Follower Reads +// +// If the replica serving a transaction's read is not the leaseholder for its +// range, an observed timestamp pulled from the follower node's clock has no +// meaning for the purpose of reducing the transaction's uncertainty interval. +// This is because there is no guarantee that at the time of acquiring the +// observed timestamp from the follower node, the leaseholder hadn't already +// served writes at higher timestamps than the follower node's clock reflected. +// +// However, if the transaction performing a follower read happens to have an +// observed timestamp from the current leaseholder, this timestamp can be used +// to reduce the transaction's uncertainty interval. Even though the read is +// being served from a different replica in the range, the observed timestamp +// still places a bound on the values in the range that may have been written +// before the transaction began. var D5 = roachpb.Transaction{}.ObservedTimestamps -// D6 | Observed timestamps allow transactions to avoid uncertainty related -// restarts because they allow transactions to bound their "effective max -// timestamp" when reading on a node which they have previously collected an -// observed timestamp from. Similarly, observed timestamps can also assist a -// transaction even on its first visit to a node in cases where it gets stuck -// waiting on locks for long periods of time. 
+// D6 ———————————————————————————————————————————————— +// +// LimitTxnMaxTimestamp +// +// Observed timestamps allow transactions to avoid uncertainty related restarts +// because they allow transactions to bound their "effective max timestamp" when +// reading on a node which they have previously collected an observed timestamp +// from. Similarly, observed timestamps can also assist a transaction even on +// its first visit to a node in cases where it gets stuck waiting on locks for +// long periods of time. var D6 = LimitTxnMaxTimestamp // Ignore unused warnings. diff --git a/pkg/kv/kvserver/observedts/limit.go b/pkg/kv/kvserver/observedts/limit.go index 2700a98cc550..484d52807acd 100644 --- a/pkg/kv/kvserver/observedts/limit.go +++ b/pkg/kv/kvserver/observedts/limit.go @@ -18,14 +18,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" ) -// LimitTxnMaxTimestamp limits the batch transaction's max timestamp -// so that it respects any timestamp already observed on this node. -// This prevents unnecessary uncertainty interval restarts caused by -// reading a value written at a timestamp between txn.Timestamp and -// txn.MaxTimestamp. The replica lease's start time is also taken into -// consideration to ensure that a lease transfer does not result in -// the observed timestamp for this node being inapplicable to data -// previously written by the former leaseholder. To wit: +// LimitTxnMaxTimestamp limits the transaction's max timestamp so that it +// respects any timestamp already observed on the leaseholder's node. This +// prevents unnecessary uncertainty interval restarts caused by reading a value +// written at a timestamp between txn.Timestamp and txn.MaxTimestamp. +// +// The lease's start time is also taken into consideration to ensure that a +// lease transfer does not result in the observed timestamp for this node being +// inapplicable to data previously written by the former leaseholder. To wit: // // 1. 
put(k on leaseholder n1), gateway chooses t=1.0 // 2. begin; read(unrelated key on n2); gateway chooses t=0.98 @@ -39,40 +39,42 @@ import ( // the previous leaseholder. // func LimitTxnMaxTimestamp( - ctx context.Context, ba *roachpb.BatchRequest, status kvserverpb.LeaseStatus, -) { - if ba.Txn == nil { - return + ctx context.Context, txn *roachpb.Transaction, status kvserverpb.LeaseStatus, +) *roachpb.Transaction { + if txn == nil || status.State != kvserverpb.LeaseState_VALID { + return txn } // For calls that read data within a txn, we keep track of timestamps - // observed from the various participating nodes' HLC clocks. If we have - // a timestamp on file for this Node which is smaller than MaxTimestamp, - // we can lower MaxTimestamp accordingly. If MaxTimestamp drops below - // ReadTimestamp, we effectively can't see uncertainty restarts anymore. - // TODO(nvanbenschoten): This should use the lease's node id. - obsTS, ok := ba.Txn.GetObservedTimestamp(ba.Replica.NodeID) + // observed from the various participating nodes' HLC clocks. If we have a + // timestamp on file for the leaseholder's node which is smaller than + // MaxTimestamp, we can lower MaxTimestamp accordingly. If MaxTimestamp + // drops below ReadTimestamp, we effectively can't see uncertainty restarts + // anymore. + // + // Note that we care about an observed timestamp from the leaseholder's + // node, even if this is a follower read on a different node. See the + // comment in doc.go about "Follower Reads" for more. + obsTS, ok := txn.GetObservedTimestamp(status.Lease.Replica.NodeID) if !ok { - return - } - // If the lease is valid, we use the greater of the observed - // timestamp and the lease start time, up to the max timestamp. This - // ensures we avoid incorrect assumptions about when data was - // written, in absolute time on a different node, which held the - // lease before this replica acquired it. 
- // TODO(nvanbenschoten): Do we ever need to call this when - // status.State != VALID? - if status.State == kvserverpb.LeaseState_VALID { - obsTS.Forward(status.Lease.Start) + return txn } - if obsTS.Less(ba.Txn.MaxTimestamp) { + // If the lease is valid, we use the greater of the observed timestamp and + // the lease start time, up to the max timestamp. This ensures we avoid + // incorrect assumptions about when data was written, in absolute time on a + // different node, which held the lease before this replica acquired it. + obsTS.Forward(status.Lease.Start) + // If the observed timestamp reduces the transaction's uncertainty interval, + // update the transaction proto. + if obsTS.Less(txn.MaxTimestamp) { // Copy-on-write to protect others we might be sharing the Txn with. - txnClone := ba.Txn.Clone() + txnClone := txn.Clone() // The uncertainty window is [ReadTimestamp, maxTS), so if that window // is empty, there won't be any uncertainty restarts. - if obsTS.LessEq(ba.Txn.ReadTimestamp) { + if obsTS.LessEq(txn.ReadTimestamp) { log.Event(ctx, "read has no clock uncertainty") } txnClone.MaxTimestamp.Backward(obsTS) - ba.Txn = txnClone + txn = txnClone } + return txn } diff --git a/pkg/kv/kvserver/observedts/limit_test.go b/pkg/kv/kvserver/observedts/limit_test.go new file mode 100644 index 000000000000..e169645b28a8 --- /dev/null +++ b/pkg/kv/kvserver/observedts/limit_test.go @@ -0,0 +1,117 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt.
+ +package observedts + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +func TestLimitTxnMaxTimestamp(t *testing.T) { + defer leaktest.AfterTest(t)() + + txn := &roachpb.Transaction{ + ReadTimestamp: hlc.Timestamp{WallTime: 10}, + MaxTimestamp: hlc.Timestamp{WallTime: 20}, + } + txn.UpdateObservedTimestamp(1, hlc.Timestamp{WallTime: 15}) + txnWithMaxTimestamp := func(ts hlc.Timestamp) *roachpb.Transaction { + txnClone := txn.Clone() + txnClone.MaxTimestamp = ts + return txnClone + } + + repl1 := roachpb.ReplicaDescriptor{NodeID: 1} + repl2 := roachpb.ReplicaDescriptor{NodeID: 2} + lease := kvserverpb.LeaseStatus{ + Lease: roachpb.Lease{Replica: repl1}, + State: kvserverpb.LeaseState_VALID, + } + + testCases := []struct { + name string + txn *roachpb.Transaction + lease kvserverpb.LeaseStatus + expTxn *roachpb.Transaction + }{ + { + name: "no txn", + txn: nil, + lease: lease, + expTxn: nil, + }, + { + name: "invalid lease", + txn: txn, + lease: func() kvserverpb.LeaseStatus { + leaseClone := lease + leaseClone.State = kvserverpb.LeaseState_EXPIRED + return leaseClone + }(), + expTxn: txn, + }, + { + name: "no observed timestamp", + txn: txn, + lease: func() kvserverpb.LeaseStatus { + leaseClone := lease + leaseClone.Lease.Replica = repl2 + return leaseClone + }(), + expTxn: txn, + }, + { + name: "valid lease", + txn: txn, + lease: lease, + expTxn: txnWithMaxTimestamp(hlc.Timestamp{WallTime: 15}), + }, + { + name: "valid lease with start time above observed timestamp", + txn: txn, + lease: func() kvserverpb.LeaseStatus { + leaseClone := lease + leaseClone.Lease.Start = hlc.Timestamp{WallTime: 18} + return leaseClone + }(), + expTxn: txnWithMaxTimestamp(hlc.Timestamp{WallTime: 18}), + }, + { + name: "valid lease with 
start time above max timestamp", + txn: txn, + lease: func() kvserverpb.LeaseStatus { + leaseClone := lease + leaseClone.Lease.Start = hlc.Timestamp{WallTime: 22} + return leaseClone + }(), + expTxn: txn, + }, + } + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + // Copy txn to test that the transaction is not mutated. + var txnBefore *roachpb.Transaction + if test.txn != nil { + txnBefore = test.txn.Clone() + } + + txnOut := LimitTxnMaxTimestamp(context.Background(), test.txn, test.lease) + require.Equal(t, test.expTxn, txnOut) + require.Equal(t, txnBefore, test.txn) + }) + } +} diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 84754fb66218..e5df2e797782 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -719,7 +719,7 @@ func (r *Replica) descRLocked() *roachpb.RangeDescriptor { // NodeID returns the ID of the node this replica belongs to. func (r *Replica) NodeID() roachpb.NodeID { - return r.store.nodeDesc.NodeID + return r.store.NodeID() } // GetNodeLocality returns the locality of the node this replica belongs to. diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 1a972cf0eda4..81bbf2c0fa89 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -306,7 +306,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries( } } // Limit the transaction's maximum timestamp using observed timestamps. - observedts.LimitTxnMaxTimestamp(ctx, ba, status) + ba.Txn = observedts.LimitTxnMaxTimestamp(ctx, ba.Txn, status) // Determine the maximal set of key spans that the batch will operate // on. We only need to do this once and we make sure to do so after we diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index eb1c44fe32f0..a3213de21112 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2260,6 +2260,9 @@ func (s *Store) RaftStatus(rangeID roachpb.RangeID) *raft.Status { // ClusterID accessor. 
func (s *Store) ClusterID() uuid.UUID { return s.Ident.ClusterID } +// NodeID accessor. +func (s *Store) NodeID() roachpb.NodeID { return s.Ident.NodeID } + // StoreID accessor. func (s *Store) StoreID() roachpb.StoreID { return s.Ident.StoreID } diff --git a/pkg/kv/kvserver/store_send.go b/pkg/kv/kvserver/store_send.go index 5798d1b1e514..23f268c7e222 100644 --- a/pkg/kv/kvserver/store_send.go +++ b/pkg/kv/kvserver/store_send.go @@ -108,7 +108,7 @@ func (s *Store) Send( // can use it to shorten its uncertainty interval when it comes back to // this node. if pErr != nil { - pErr.OriginNode = ba.Replica.NodeID + pErr.OriginNode = s.NodeID() if txn := pErr.GetTxn(); txn == nil { pErr.SetTxn(ba.Txn) } @@ -150,9 +150,9 @@ func (s *Store) Send( // updating the top end of our uncertainty timestamp would lead to a // restart (at least in the absence of a prior observed timestamp from // this node, in which case the following is a no-op). - if _, ok := ba.Txn.GetObservedTimestamp(ba.Replica.NodeID); !ok { + if _, ok := ba.Txn.GetObservedTimestamp(s.NodeID()); !ok { txnClone := ba.Txn.Clone() - txnClone.UpdateObservedTimestamp(ba.Replica.NodeID, s.cfg.Clock.Now()) + txnClone.UpdateObservedTimestamp(s.NodeID(), s.Clock().Now()) ba.Txn = txnClone } } diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index b0168d86d869..b8ef7409d59a 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -1134,19 +1134,13 @@ func TestStoreObservedTimestamp(t *testing.T) { defer log.Scope(t).Close(t) badKey := []byte("a") goodKey := []byte("b") - desc := roachpb.ReplicaDescriptor{ - NodeID: 5, - // not relevant - StoreID: 1, - ReplicaID: 2, - } testCases := []struct { key roachpb.Key - check func(int64, roachpb.Response, *roachpb.Error) + check func(int64, roachpb.NodeID, roachpb.Response, *roachpb.Error) }{ {badKey, - func(wallNanos int64, _ roachpb.Response, pErr *roachpb.Error) { + func(wallNanos int64, nodeID roachpb.NodeID, _ 
roachpb.Response, pErr *roachpb.Error) { if pErr == nil { t.Fatal("expected an error") } @@ -1154,18 +1148,18 @@ func TestStoreObservedTimestamp(t *testing.T) { if txn == nil || txn.ID == (uuid.UUID{}) { t.Fatalf("expected nontrivial transaction in %s", pErr) } - if ts, _ := txn.GetObservedTimestamp(desc.NodeID); ts.WallTime != wallNanos { + if ts, _ := txn.GetObservedTimestamp(nodeID); ts.WallTime != wallNanos { t.Fatalf("unexpected observed timestamps, expected %d->%d but got map %+v", - desc.NodeID, wallNanos, txn.ObservedTimestamps) + nodeID, wallNanos, txn.ObservedTimestamps) } - if pErr.OriginNode != desc.NodeID { + if pErr.OriginNode != nodeID { t.Fatalf("unexpected OriginNode %d, expected %d", - pErr.OriginNode, desc.NodeID) + pErr.OriginNode, nodeID) } }}, {goodKey, - func(wallNanos int64, pReply roachpb.Response, pErr *roachpb.Error) { + func(wallNanos int64, nodeID roachpb.NodeID, pReply roachpb.Response, pErr *roachpb.Error) { if pErr != nil { t.Fatal(pErr) } @@ -1173,7 +1167,7 @@ func TestStoreObservedTimestamp(t *testing.T) { if txn == nil || txn.ID == (uuid.UUID{}) { t.Fatal("expected transactional response") } - obs, _ := txn.GetObservedTimestamp(desc.NodeID) + obs, _ := txn.GetObservedTimestamp(nodeID) if act, exp := obs.WallTime, wallNanos; exp != act { t.Fatalf("unexpected observed wall time: %d, wanted %d", act, exp) } @@ -1196,14 +1190,11 @@ func TestStoreObservedTimestamp(t *testing.T) { store := createTestStoreWithConfig(t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) txn := newTransaction("test", test.key, 1, store.cfg.Clock) txn.MaxTimestamp = hlc.MaxTimestamp + h := roachpb.Header{Txn: txn} pArgs := putArgs(test.key, []byte("value")) - h := roachpb.Header{ - Txn: txn, - Replica: desc, - } assignSeqNumsForReqs(txn, &pArgs) pReply, pErr := kv.SendWrappedWith(context.Background(), store.TestSender(), h, &pArgs) - test.check(manual.UnixNano(), pReply, pErr) + test.check(manual.UnixNano(), store.NodeID(), pReply, pErr) }() } }