Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: ensure that PushTxn request's timestamp cache updates are safe #35297

Merged
merged 3 commits into from
Mar 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.1-9</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.1-10</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
</tbody>
</table>
894 changes: 470 additions & 424 deletions pkg/roachpb/api.pb.go

Large diffs are not rendered by default.

24 changes: 16 additions & 8 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -734,15 +734,23 @@ message PushTxnRequest {
// up-to-date value of the transaction record, but will be set or
// merged as appropriate.
storage.engine.enginepb.TxnMeta pushee_txn = 3 [(gogoproto.nullable) = false];
// PushTo is the timestamp just after which PusheeTxn is attempted to be
// pushed. During conflict resolution, it should be set to the timestamp
// of its conflicting write.
// PushTo is the timestamp which PusheeTxn should be pushed to. During
// conflict resolution, it should be set just after the timestamp of the
// conflicting read or write.
util.hlc.Timestamp push_to = 4 [(gogoproto.nullable) = false];
// Now holds the timestamp used to compare the last heartbeat of the pushee
// against. This is necessary since the request header's timestamp does not
// necessarily advance with the node clock across retries and hence cannot
// detect abandoned transactions.
util.hlc.Timestamp now = 5 [(gogoproto.nullable) = false];
// InclusivePushTo is sent by nodes to specify that their PushTo timestamp
// is the timestamp they want the transaction to be pushed to, instead of
// the timestamp before the one they want the transaction to be pushed to.
// It is used to assist that field's migration.
// TODO(nvanbenschoten): Remove this field in 19.2.
bool inclusive_push_to = 9;
// DeprecatedNow holds the timestamp used to compare the last heartbeat of the
// pushee against.
//
// The field remains for compatibility with 2.1 nodes. Users should set the
// same value for this field and the batch header timestamp.
// TODO(nvanbenschoten): Remove this field in 19.2.
util.hlc.Timestamp deprecated_now = 5 [(gogoproto.nullable) = false];
// Readers set this to PUSH_TIMESTAMP to move pushee_txn's provisional
// commit timestamp forward. Writers set this to PUSH_ABORT to request
// that pushee_txn be aborted if possible. Inconsistent readers set
Expand Down
7 changes: 7 additions & 0 deletions pkg/settings/cluster/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const (
VersionCreateStats
VersionDirectImport
VersionSideloadedStorageNoReplicaID // see versionsSingleton for details
VersionPushTxnToInclusive

// Add new versions here (step one of two).

Expand Down Expand Up @@ -415,6 +416,7 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Version: roachpb.Version{Major: 2, Minor: 1, Unstable: 7},
},
{
// VersionDirectImport is https://github.com/cockroachdb/cockroach/pull/34751.
Key: VersionDirectImport,
Version: roachpb.Version{Major: 2, Minor: 1, Unstable: 8},
},
Expand All @@ -436,6 +438,11 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: VersionSideloadedStorageNoReplicaID,
Version: roachpb.Version{Major: 2, Minor: 1, Unstable: 9},
},
{
// VersionPushTxnToInclusive is https://github.com/cockroachdb/cockroach/pull/35297.
Key: VersionPushTxnToInclusive,
Version: roachpb.Version{Major: 2, Minor: 1, Unstable: 10},
},

// Add new versions here (step two of two).

Expand Down
24 changes: 16 additions & 8 deletions pkg/storage/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,14 @@ func PushTxn(
if h.Txn != nil {
return result.Result{}, ErrTransactionUnsupported
}
if args.Now == (hlc.Timestamp{}) {
return result.Result{}, errors.Errorf("the field Now must be provided")

// Verify that the PushTxn's timestamp is not less than the timestamp that
// the request intends to push the transaction to. Transactions should not
// be pushed into the future or their effect may not be fully reflected in
// a future leaseholder's timestamp cache. This is analogous to how reads
// should not be performed at a timestamp in the future.
if h.Timestamp.Less(args.PushTo) {
return result.Result{}, errors.Errorf("PushTo %s larger than PushRequest header timestamp %s", args.PushTo, h.Timestamp)
}

if !bytes.Equal(args.Key, args.PusheeTxn.Key) {
Expand Down Expand Up @@ -183,7 +189,7 @@ func PushTxn(

// If we're trying to move the timestamp forward, and it's already
// far enough forward, return success.
if args.PushType == roachpb.PUSH_TIMESTAMP && args.PushTo.Less(reply.PusheeTxn.Timestamp) {
if args.PushType == roachpb.PUSH_TIMESTAMP && !reply.PusheeTxn.Timestamp.Less(args.PushTo) {
// Trivial noop.
return result.Result{}, nil
}
Expand All @@ -199,7 +205,7 @@ func PushTxn(
var reason string

switch {
case txnwait.IsExpired(args.Now, &reply.PusheeTxn):
case txnwait.IsExpired(h.Timestamp, &reply.PusheeTxn):
reason = "pushee is expired"
// When cleaning up, actually clean up (as opposed to simply pushing
// the garbage in the path of future writers).
Expand Down Expand Up @@ -243,16 +249,18 @@ func PushTxn(
case roachpb.PUSH_ABORT:
// If aborting the transaction, set the new status.
reply.PusheeTxn.Status = roachpb.ABORTED
// Forward the timestamp to accommodate AbortSpan GC. See method
// comment for details.
reply.PusheeTxn.Timestamp.Forward(reply.PusheeTxn.LastActive())
// If the transaction record was already present, forward the timestamp
// to accommodate AbortSpan GC. See method comment for details.
if ok {
reply.PusheeTxn.Timestamp.Forward(reply.PusheeTxn.LastActive())
}
case roachpb.PUSH_TIMESTAMP:
// Otherwise, update timestamp to be one greater than the request's
// timestamp. This new timestamp will be used to update the read
// timestamp cache. If the transaction record was not already present
// then we rely on the read timestamp cache to prevent the record from
// ever being written with a timestamp beneath this timestamp.
reply.PusheeTxn.Timestamp = args.PushTo.Next()
reply.PusheeTxn.Timestamp.Forward(args.PushTo)
default:
return result.Result{}, errors.Errorf("unexpected push type: %v", args.PushType)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2536,12 +2536,12 @@ func TestDistributedTxnCleanup(t *testing.T) {
// normal or min priority txn.
if force {
ba := roachpb.BatchRequest{}
ba.Timestamp = store.Clock().Now()
ba.RangeID = lhs.RangeID
ba.Add(&roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: proto.Key,
},
Now: store.Clock().Now(),
PusheeTxn: proto.TxnMeta,
PushType: roachpb.PUSH_ABORT,
Force: true,
Expand Down
20 changes: 11 additions & 9 deletions pkg/storage/intentresolver/contention_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,18 +257,20 @@ func (cq *contentionQueue) add(
log.VEventf(ctx, 3, "%s at front of queue; breaking from loop", txnID(curPusher.txn))
break Loop
}
pushReq := &roachpb.PushTxnRequest{

b := &client.Batch{}
b.Header.Timestamp = cq.clock.Now()
b.AddRawRequest(&roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: pusheeTxn.Key,
},
PusherTxn: getPusherTxn(h),
PusheeTxn: *pusheeTxn,
PushTo: h.Timestamp,
Now: cq.clock.Now(),
PushType: roachpb.PUSH_ABORT,
}
b := &client.Batch{}
b.AddRawRequest(pushReq)
PusherTxn: getPusherTxn(h),
PusheeTxn: *pusheeTxn,
PushTo: h.Timestamp.Next(),
InclusivePushTo: true,
DeprecatedNow: b.Header.Timestamp,
PushType: roachpb.PUSH_ABORT,
})
log.VEventf(ctx, 3, "%s pushing %s to detect dependency cycles", txnID(curPusher.txn), pusheeTxn.ID.Short())
if err := cq.db.Run(ctx, b); err != nil {
log.VErrEventf(ctx, 2, "while waiting in push contention queue to push %s: %s", pusheeTxn.ID.Short(), b.MustPErr())
Expand Down
45 changes: 19 additions & 26 deletions pkg/storage/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,46 +422,37 @@ func (ir *IntentResolver) MaybePushTransactions(
return nil, nil
}

pusherTxn := getPusherTxn(h)
log.Eventf(ctx, "pushing %d transaction(s)", len(pushTxns))

// Attempt to push the transaction(s).
now := ir.clock.Now()
pusherTxn := getPusherTxn(h)
var pushReqs []roachpb.Request
b := &client.Batch{}
b.Header.Timestamp = ir.clock.Now()
for _, pushTxn := range pushTxns {
pushReqs = append(pushReqs, &roachpb.PushTxnRequest{
b.AddRawRequest(&roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: pushTxn.Key,
},
PusherTxn: pusherTxn,
PusheeTxn: pushTxn,
PushTo: h.Timestamp,
// The timestamp is used by PushTxn for figuring out whether the
// transaction is abandoned. If we used the argument's timestamp
// here, we would run into busy loops because that timestamp
// usually stays fixed among retries, so it will never realize
// that a transaction has timed out. See #877.
Now: now,
PushType: pushType,
PusherTxn: pusherTxn,
PusheeTxn: pushTxn,
PushTo: h.Timestamp.Next(),
InclusivePushTo: true,
DeprecatedNow: b.Header.Timestamp,
PushType: pushType,
})
}
b := &client.Batch{}
b.AddRawRequest(pushReqs...)
var pErr *roachpb.Error
if err := ir.db.Run(ctx, b); err != nil {
pErr = b.MustPErr()
}
err := ir.db.Run(ctx, b)
cleanupInFlightPushes()
if pErr != nil {
return nil, pErr
if err != nil {
return nil, b.MustPErr()
}

br := b.RawResponse()
pushedTxns := map[uuid.UUID]roachpb.Transaction{}
for _, resp := range br.Responses {
txn := resp.GetInner().(*roachpb.PushTxnResponse).PusheeTxn
if _, ok := pushedTxns[txn.ID]; ok {
log.Fatalf(ctx, "have two PushTxn responses for %s\nreqs: %+v", txn.ID, pushReqs)
log.Fatalf(ctx, "have two PushTxn responses for %s", txn.ID)
}
pushedTxns[txn.ID] = txn
log.Eventf(ctx, "%s is now %s", txn.ID, txn.Status)
Expand Down Expand Up @@ -699,14 +690,16 @@ func (ir *IntentResolver) CleanupTxnIntentsOnGCAsync(
return
}
b := &client.Batch{}
b.Header.Timestamp = now
b.AddRawRequest(&roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{Key: txn.Key},
PusherTxn: roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{Priority: roachpb.MaxTxnPriority},
},
PusheeTxn: txn.TxnMeta,
Now: now,
PushType: roachpb.PUSH_ABORT,
PusheeTxn: txn.TxnMeta,
DeprecatedNow: b.Header.Timestamp,
PushType: roachpb.PUSH_ABORT,
InclusivePushTo: true,
})
pushed = true
if err := ir.db.Run(ctx, b); err != nil {
Expand Down
17 changes: 10 additions & 7 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1372,29 +1372,32 @@ func (r *Replica) maybeWatchForMerge(ctx context.Context) error {
// roachpb.PUSH_TOUCH, though it might appear more semantically correct,
// returns immediately and causes us to spin hot, whereas
// roachpb.PUSH_ABORT efficiently blocks until the transaction completes.
res, pErr := client.SendWrapped(ctx, r.DB().NonTransactionalSender(), &roachpb.PushTxnRequest{
b := &client.Batch{}
b.Header.Timestamp = r.Clock().Now()
b.AddRawRequest(&roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{Key: intent.Txn.Key},
PusherTxn: roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{Priority: roachpb.MinTxnPriority},
},
PusheeTxn: intent.Txn,
Now: r.Clock().Now(),
PushType: roachpb.PUSH_ABORT,
PusheeTxn: intent.Txn,
DeprecatedNow: b.Header.Timestamp,
PushType: roachpb.PUSH_ABORT,
InclusivePushTo: true,
})
if pErr != nil {
if err := r.DB().Run(ctx, b); err != nil {
select {
case <-r.store.stopper.ShouldQuiesce():
// The server is shutting down. The error while pushing the
// transaction was probably caused by the shutdown, so ignore it.
return
default:
log.Warningf(ctx, "error while watching for merge to complete: PushTxn: %s", pErr)
log.Warningf(ctx, "error while watching for merge to complete: PushTxn: %s", err)
// We can't safely unblock traffic until we can prove that the merge
// transaction is committed or aborted. Nothing to do but try again.
continue
}
}
pushTxnRes = res.(*roachpb.PushTxnResponse)
pushTxnRes = b.RawResponse().Responses[0].GetInner().(*roachpb.PushTxnResponse)
break
}

Expand Down
Loading