diff --git a/pkg/storage/client_replica_test.go b/pkg/storage/client_replica_test.go
index 9aae5223b26e..2c4d4bf86304 100644
--- a/pkg/storage/client_replica_test.go
+++ b/pkg/storage/client_replica_test.go
@@ -837,6 +837,82 @@ func TestRangeTransferLease(t *testing.T) {
 	})
 }
 
+// TestRangeLimitTxnMaxTimestamp verifies that on lease transfer, the
+// normal limiting of a txn's max timestamp to the first observed
+// timestamp on a node is extended to include the lease start
+// timestamp. This disallows the possibility that a write to another
+// replica of the range (on node n1) happened at a later timestamp
+// than the originally observed timestamp for the node which now owns
+// the lease (n2). This can happen if the replication of the write
+// doesn't make it from n1 to n2 before the transaction observes n2's
+// clock time.
+func TestRangeLimitTxnMaxTimestamp(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	cfg := storage.TestStoreConfig(nil)
+	cfg.RangeLeaseRaftElectionTimeoutMultiplier =
+		float64((9 * time.Second) / cfg.RaftElectionTimeout())
+	mtc := &multiTestContext{}
+	mtc.storeConfig = &cfg
+	keyA := roachpb.Key("a")
+	// Create a new clock for node2 to allow drift between the two wall clocks.
+	manual1 := hlc.NewManualClock(100) // node1 clock is @t=100
+	clock1 := hlc.NewClock(manual1.UnixNano, 250*time.Nanosecond)
+	manual2 := hlc.NewManualClock(98) // node2 clock is @t=98
+	clock2 := hlc.NewClock(manual2.UnixNano, 250*time.Nanosecond)
+	mtc.clocks = []*hlc.Clock{clock1, clock2}
+
+	// Start a transaction using node2 as a gateway.
+	txn := roachpb.MakeTransaction("test", keyA, 1, enginepb.SERIALIZABLE, clock2.Now(), 250 /* maxOffsetNs */)
+	// Simulate a read to another range on node2 by setting the observed timestamp.
+	txn.UpdateObservedTimestamp(2, clock2.Now())
+
+	defer mtc.Stop()
+	mtc.Start(t, 2)
+
+	// Do a write on node1 to establish a key with its timestamp @t=100.
+	if _, pErr := client.SendWrapped(
+		context.Background(), mtc.distSenders[0], putArgs(keyA, []byte("value")),
+	); pErr != nil {
+		t.Fatal(pErr)
+	}
+
+	// Up-replicate the data in the range to node2.
+	replica1 := mtc.stores[0].LookupReplica(roachpb.RKey(keyA), nil)
+	mtc.replicateRange(replica1.RangeID, 1)
+
+	// Transfer the lease from node1 to node2.
+	replica2 := mtc.stores[1].LookupReplica(roachpb.RKey(keyA), nil)
+	replica2Desc, err := replica2.GetReplicaDescriptor()
+	if err != nil {
+		t.Fatal(err)
+	}
+	testutils.SucceedsSoon(t, func() error {
+		if err := replica1.AdminTransferLease(context.Background(), replica2Desc.StoreID); err != nil {
+			t.Fatal(err)
+		}
+		lease, _ := replica2.GetLease()
+		if lease.Replica.NodeID != replica2.NodeID() {
+			return errors.Errorf("expected lease transfer to node2: %s", lease)
+		}
+		return nil
+	})
+	// Verify that after the lease transfer, node2's clock has advanced to at least 100.
+	if now1, now2 := clock1.Now(), clock2.Now(); now2.WallTime < now1.WallTime {
+		t.Fatalf("expected node2's clock walltime to be >= %d; got %d", now1.WallTime, now2.WallTime)
+	}
+
+	// Send a get request for keyA to node2, which is now the
+	// leaseholder. If the max timestamp were not being properly limited,
+	// we would end up incorrectly reading nothing for keyA. Instead we
+	// expect to see an uncertainty interval error.
+	h := roachpb.Header{Txn: &txn}
+	if _, pErr := client.SendWrappedWith(
+		context.Background(), mtc.distSenders[0], h, getArgs(keyA),
+	); !testutils.IsPError(pErr, "uncertainty") {
+		t.Fatalf("expected an uncertainty interval error; got %v", pErr)
+	}
+}
+
 // TestLeaseMetricsOnSplitAndTransfer verifies that lease-related metrics
 // are updated after splitting a range and then initiating one successful
 // and one failing lease transfer.
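For readers following the scenario in the test's doc comment, the sketch below replays it with plain int64 wall times. It is a simplified model, not CockroachDB code; maxTimestampAfterClamp and the literal timestamps are illustrative only.

package main

import "fmt"

// maxTimestampAfterClamp models the clamping decision: the txn's max
// timestamp is lowered to the node's observed timestamp, but (with the
// fix in this change) never below the start of the current lease.
func maxTimestampAfterClamp(maxTS, observedTS, leaseStart int64, useLeaseStart bool) int64 {
	clamp := observedTS
	if useLeaseStart && leaseStart > clamp {
		clamp = leaseStart
	}
	if clamp < maxTS {
		return clamp
	}
	return maxTS
}

func main() {
	const (
		origTS     = 98  // txn begins via n2's clock (@t=98)
		maxTS      = 348 // origTS + 250ns max clock offset
		observedTS = 98  // txn's first observation of n2's clock
		writeTS    = 100 // the put on n1 (@t=100), before the lease transfer
		leaseStart = 101 // n2's lease starts after n1's clock, i.e. > 100
	)
	// Clamping to the observed timestamp alone empties the uncertainty
	// window [98, 98), so the read at t=98 misses the write at t=100.
	naive := maxTimestampAfterClamp(maxTS, observedTS, leaseStart, false)
	fmt.Println(origTS < writeTS && writeTS < naive) // false: stale read
	// Clamping no lower than the lease start keeps the window [98, 101),
	// so the write is seen as uncertain and the txn restarts.
	fixed := maxTimestampAfterClamp(maxTS, observedTS, leaseStart, true)
	fmt.Println(origTS < writeTS && writeTS < fixed) // true: uncertainty error
}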
diff --git a/pkg/storage/client_test.go b/pkg/storage/client_test.go
index 16f0517e1501..ae067f87437c 100644
--- a/pkg/storage/client_test.go
+++ b/pkg/storage/client_test.go
@@ -650,7 +650,7 @@ func (m *multiTestContext) populateDB(idx int, stopper *stop.Stopper) {
 	ambient := log.AmbientContext{Tracer: m.storeConfig.Settings.Tracer}
 	m.distSenders[idx] = kv.NewDistSender(kv.DistSenderConfig{
 		AmbientCtx: ambient,
-		Clock:      m.clock,
+		Clock:      m.clocks[idx],
 		RangeDescriptorDB: mtcRangeDescriptorDB{
 			multiTestContext: m,
 			ds:               &m.distSenders[idx],
@@ -664,12 +664,12 @@ func (m *multiTestContext) populateDB(idx int, stopper *stop.Stopper) {
 		ambient,
 		m.storeConfig.Settings,
 		m.distSenders[idx],
-		m.clock,
+		m.clocks[idx],
 		false,
 		stopper,
 		kv.MakeTxnMetrics(metric.TestSampleInterval),
 	)
-	m.dbs[idx] = client.NewDB(tcsFactory, m.clock)
+	m.dbs[idx] = client.NewDB(tcsFactory, m.clocks[idx])
 }
 
 func (m *multiTestContext) populateStorePool(idx int, nodeLiveness *storage.NodeLiveness) {
@@ -678,7 +678,7 @@ func (m *multiTestContext) populateStorePool(idx int, nodeLiveness *storage.Node
 		log.AmbientContext{Tracer: m.storeConfig.Settings.Tracer},
 		m.storeConfig.Settings,
 		m.gossips[idx],
-		m.clock,
+		m.clocks[idx],
 		storage.MakeStorePoolNodeLivenessFunc(nodeLiveness),
 		/* deterministic */ false,
 	)
@@ -831,7 +831,7 @@ func (m *multiTestContext) addStore(idx int) {
 		ch:   make(chan struct{}),
 	}
 	m.nodeLivenesses[idx].StartHeartbeat(ctx, stopper, func(ctx context.Context) {
-		now := m.clock.Now()
+		now := m.clocks[idx].Now()
 		if err := store.WriteLastUpTimestamp(ctx, now); err != nil {
 			log.Warning(ctx, err)
 		}
@@ -924,7 +924,7 @@ func (m *multiTestContext) restartStoreWithoutHeartbeat(i int) {
 	m.transport.GetCircuitBreaker(m.idents[i].NodeID).Reset()
 	m.mu.Unlock()
 	cfg.NodeLiveness.StartHeartbeat(ctx, stopper, func(ctx context.Context) {
-		now := m.clock.Now()
+		now := m.clocks[i].Now()
 		if err := store.WriteLastUpTimestamp(ctx, now); err != nil {
 			log.Warning(ctx, err)
 		}
@@ -1122,7 +1122,7 @@ func (m *multiTestContext) unreplicateRangeNonFatal(rangeID roachpb.RangeID, des
 func (m *multiTestContext) readIntFromEngines(key roachpb.Key) []int64 {
 	results := make([]int64, len(m.engines))
 	for i, eng := range m.engines {
-		val, _, err := engine.MVCCGet(context.Background(), eng, key, m.clock.Now(), true, nil)
+		val, _, err := engine.MVCCGet(context.Background(), eng, key, m.clocks[i].Now(), true, nil)
 		if err != nil {
 			log.VEventf(context.TODO(), 1, "engine %d: error reading from key %s: %s", i, key, err)
 		} else if val == nil {
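The multiTestContext changes above swap the single shared m.clock for a per-node m.clocks slice, which is what lets the new test hold the two nodes' wall clocks apart. A minimal sketch of that mechanism, using the same hlc constructors the test uses (signatures assumed as of this change):

package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func main() {
	// Each simulated node gets its own manual time source, so a test can
	// position the wall clocks independently.
	manual1 := hlc.NewManualClock(100) // node1 @t=100
	manual2 := hlc.NewManualClock(98)  // node2 @t=98
	clock1 := hlc.NewClock(manual1.UnixNano, 250*time.Nanosecond)
	clock2 := hlc.NewClock(manual2.UnixNano, 250*time.Nanosecond)

	fmt.Println(clock2.Now().Less(clock1.Now())) // true: node2 lags node1

	// Receiving a timestamp from node1 (as a lease transfer does) forwards
	// node2's HLC past it, which is why the test can assert that node2's
	// clock reads at least 100 after the transfer.
	clock2.Update(clock1.Now())
	fmt.Println(clock2.Now().Less(clock1.Now())) // false: clocks converged
}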
diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 3c541e06d964..d6ba506300ff 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -2425,6 +2425,7 @@ func (r *Replica) executeAdminBatch(
 	if pErr != nil {
 		return nil, pErr
 	}
+	// Note there is no need to limit transaction max timestamp on admin requests.
 
 	var resp roachpb.Response
 	switch tArgs := args.(type) {
@@ -2487,6 +2488,61 @@ func (r *Replica) executeAdminBatch(
 	return br, nil
 }
 
+// limitTxnMaxTimestamp limits the batch transaction's max timestamp
+// so that it respects any timestamp already observed on this node.
+// This prevents unnecessary uncertainty interval restarts caused by
+// reading a value written at a timestamp between txn.Timestamp and
+// txn.MaxTimestamp. The replica lease's start time is also taken into
+// consideration to ensure that a lease transfer does not result in
+// the observed timestamp for this node being inapplicable to data
+// previously written by the former leaseholder. To wit:
+//
+// 1. put(k on leaseholder n1), gateway chooses t=1.0
+// 2. begin; read(unrelated key on n2); gateway chooses t=0.98
+// 3. pick up observed timestamp for n2 of t=0.99
+// 4. n1 transfers lease for range with k to n2 @ t=1.1
+// 5. read(k) on leaseholder n2 at OrigTimestamp=0.98 should get
+//    ReadWithinUncertaintyInterval because of the write in step 1, so
+//    even though we observed n2's timestamp in step 3 we must expand
+//    the uncertainty interval to the lease's start time, which is
+//    guaranteed to be greater than any write which occurred under
+//    the previous leaseholder.
+func (r *Replica) limitTxnMaxTimestamp(
+	ctx context.Context, ba *roachpb.BatchRequest, status LeaseStatus,
+) {
+	if ba.Txn == nil {
+		return
+	}
+	// For calls that read data within a txn, we keep track of timestamps
+	// observed from the various participating nodes' HLC clocks. If we have
+	// a timestamp on file for this Node which is smaller than MaxTimestamp,
+	// we can lower MaxTimestamp accordingly. If MaxTimestamp drops below
+	// OrigTimestamp, we effectively can't see uncertainty restarts anymore.
+	obsTS, ok := ba.Txn.GetObservedTimestamp(ba.Replica.NodeID)
+	if !ok {
+		return
+	}
+	// If the lease is valid, we use the greater of the observed
+	// timestamp and the lease start time, up to the max timestamp. This
+	// ensures we avoid incorrect assumptions about when data was
+	// written, in absolute time on a different node, which held the
+	// lease before this replica acquired it.
+	if status.State == LeaseState_VALID {
+		obsTS.Forward(status.Lease.Start)
+	}
+	if obsTS.Less(ba.Txn.MaxTimestamp) {
+		// Copy-on-write to protect others we might be sharing the Txn with.
+		shallowTxn := *ba.Txn
+		// The uncertainty window is [OrigTimestamp, maxTS), so if that window
+		// is empty, there won't be any uncertainty restarts.
+		if !ba.Txn.OrigTimestamp.Less(obsTS) {
+			log.Event(ctx, "read has no clock uncertainty")
+		}
+		shallowTxn.MaxTimestamp.Backward(obsTS)
+		ba.Txn = &shallowTxn
+	}
+}
+
 // executeReadOnlyBatch updates the read timestamp cache and waits for any
 // overlapping writes currently processing through Raft ahead of us to
 // clear via the command queue.
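To make the five-step scenario above concrete, here is a rough standalone trace of the clamping arithmetic with timestamps as plain float64s. It is a sketch of the logic only; the real method operates on hlc.Timestamp via Forward and Backward, and all names below are hypothetical:

package main

import "fmt"

func main() {
	const (
		origTS     = 0.98 // step 2: txn's OrigTimestamp from the n2 gateway
		maxTS      = 1.23 // OrigTimestamp plus the max clock offset
		observedTS = 0.99 // step 3: observed timestamp for n2
		leaseStart = 1.1  // step 4: start of n2's transferred lease
		writeTS    = 1.0  // step 1: the put under n1's lease
	)

	// obsTS.Forward(status.Lease.Start): never trust an observation taken
	// before this node held the lease.
	obs := observedTS
	if leaseStart > obs {
		obs = leaseStart
	}
	// shallowTxn.MaxTimestamp.Backward(obsTS): lower MaxTimestamp, but only
	// if the (forwarded) observation is below it.
	newMaxTS := maxTS
	if obs < newMaxTS {
		newMaxTS = obs
	}

	// The uncertainty window stays [0.98, 1.1), so the t=1.0 write is
	// reported as ReadWithinUncertaintyInterval rather than silently
	// skipped; clamping to 0.99 alone would have excluded it.
	fmt.Printf("uncertainty window: [%.2f, %.2f)\n", origTS, newMaxTS)
	fmt.Println(origTS < writeTS && writeTS < newMaxTS) // true
}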
@@ -2494,11 +2550,13 @@ func (r *Replica) executeReadOnlyBatch(
 	ctx context.Context, ba roachpb.BatchRequest,
 ) (br *roachpb.BatchResponse, pErr *roachpb.Error) {
 	// If the read is not inconsistent, the read requires the range lease.
+	var status LeaseStatus
 	if ba.ReadConsistency.RequiresReadLease() {
-		if _, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil {
+		if status, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil {
 			return nil, pErr
 		}
 	}
+	r.limitTxnMaxTimestamp(ctx, &ba, status)
 
 	spans, err := collectSpans(*r.Desc(), &ba)
 	if err != nil {
@@ -2698,18 +2756,19 @@ func (r *Replica) tryExecuteWriteBatch(
 	}()
 
 	var lease roachpb.Lease
+	var status LeaseStatus
 	// For lease commands, use the provided previous lease for verification.
 	if ba.IsSingleSkipLeaseCheckRequest() {
 		lease = ba.GetPrevLeaseForLeaseRequest()
 	} else {
 		// Other write commands require that this replica has the range
 		// lease.
-		var status LeaseStatus
 		if status, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil {
 			return nil, pErr, proposalNoRetry
 		}
 		lease = status.Lease
 	}
+	r.limitTxnMaxTimestamp(ctx, &ba, status)
 
 	// Examine the read and write timestamp caches for preceding
 	// commands which require this command to move its timestamp
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index ecd3940e475e..909b91b3e3ed 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -2765,9 +2765,9 @@ func (s *Store) Send(
 		// updating the top end of our uncertainty timestamp would lead to a
 		// restart (at least in the absence of a prior observed timestamp from
 		// this node, in which case the following is a no-op).
-		if now.Less(ba.Txn.MaxTimestamp) {
+		if _, ok := ba.Txn.GetObservedTimestamp(ba.Replica.NodeID); !ok {
 			shallowTxn := *ba.Txn
-			shallowTxn.MaxTimestamp.Backward(now)
+			shallowTxn.UpdateObservedTimestamp(ba.Replica.NodeID, now)
 			ba.Txn = &shallowTxn
 		}
 	}
diff --git a/pkg/storage/stores.go b/pkg/storage/stores.go
index c180e0a4af1c..d45bae650a9e 100644
--- a/pkg/storage/stores.go
+++ b/pkg/storage/stores.go
@@ -189,28 +189,6 @@ func (ls *Stores) Send(
 		return nil, roachpb.NewError(err)
 	}
 
-	if ba.Txn != nil {
-		// For calls that read data within a txn, we keep track of timestamps
-		// observed from the various participating nodes' HLC clocks. If we have
-		// a timestamp on file for this Node which is smaller than MaxTimestamp,
-		// we can lower MaxTimestamp accordingly. If MaxTimestamp drops below
-		// OrigTimestamp, we effectively can't see uncertainty restarts any
-		// more.
-		// Note that it's not an issue if MaxTimestamp propagates back out to
-		// the client via a returned Transaction update - when updating a Txn
-		// from another, the larger MaxTimestamp wins.
-		if maxTS, ok := ba.Txn.GetObservedTimestamp(ba.Replica.NodeID); ok && maxTS.Less(ba.Txn.MaxTimestamp) {
-			// Copy-on-write to protect others we might be sharing the Txn with.
-			shallowTxn := *ba.Txn
-			// The uncertainty window is [OrigTimestamp, maxTS), so if that window
-			// is empty, there won't be any uncertainty restarts.
-			if !ba.Txn.OrigTimestamp.Less(maxTS) {
-				log.Event(ctx, "read has no clock uncertainty")
-			}
-			shallowTxn.MaxTimestamp.Backward(maxTS)
-			ba.Txn = &shallowTxn
-		}
-	}
 	br, pErr := store.Send(ctx, ba)
 	if br != nil && br.Error != nil {
 		panic(roachpb.ErrorUnexpectedlySet(store, br))
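Net effect of the last two hunks: Store.Send now only records a first observed timestamp for the node, and the MaxTimestamp clamp that used to live in Stores.Send happens later, in Replica.limitTxnMaxTimestamp, where the lease status is available. Below is a toy model of the first-observation-wins bookkeeping; the real implementation lives on roachpb.Transaction (UpdateObservedTimestamp/GetObservedTimestamp), and txnObservations here is a hypothetical stand-in:

package main

import "fmt"

// txnObservations maps a NodeID to the lowest HLC wall time the transaction
// has seen from that node's clock.
type txnObservations map[int32]int64

// update records ts for nodeID unless an earlier (lower) observation exists;
// keeping only the first observation is what makes a later clamp of
// MaxTimestamp down to this value safe.
func (o txnObservations) update(nodeID int32, ts int64) {
	if existing, ok := o[nodeID]; !ok || ts < existing {
		o[nodeID] = ts
	}
}

func main() {
	obs := make(txnObservations)
	obs.update(2, 98)   // first request routed to node 2
	obs.update(2, 105)  // a later, higher reading must not widen the clamp
	fmt.Println(obs[2]) // 98
}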