Skip to content

Commit

Permalink
storage: assert that timestamp cache updates are safe
Browse files Browse the repository at this point in the history
This new assertion would fire if bug like the one hypothesized in
the following comment exist:
#22315 (comment)

Release note: None
  • Loading branch information
nvanbenschoten committed Mar 2, 2019
1 parent 8c81edd commit 3164bf0
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 11 deletions.
11 changes: 9 additions & 2 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func (tc *testContext) Sender() client.Sender {
tc.Fatal(err)
}
}
tc.Clock().Update(ba.Timestamp)
return ba
})
}
Expand Down Expand Up @@ -4841,7 +4842,10 @@ func TestPushTxnUpgradeExistingTxn(t *testing.T) {
pushee.Timestamp = test.ts
args := pushTxnArgs(pusher, pushee, roachpb.PUSH_ABORT)

resp, pErr := tc.SendWrapped(&args)
// Set header timestamp to the maximum of the pusher and pushee timestamps.
h := roachpb.Header{Timestamp: args.PushTo}
h.Timestamp.Forward(pushee.Timestamp)
resp, pErr := tc.SendWrappedWith(h, &args)
if pErr != nil {
t.Fatal(pErr)
}
Expand Down Expand Up @@ -5119,7 +5123,10 @@ func TestPushTxnPriorities(t *testing.T) {
// Now, attempt to push the transaction with intent epoch set appropriately.
args := pushTxnArgs(pusher, pushee, test.pushType)

_, pErr := tc.SendWrappedWith(roachpb.Header{Timestamp: args.PushTo}, &args)
// Set header timestamp to the maximum of the pusher and pushee timestamps.
h := roachpb.Header{Timestamp: args.PushTo}
h.Timestamp.Forward(pushee.Timestamp)
_, pErr := tc.SendWrappedWith(h, &args)

if test.expSuccess != (pErr == nil) {
t.Errorf("expected success on trial %d? %t; got err %s", i, test.expSuccess, pErr)
Expand Down
43 changes: 34 additions & 9 deletions pkg/storage/replica_tscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ package storage

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/tscache"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)
Expand All @@ -29,7 +32,10 @@ import (
func (r *Replica) updateTimestampCache(
ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error,
) {
tc := r.store.tsCache
addToTSCache := r.store.tsCache.Add
if util.RaceEnabled {
addToTSCache = checkedTSCacheUpdate(r.store.Clock().Now(), r.store.tsCache, ba, br, pErr)
}
// Update the timestamp cache using the timestamp at which the batch
// was executed. Note this may have moved forward from ba.Timestamp,
// as when the request is retried locally on WriteTooOldErrors.
Expand Down Expand Up @@ -66,7 +72,7 @@ func (r *Replica) updateTimestampCache(
// to or greater than the transaction's OrigTimestamp,
// which is consulted in CanCreateTxnRecord.
key := keys.TransactionKey(start, txnID)
tc.Add(key, nil, ts, txnID, false /* readCache */)
addToTSCache(key, nil, ts, txnID, false /* readCache */)
case *roachpb.PushTxnRequest:
// A successful PushTxn request bumps the timestamp cache for
// the pushee's transaction key. The pushee will consult the
Expand All @@ -92,23 +98,23 @@ func (r *Replica) updateTimestampCache(
continue
}
key := keys.TransactionKey(start, pushee.ID)
tc.Add(key, nil, pushee.Timestamp, t.PusherTxn.ID, readCache)
addToTSCache(key, nil, pushee.Timestamp, t.PusherTxn.ID, readCache)
case *roachpb.ConditionalPutRequest:
if pErr != nil {
// ConditionalPut still updates on ConditionFailedErrors.
if _, ok := pErr.GetDetail().(*roachpb.ConditionFailedError); !ok {
continue
}
}
tc.Add(start, end, ts, txnID, true /* readCache */)
addToTSCache(start, end, ts, txnID, true /* readCache */)
case *roachpb.InitPutRequest:
if pErr != nil {
// InitPut still updates on ConditionFailedErrors.
if _, ok := pErr.GetDetail().(*roachpb.ConditionFailedError); !ok {
continue
}
}
tc.Add(start, end, ts, txnID, true /* readCache */)
addToTSCache(start, end, ts, txnID, true /* readCache */)
case *roachpb.ScanRequest:
resp := br.Responses[i].GetInner().(*roachpb.ScanResponse)
if resp.ResumeSpan != nil {
Expand All @@ -117,7 +123,7 @@ func (r *Replica) updateTimestampCache(
// end key for the span to update the timestamp cache.
end = resp.ResumeSpan.Key
}
tc.Add(start, end, ts, txnID, true /* readCache */)
addToTSCache(start, end, ts, txnID, true /* readCache */)
case *roachpb.ReverseScanRequest:
resp := br.Responses[i].GetInner().(*roachpb.ReverseScanResponse)
if resp.ResumeSpan != nil {
Expand All @@ -127,7 +133,7 @@ func (r *Replica) updateTimestampCache(
// the span to update the timestamp cache.
start = resp.ResumeSpan.EndKey
}
tc.Add(start, end, ts, txnID, true /* readCache */)
addToTSCache(start, end, ts, txnID, true /* readCache */)
case *roachpb.QueryIntentRequest:
if t.IfMissing == roachpb.QueryIntentRequest_PREVENT {
resp := br.Responses[i].GetInner().(*roachpb.QueryIntentResponse)
Expand All @@ -140,16 +146,35 @@ func (r *Replica) updateTimestampCache(
// transaction ID so that we block the intent regardless
// of whether it is part of the current batch's transaction
// or not.
tc.Add(start, end, t.Txn.Timestamp, uuid.UUID{}, true /* readCache */)
addToTSCache(start, end, t.Txn.Timestamp, uuid.UUID{}, true /* readCache */)
}
}
default:
tc.Add(start, end, ts, txnID, !roachpb.UpdatesWriteTimestampCache(args))
addToTSCache(start, end, ts, txnID, !roachpb.UpdatesWriteTimestampCache(args))
}
}
}
}

// checkedTSCacheUpdate wraps tscache.Cache and asserts that any update to the
// cache is at or below the specified time.
func checkedTSCacheUpdate(
now hlc.Timestamp,
tc tscache.Cache,
ba *roachpb.BatchRequest,
br *roachpb.BatchResponse,
pErr *roachpb.Error,
) func(roachpb.Key, roachpb.Key, hlc.Timestamp, uuid.UUID, bool) {
return func(start, end roachpb.Key, ts hlc.Timestamp, txnID uuid.UUID, readCache bool) {
if now.Less(ts) {
panic(fmt.Sprintf("Unsafe timestamp cache update! Cannot add timestamp %s to timestamp "+
"cache after evaluating %v (resp=%v; err=%v) with local hlc clock at timestamp %s. "+
"The timestamp cache update could be lost on a lease transfer.", ts, ba, br, pErr, now))
}
tc.Add(start, end, ts, txnID, readCache)
}
}

// applyTimestampCache moves the batch timestamp forward depending on
// the presence of overlapping entries in the timestamp cache. If the
// batch is transactional, the txn timestamp and the txn.WriteTooOld
Expand Down

0 comments on commit 3164bf0

Please sign in to comment.