storage: improve use of observed tstamps to limit uncertainty restarts
This change relocates the logic that uses observed timestamps to limit
a transaction's `MaxTimestamp` field in order to avoid uncertainty
restarts. This was previously done in `storage.Stores.Send`, but is now
moved to `storage.Replica.executeReadOnlyBatch` and
`storage.Replica.tryExecuteWriteBatch`. Limiting at the replica level
makes the lease's start time available, which prevents the txn from
failing to read a value that was written, in absolute time, before the
txn started but at a higher timestamp than the timestamp the txn
observed at the node which now owns the replica lease.

Fixes #23749

Release note: None
spencerkimball committed Mar 22, 2018
1 parent 526bab2 commit 85d4627
Showing 5 changed files with 146 additions and 33 deletions.
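
Before the diff itself, a minimal, self-contained sketch of the rule this change implements (in Go, but not CockroachDB code: plain int64 nanoseconds stand in for `hlc.Timestamp`, and `limitMaxTimestamp` is a hypothetical name). An observed timestamp may only bound the txn's `MaxTimestamp` after being forwarded to the lease start time, so that writes applied under the previous leaseholder stay inside the uncertainty interval.

package main

import "fmt"

// limitMaxTimestamp is a simplified stand-in for the new
// Replica.limitTxnMaxTimestamp below: observed timestamps may only lower
// MaxTimestamp, and under a valid lease the observation is first forwarded
// to the lease start time.
func limitMaxTimestamp(maxTS, observedTS, leaseStart int64, leaseValid bool) int64 {
	if leaseValid && observedTS < leaseStart {
		// A write applied under the previous leaseholder may carry a higher
		// timestamp than the one this txn observed on the new leaseholder's
		// node, so never trust an observation older than the lease start.
		observedTS = leaseStart
	}
	if observedTS < maxTS {
		maxTS = observedTS
	}
	return maxTS
}

func main() {
	// Illustrative numbers echoing the scenario in the limitTxnMaxTimestamp
	// comment below: a write at t=100 happened under the old leaseholder,
	// the txn observed t=99 on the new leaseholder's node, and the lease
	// there starts at t=110. MaxTimestamp comes in at t=348.
	fmt.Println(limitMaxTimestamp(348, 99, 110, true)) // prints 110, so the t=100 write stays uncertain
}
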
76 changes: 76 additions & 0 deletions pkg/storage/client_replica_test.go
@@ -837,6 +837,82 @@ func TestRangeTransferLease(t *testing.T) {
})
}

// TestRangeLimitTxnMaxTimestamp verifies that on lease transfer, the
// normal limiting of a txn's max timestamp to the first observed
// timestamp on a node is extended to include the lease start
// timestamp. This disallows the possibility that a write to another
// replica of the range (on node n1) happened at a later timestamp
// than the originally observed timestamp for the node which now owns
// the lease (n2). This can happen if the replication of the write
// doesn't make it from n1 to n2 before the transaction observes n2's
// clock time.
func TestRangeLimitTxnMaxTimestamp(t *testing.T) {
	defer leaktest.AfterTest(t)()
	cfg := storage.TestStoreConfig(nil)
	cfg.RangeLeaseRaftElectionTimeoutMultiplier =
		float64((9 * time.Second) / cfg.RaftElectionTimeout())
	mtc := &multiTestContext{}
	mtc.storeConfig = &cfg
	keyA := roachpb.Key("a")
	// Create a new clock for node2 to allow drift between the two wall clocks.
	manual1 := hlc.NewManualClock(100) // node1 clock is @t=100
	clock1 := hlc.NewClock(manual1.UnixNano, 250*time.Nanosecond)
	manual2 := hlc.NewManualClock(98) // node2 clock is @t=98
	clock2 := hlc.NewClock(manual2.UnixNano, 250*time.Nanosecond)
	mtc.clocks = []*hlc.Clock{clock1, clock2}

	// Start a transaction using node2 as a gateway.
	txn := roachpb.MakeTransaction("test", keyA, 1, enginepb.SERIALIZABLE, clock2.Now(), 250 /* maxOffsetNs */)
	// Simulate a read to another range on node2 by setting the observed timestamp.
	txn.UpdateObservedTimestamp(2, clock2.Now())

	defer mtc.Stop()
	mtc.Start(t, 2)

	// Do a write on node1 to establish a key with its timestamp @t=100.
	if _, pErr := client.SendWrapped(
		context.Background(), mtc.distSenders[0], putArgs(keyA, []byte("value")),
	); pErr != nil {
		t.Fatal(pErr)
	}

	// Up-replicate the data in the range to node2.
	replica1 := mtc.stores[0].LookupReplica(roachpb.RKey(keyA), nil)
	mtc.replicateRange(replica1.RangeID, 1)

	// Transfer the lease from node1 to node2.
	replica2 := mtc.stores[1].LookupReplica(roachpb.RKey(keyA), nil)
	replica2Desc, err := replica2.GetReplicaDescriptor()
	if err != nil {
		t.Fatal(err)
	}
	testutils.SucceedsSoon(t, func() error {
		if err := replica1.AdminTransferLease(context.Background(), replica2Desc.StoreID); err != nil {
			t.Fatal(err)
		}
		lease, _ := replica2.GetLease()
		if lease.Replica.NodeID != replica2.NodeID() {
			return errors.Errorf("expected lease transfer to node2: %s", lease)
		}
		return nil
	})
	// Verify that after the lease transfer, node2's clock has advanced to at least 100.
	if now1, now2 := clock1.Now(), clock2.Now(); now2.WallTime < now1.WallTime {
		t.Fatalf("expected node2's clock walltime to be >= %d; got %d", now1.WallTime, now2.WallTime)
	}

	// Send a get request for keyA to node2, which is now the
	// leaseholder. If the max timestamp were not being properly limited,
	// we would end up incorrectly reading nothing for keyA. Instead we
	// expect to see an uncertainty interval error.
	h := roachpb.Header{Txn: &txn}
	if _, pErr := client.SendWrappedWith(
		context.Background(), mtc.distSenders[0], h, getArgs(keyA),
	); !testutils.IsPError(pErr, "uncertainty") {
		t.Fatalf("expected an uncertainty interval error; got %v", pErr)
	}
}

// TestLeaseMetricsOnSplitAndTransfer verifies that lease-related metrics
// are updated after splitting a range and then initiating one successful
// and one failing lease transfer.
14 changes: 7 additions & 7 deletions pkg/storage/client_test.go
@@ -650,7 +650,7 @@ func (m *multiTestContext) populateDB(idx int, stopper *stop.Stopper) {
ambient := log.AmbientContext{Tracer: m.storeConfig.Settings.Tracer}
m.distSenders[idx] = kv.NewDistSender(kv.DistSenderConfig{
AmbientCtx: ambient,
Clock: m.clock,
Clock: m.clocks[idx],
RangeDescriptorDB: mtcRangeDescriptorDB{
multiTestContext: m,
ds: &m.distSenders[idx],
@@ -664,12 +664,12 @@ func (m *multiTestContext) populateDB(idx int, stopper *stop.Stopper) {
ambient,
m.storeConfig.Settings,
m.distSenders[idx],
m.clock,
m.clocks[idx],
false,
stopper,
kv.MakeTxnMetrics(metric.TestSampleInterval),
)
m.dbs[idx] = client.NewDB(tcsFactory, m.clock)
m.dbs[idx] = client.NewDB(tcsFactory, m.clocks[idx])
}

func (m *multiTestContext) populateStorePool(idx int, nodeLiveness *storage.NodeLiveness) {
@@ -678,7 +678,7 @@ func (m *multiTestContext) populateStorePool(idx int, nodeLiveness *storage.Node
log.AmbientContext{Tracer: m.storeConfig.Settings.Tracer},
m.storeConfig.Settings,
m.gossips[idx],
m.clock,
m.clocks[idx],
storage.MakeStorePoolNodeLivenessFunc(nodeLiveness),
/* deterministic */ false,
)
@@ -831,7 +831,7 @@ func (m *multiTestContext) addStore(idx int) {
ch: make(chan struct{}),
}
m.nodeLivenesses[idx].StartHeartbeat(ctx, stopper, func(ctx context.Context) {
now := m.clock.Now()
now := m.clocks[idx].Now()
if err := store.WriteLastUpTimestamp(ctx, now); err != nil {
log.Warning(ctx, err)
}
@@ -924,7 +924,7 @@ func (m *multiTestContext) restartStoreWithoutHeartbeat(i int) {
m.transport.GetCircuitBreaker(m.idents[i].NodeID).Reset()
m.mu.Unlock()
cfg.NodeLiveness.StartHeartbeat(ctx, stopper, func(ctx context.Context) {
now := m.clock.Now()
now := m.clocks[i].Now()
if err := store.WriteLastUpTimestamp(ctx, now); err != nil {
log.Warning(ctx, err)
}
@@ -1122,7 +1122,7 @@ func (m *multiTestContext) unreplicateRangeNonFatal(rangeID roachpb.RangeID, des
func (m *multiTestContext) readIntFromEngines(key roachpb.Key) []int64 {
results := make([]int64, len(m.engines))
for i, eng := range m.engines {
val, _, err := engine.MVCCGet(context.Background(), eng, key, m.clock.Now(), true, nil)
val, _, err := engine.MVCCGet(context.Background(), eng, key, m.clocks[i].Now(), true, nil)
if err != nil {
log.VEventf(context.TODO(), 1, "engine %d: error reading from key %s: %s", i, key, err)
} else if val == nil {
63 changes: 61 additions & 2 deletions pkg/storage/replica.go
Expand Up @@ -2425,6 +2425,7 @@ func (r *Replica) executeAdminBatch(
if pErr != nil {
return nil, pErr
}
// Note there is no need to limit transaction max timestamp on admin requests.

var resp roachpb.Response
switch tArgs := args.(type) {
@@ -2487,18 +2488,75 @@ func (r *Replica) executeAdminBatch(
return br, nil
}

// limitTxnMaxTimestamp limits the batch transaction's max timestamp
// so that it respects any timestamp already observed on this node.
// This prevents unnecessary uncertainty interval restarts caused by
// reading a value written at a timestamp between txn.Timestamp and
// txn.MaxTimestamp. The replica lease's start time is also taken into
// consideration to ensure that a lease transfer does not result in
// the observed timestamp for this node being inapplicable to data
// previously written by the former leaseholder. To wit:
//
// 1. put(k on leaseholder n1), gateway chooses t=1.0
// 2. begin; read(unrelated key on n2); gateway chooses t=0.98
// 3. pick up observed timestamp for n2 of t=0.99
// 4. n1 transfers lease for range with k to n2 @ t=1.1
// 5. read(k) on leaseholder n2 at OrigTimestamp=0.98 should get
// ReadWithinUncertaintyInterval because of the write in step 1, so
// even though we observed n2's timestamp in step 3 we must expand
// the uncertainty interval to the lease's start time, which is
// guaranteed to be greater than any write which occurred under
// the previous leaseholder.
func (r *Replica) limitTxnMaxTimestamp(
	ctx context.Context, ba *roachpb.BatchRequest, status LeaseStatus,
) {
	if ba.Txn == nil {
		return
	}
	// For calls that read data within a txn, we keep track of timestamps
	// observed from the various participating nodes' HLC clocks. If we have
	// a timestamp on file for this Node which is smaller than MaxTimestamp,
	// we can lower MaxTimestamp accordingly. If MaxTimestamp drops below
	// OrigTimestamp, we effectively can't see uncertainty restarts anymore.
	obsTS, ok := ba.Txn.GetObservedTimestamp(ba.Replica.NodeID)
	if !ok {
		return
	}
	// If the lease is valid, we use the greater of the observed
	// timestamp and the lease start time, up to the max timestamp. This
	// ensures we avoid incorrect assumptions about when data was
	// written, in absolute time on a different node, which held the
	// lease before this replica acquired it.
	if status.State == LeaseState_VALID {
		obsTS.Forward(status.Lease.Start)
	}
	if obsTS.Less(ba.Txn.MaxTimestamp) {
		// Copy-on-write to protect others we might be sharing the Txn with.
		shallowTxn := *ba.Txn
		// The uncertainty window is [OrigTimestamp, maxTS), so if that window
		// is empty, there won't be any uncertainty restarts.
		if !ba.Txn.OrigTimestamp.Less(obsTS) {
			log.Event(ctx, "read has no clock uncertainty")
		}
		shallowTxn.MaxTimestamp.Backward(obsTS)
		ba.Txn = &shallowTxn
	}
}

// executeReadOnlyBatch updates the read timestamp cache and waits for any
// overlapping writes currently processing through Raft ahead of us to
// clear via the command queue.
func (r *Replica) executeReadOnlyBatch(
ctx context.Context, ba roachpb.BatchRequest,
) (br *roachpb.BatchResponse, pErr *roachpb.Error) {
// If the read is not inconsistent, the read requires the range lease.
var status LeaseStatus
if ba.ReadConsistency.RequiresReadLease() {
if _, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil {
if status, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil {
return nil, pErr
}
}
r.limitTxnMaxTimestamp(ctx, &ba, status)

spans, err := collectSpans(*r.Desc(), &ba)
if err != nil {
@@ -2698,18 +2756,19 @@ func (r *Replica) tryExecuteWriteBatch(
}()

var lease roachpb.Lease
var status LeaseStatus
// For lease commands, use the provided previous lease for verification.
if ba.IsSingleSkipLeaseCheckRequest() {
lease = ba.GetPrevLeaseForLeaseRequest()
} else {
// Other write commands require that this replica has the range
// lease.
var status LeaseStatus
if status, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil {
return nil, pErr, proposalNoRetry
}
lease = status.Lease
}
r.limitTxnMaxTimestamp(ctx, &ba, status)

// Examine the read and write timestamp caches for preceding
// commands which require this command to move its timestamp
4 changes: 2 additions & 2 deletions pkg/storage/store.go
@@ -2765,9 +2765,9 @@ func (s *Store) Send(
// updating the top end of our uncertainty timestamp would lead to a
// restart (at least in the absence of a prior observed timestamp from
// this node, in which case the following is a no-op).
if now.Less(ba.Txn.MaxTimestamp) {
if _, ok := ba.Txn.GetObservedTimestamp(ba.Replica.NodeID); !ok {
shallowTxn := *ba.Txn
shallowTxn.MaxTimestamp.Backward(now)
shallowTxn.UpdateObservedTimestamp(ba.Replica.NodeID, now)
ba.Txn = &shallowTxn
}
}
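
Note that `Store.Send` no longer lowers `MaxTimestamp` directly; it now only records the node's current clock reading as an observed timestamp the first time this node shows up in the txn, and the limiting itself happens later in `Replica.limitTxnMaxTimestamp`. A minimal, self-contained sketch of that record-once behavior (with a hypothetical map standing in for the txn's observed-timestamp list):

package main

import "fmt"

// recordObserved mimics the guarded UpdateObservedTimestamp call in the hunk
// above: a clock reading is recorded only if this node has no observation
// yet, so the txn keeps the earliest timestamp it observed on each node.
func recordObserved(observed map[int32]int64, nodeID int32, nowNanos int64) {
	if _, ok := observed[nodeID]; !ok {
		observed[nodeID] = nowNanos
	}
}

func main() {
	observed := map[int32]int64{}
	recordObserved(observed, 2, 99)  // first request through node 2 records t=99
	recordObserved(observed, 2, 120) // later, higher readings are ignored
	fmt.Println(observed[2])         // prints 99
}
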
22 changes: 0 additions & 22 deletions pkg/storage/stores.go
@@ -189,28 +189,6 @@ func (ls *Stores) Send(
return nil, roachpb.NewError(err)
}

if ba.Txn != nil {
// For calls that read data within a txn, we keep track of timestamps
// observed from the various participating nodes' HLC clocks. If we have
// a timestamp on file for this Node which is smaller than MaxTimestamp,
// we can lower MaxTimestamp accordingly. If MaxTimestamp drops below
// OrigTimestamp, we effectively can't see uncertainty restarts any
// more.
// Note that it's not an issue if MaxTimestamp propagates back out to
// the client via a returned Transaction update - when updating a Txn
// from another, the larger MaxTimestamp wins.
if maxTS, ok := ba.Txn.GetObservedTimestamp(ba.Replica.NodeID); ok && maxTS.Less(ba.Txn.MaxTimestamp) {
// Copy-on-write to protect others we might be sharing the Txn with.
shallowTxn := *ba.Txn
// The uncertainty window is [OrigTimestamp, maxTS), so if that window
// is empty, there won't be any uncertainty restarts.
if !ba.Txn.OrigTimestamp.Less(maxTS) {
log.Event(ctx, "read has no clock uncertainty")
}
shallowTxn.MaxTimestamp.Backward(maxTS)
ba.Txn = &shallowTxn
}
}
br, pErr := store.Send(ctx, ba)
if br != nil && br.Error != nil {
panic(roachpb.ErrorUnexpectedlySet(store, br))