Skip to content

Commit

Permalink
Merge #35297
Browse files Browse the repository at this point in the history
35297: storage: ensure that PushTxn request's timestamp cache updates are safe r=nvanbenschoten a=nvanbenschoten

This PR's overall goal is to ensure that the timestamp cache updates performed by `PushTxn` requests are always safe. It does this in a series of steps:
1. it makes its `PushTo` argument inclusive, making it easier to use and easier to assert against (e.g. `req.PushTo <= req.Timestamp`).
2. it removes the `Now` argument and begins using the batch's `Timestamp` field instead. This field is used to update the receiving store's clock _before_ the request is evaluated and before the lease transfer check is performed.
3. it adds an assertion that all timestamp cache updates are safe given their local clock. This will also catch hypothesized bugs like #22315 (comment).

I'm planning on getting this into 19.1.

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Mar 2, 2019
2 parents eae3765 + 3164bf0 commit 50a1008
Show file tree
Hide file tree
Showing 15 changed files with 641 additions and 536 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.1-9</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.1-10</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
</tbody>
</table>
894 changes: 470 additions & 424 deletions pkg/roachpb/api.pb.go

Large diffs are not rendered by default.

24 changes: 16 additions & 8 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -734,15 +734,23 @@ message PushTxnRequest {
// up-to-date value of the transaction record, but will be set or
// merged as appropriate.
storage.engine.enginepb.TxnMeta pushee_txn = 3 [(gogoproto.nullable) = false];
// PushTo is the timestamp just after which PusheeTxn is attempted to be
// pushed. During conflict resolution, it should be set to the timestamp
// of its conflicting write.
// PushTo is the timestamp which PusheeTxn should be pushed to. During
// conflict resolution, it should be set just after the timestamp of the
// conflicting read or write.
util.hlc.Timestamp push_to = 4 [(gogoproto.nullable) = false];
// Now holds the timestamp used to compare the last heartbeat of the pushee
// against. This is necessary since the request header's timestamp does not
// necessarily advance with the node clock across retries and hence cannot
// detect abandoned transactions.
util.hlc.Timestamp now = 5 [(gogoproto.nullable) = false];
// InclusivePushTo is sent by nodes to specify that their PushTo timestamp
// is the timestamp they want the transaction to be pushed to, instead of
// the timestamp before the one they want the transaction to be pushed to.
// It is used to assist that field's migration.
// TODO(nvanbenschoten): Remove this field in 19.2.
bool inclusive_push_to = 9;
// DeprecatedNow holds the timestamp used to compare the last heartbeat of the
// pushee against.
//
// The field remains for compatibility with 2.1 nodes. Users should set the
// same value for this field and the batch header timestamp.
// TODO(nvanbenschoten): Remove this field in 19.2.
util.hlc.Timestamp deprecated_now = 5 [(gogoproto.nullable) = false];
// Readers set this to PUSH_TIMESTAMP to move pushee_txn's provisional
// commit timestamp forward. Writers set this to PUSH_ABORT to request
// that pushee_txn be aborted if possible. Inconsistent readers set
Expand Down
7 changes: 7 additions & 0 deletions pkg/settings/cluster/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const (
VersionCreateStats
VersionDirectImport
VersionSideloadedStorageNoReplicaID // see versionsSingleton for details
VersionPushTxnToInclusive

// Add new versions here (step one of two).

Expand Down Expand Up @@ -415,6 +416,7 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Version: roachpb.Version{Major: 2, Minor: 1, Unstable: 7},
},
{
// VersionDirectImport is https://github.com/cockroachdb/cockroach/pull/34751.
Key: VersionDirectImport,
Version: roachpb.Version{Major: 2, Minor: 1, Unstable: 8},
},
Expand All @@ -436,6 +438,11 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: VersionSideloadedStorageNoReplicaID,
Version: roachpb.Version{Major: 2, Minor: 1, Unstable: 9},
},
{
// VersionPushTxnToInclusive is https://github.com/cockroachdb/cockroach/pull/35297.
Key: VersionPushTxnToInclusive,
Version: roachpb.Version{Major: 2, Minor: 1, Unstable: 10},
},

// Add new versions here (step two of two).

Expand Down
24 changes: 16 additions & 8 deletions pkg/storage/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,14 @@ func PushTxn(
if h.Txn != nil {
return result.Result{}, ErrTransactionUnsupported
}
if args.Now == (hlc.Timestamp{}) {
return result.Result{}, errors.Errorf("the field Now must be provided")

// Verify that the PushTxn's timestamp is not less than the timestamp that
// the request intends to push the transaction to. Transactions should not
// be pushed into the future or their effect may not be fully reflected in
// a future leaseholder's timestamp cache. This is analogous to how reads
// should not be performed at a timestamp in the future.
if h.Timestamp.Less(args.PushTo) {
return result.Result{}, errors.Errorf("PushTo %s larger than PushRequest header timestamp %s", args.PushTo, h.Timestamp)
}

if !bytes.Equal(args.Key, args.PusheeTxn.Key) {
Expand Down Expand Up @@ -183,7 +189,7 @@ func PushTxn(

// If we're trying to move the timestamp forward, and it's already
// far enough forward, return success.
if args.PushType == roachpb.PUSH_TIMESTAMP && args.PushTo.Less(reply.PusheeTxn.Timestamp) {
if args.PushType == roachpb.PUSH_TIMESTAMP && !reply.PusheeTxn.Timestamp.Less(args.PushTo) {
// Trivial noop.
return result.Result{}, nil
}
Expand All @@ -199,7 +205,7 @@ func PushTxn(
var reason string

switch {
case txnwait.IsExpired(args.Now, &reply.PusheeTxn):
case txnwait.IsExpired(h.Timestamp, &reply.PusheeTxn):
reason = "pushee is expired"
// When cleaning up, actually clean up (as opposed to simply pushing
// the garbage in the path of future writers).
Expand Down Expand Up @@ -243,16 +249,18 @@ func PushTxn(
case roachpb.PUSH_ABORT:
// If aborting the transaction, set the new status.
reply.PusheeTxn.Status = roachpb.ABORTED
// Forward the timestamp to accommodate AbortSpan GC. See method
// comment for details.
reply.PusheeTxn.Timestamp.Forward(reply.PusheeTxn.LastActive())
// If the transaction record was already present, forward the timestamp
// to accommodate AbortSpan GC. See method comment for details.
if ok {
reply.PusheeTxn.Timestamp.Forward(reply.PusheeTxn.LastActive())
}
case roachpb.PUSH_TIMESTAMP:
// Otherwise, update timestamp to be one greater than the request's
// timestamp. This new timestamp will be used to update the read
// timestamp cache. If the transaction record was not already present
// then we rely on the read timestamp cache to prevent the record from
// ever being written with a timestamp beneath this timestamp.
reply.PusheeTxn.Timestamp = args.PushTo.Next()
reply.PusheeTxn.Timestamp.Forward(args.PushTo)
default:
return result.Result{}, errors.Errorf("unexpected push type: %v", args.PushType)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2536,12 +2536,12 @@ func TestDistributedTxnCleanup(t *testing.T) {
// normal or min priority txn.
if force {
ba := roachpb.BatchRequest{}
ba.Timestamp = store.Clock().Now()
ba.RangeID = lhs.RangeID
ba.Add(&roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: proto.Key,
},
Now: store.Clock().Now(),
PusheeTxn: proto.TxnMeta,
PushType: roachpb.PUSH_ABORT,
Force: true,
Expand Down
20 changes: 11 additions & 9 deletions pkg/storage/intentresolver/contention_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,18 +257,20 @@ func (cq *contentionQueue) add(
log.VEventf(ctx, 3, "%s at front of queue; breaking from loop", txnID(curPusher.txn))
break Loop
}
pushReq := &roachpb.PushTxnRequest{

b := &client.Batch{}
b.Header.Timestamp = cq.clock.Now()
b.AddRawRequest(&roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: pusheeTxn.Key,
},
PusherTxn: getPusherTxn(h),
PusheeTxn: *pusheeTxn,
PushTo: h.Timestamp,
Now: cq.clock.Now(),
PushType: roachpb.PUSH_ABORT,
}
b := &client.Batch{}
b.AddRawRequest(pushReq)
PusherTxn: getPusherTxn(h),
PusheeTxn: *pusheeTxn,
PushTo: h.Timestamp.Next(),
InclusivePushTo: true,
DeprecatedNow: b.Header.Timestamp,
PushType: roachpb.PUSH_ABORT,
})
log.VEventf(ctx, 3, "%s pushing %s to detect dependency cycles", txnID(curPusher.txn), pusheeTxn.ID.Short())
if err := cq.db.Run(ctx, b); err != nil {
log.VErrEventf(ctx, 2, "while waiting in push contention queue to push %s: %s", pusheeTxn.ID.Short(), b.MustPErr())
Expand Down
45 changes: 19 additions & 26 deletions pkg/storage/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,46 +422,37 @@ func (ir *IntentResolver) MaybePushTransactions(
return nil, nil
}

pusherTxn := getPusherTxn(h)
log.Eventf(ctx, "pushing %d transaction(s)", len(pushTxns))

// Attempt to push the transaction(s).
now := ir.clock.Now()
pusherTxn := getPusherTxn(h)
var pushReqs []roachpb.Request
b := &client.Batch{}
b.Header.Timestamp = ir.clock.Now()
for _, pushTxn := range pushTxns {
pushReqs = append(pushReqs, &roachpb.PushTxnRequest{
b.AddRawRequest(&roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: pushTxn.Key,
},
PusherTxn: pusherTxn,
PusheeTxn: pushTxn,
PushTo: h.Timestamp,
// The timestamp is used by PushTxn for figuring out whether the
// transaction is abandoned. If we used the argument's timestamp
// here, we would run into busy loops because that timestamp
// usually stays fixed among retries, so it will never realize
// that a transaction has timed out. See #877.
Now: now,
PushType: pushType,
PusherTxn: pusherTxn,
PusheeTxn: pushTxn,
PushTo: h.Timestamp.Next(),
InclusivePushTo: true,
DeprecatedNow: b.Header.Timestamp,
PushType: pushType,
})
}
b := &client.Batch{}
b.AddRawRequest(pushReqs...)
var pErr *roachpb.Error
if err := ir.db.Run(ctx, b); err != nil {
pErr = b.MustPErr()
}
err := ir.db.Run(ctx, b)
cleanupInFlightPushes()
if pErr != nil {
return nil, pErr
if err != nil {
return nil, b.MustPErr()
}

br := b.RawResponse()
pushedTxns := map[uuid.UUID]roachpb.Transaction{}
for _, resp := range br.Responses {
txn := resp.GetInner().(*roachpb.PushTxnResponse).PusheeTxn
if _, ok := pushedTxns[txn.ID]; ok {
log.Fatalf(ctx, "have two PushTxn responses for %s\nreqs: %+v", txn.ID, pushReqs)
log.Fatalf(ctx, "have two PushTxn responses for %s", txn.ID)
}
pushedTxns[txn.ID] = txn
log.Eventf(ctx, "%s is now %s", txn.ID, txn.Status)
Expand Down Expand Up @@ -699,14 +690,16 @@ func (ir *IntentResolver) CleanupTxnIntentsOnGCAsync(
return
}
b := &client.Batch{}
b.Header.Timestamp = now
b.AddRawRequest(&roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{Key: txn.Key},
PusherTxn: roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{Priority: roachpb.MaxTxnPriority},
},
PusheeTxn: txn.TxnMeta,
Now: now,
PushType: roachpb.PUSH_ABORT,
PusheeTxn: txn.TxnMeta,
DeprecatedNow: b.Header.Timestamp,
PushType: roachpb.PUSH_ABORT,
InclusivePushTo: true,
})
pushed = true
if err := ir.db.Run(ctx, b); err != nil {
Expand Down
17 changes: 10 additions & 7 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1372,29 +1372,32 @@ func (r *Replica) maybeWatchForMerge(ctx context.Context) error {
// roachpb.PUSH_TOUCH, though it might appear more semantically correct,
// returns immediately and causes us to spin hot, whereas
// roachpb.PUSH_ABORT efficiently blocks until the transaction completes.
res, pErr := client.SendWrapped(ctx, r.DB().NonTransactionalSender(), &roachpb.PushTxnRequest{
b := &client.Batch{}
b.Header.Timestamp = r.Clock().Now()
b.AddRawRequest(&roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{Key: intent.Txn.Key},
PusherTxn: roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{Priority: roachpb.MinTxnPriority},
},
PusheeTxn: intent.Txn,
Now: r.Clock().Now(),
PushType: roachpb.PUSH_ABORT,
PusheeTxn: intent.Txn,
DeprecatedNow: b.Header.Timestamp,
PushType: roachpb.PUSH_ABORT,
InclusivePushTo: true,
})
if pErr != nil {
if err := r.DB().Run(ctx, b); err != nil {
select {
case <-r.store.stopper.ShouldQuiesce():
// The server is shutting down. The error while pushing the
// transaction was probably caused by the shutdown, so ignore it.
return
default:
log.Warningf(ctx, "error while watching for merge to complete: PushTxn: %s", pErr)
log.Warningf(ctx, "error while watching for merge to complete: PushTxn: %s", err)
// We can't safely unblock traffic until we can prove that the merge
// transaction is committed or aborted. Nothing to do but try again.
continue
}
}
pushTxnRes = res.(*roachpb.PushTxnResponse)
pushTxnRes = b.RawResponse().Responses[0].GetInner().(*roachpb.PushTxnResponse)
break
}

Expand Down
Loading

0 comments on commit 50a1008

Please sign in to comment.