Skip to content

Commit

Permalink
storage: use header timestamp for PushTxnRequest
Browse files Browse the repository at this point in the history
This commit deprecates PushTxnRequest.Now and gives its responsibility
to the batch header timestamp. The biggest reason to do this is so that
PushTxn requests properly update their receiver's clock. This is critical
because a PushTxn request can result in a timestamp cache entry being
created with a value up to this time, so for safety, we need to ensure
that the leaseholder updates its clock at least to this time _before_
evaluating the request. Otherwise, a lease transfer could miss the
request's effect on the timestamp cache and result in a lost push/abort.

The comment on PushTxnRequest.Now mentioned that the header timestamp
couldn't be used because the header's timestamp "does not necessarily
advance with the node clock across retries and hence cannot detect
abandoned transactions." This dates back all the way to #1102. I haven't
been able to piece together what kind of retries this is referring to,
but I'm almost positive that they no longer apply.

Release note: None
  • Loading branch information
nvanbenschoten committed Mar 2, 2019
1 parent 76228cd commit 8c81edd
Show file tree
Hide file tree
Showing 10 changed files with 491 additions and 499 deletions.
843 changes: 423 additions & 420 deletions pkg/roachpb/api.pb.go

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -744,11 +744,13 @@ message PushTxnRequest {
// It is used to assist that field's migration.
// TODO(nvanbenschoten): Remove this field in 19.2.
bool inclusive_push_to = 9;
// Now holds the timestamp used to compare the last heartbeat of the pushee
// against. This is necessary since the request header's timestamp does not
// necessarily advance with the node clock across retries and hence cannot
// detect abandoned transactions.
util.hlc.Timestamp now = 5 [(gogoproto.nullable) = false];
// DeprecatedNow holds the timestamp used to compare the last heartbeat of the
// pushee against.
//
// The field remains for compatibility with 2.1 nodes. Users should set the
// same value for this field and the batch header timestamp.
// TODO(nvanbenschoten): Remove this field in 19.2.
util.hlc.Timestamp deprecated_now = 5 [(gogoproto.nullable) = false];
// Readers set this to PUSH_TIMESTAMP to move pushee_txn's provisional
// commit timestamp forward. Writers set this to PUSH_ABORT to request
// that pushee_txn be aborted if possible. Inconsistent readers set
Expand Down
19 changes: 13 additions & 6 deletions pkg/storage/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,13 @@ func PushTxn(
return result.Result{}, ErrTransactionUnsupported
}

if args.Now == (hlc.Timestamp{}) {
return result.Result{}, errors.Errorf("the field Now must be provided")
// Verify that the PushTxn's timestamp is not less than the timestamp that
// the request intends to push the transaction to. Transactions should not
// be pushed into the future or their effect may not be fully reflected in
// a future leaseholder's timestamp cache. This is analogous to how reads
// should not be performed at a timestamp in the future.
if h.Timestamp.Less(args.PushTo) {
return result.Result{}, errors.Errorf("PushTo %s larger than PushRequest header timestamp %s", args.PushTo, h.Timestamp)
}

if !bytes.Equal(args.Key, args.PusheeTxn.Key) {
Expand Down Expand Up @@ -200,7 +205,7 @@ func PushTxn(
var reason string

switch {
case txnwait.IsExpired(args.Now, &reply.PusheeTxn):
case txnwait.IsExpired(h.Timestamp, &reply.PusheeTxn):
reason = "pushee is expired"
// When cleaning up, actually clean up (as opposed to simply pushing
// the garbage in the path of future writers).
Expand Down Expand Up @@ -244,9 +249,11 @@ func PushTxn(
case roachpb.PUSH_ABORT:
// If aborting the transaction, set the new status.
reply.PusheeTxn.Status = roachpb.ABORTED
// Forward the timestamp to accommodate AbortSpan GC. See method
// comment for details.
reply.PusheeTxn.Timestamp.Forward(reply.PusheeTxn.LastActive())
// If the transaction record was already present, forward the timestamp
// to accommodate AbortSpan GC. See method comment for details.
if ok {
reply.PusheeTxn.Timestamp.Forward(reply.PusheeTxn.LastActive())
}
case roachpb.PUSH_TIMESTAMP:
// Otherwise, update timestamp to be one greater than the request's
// timestamp. This new timestamp will be used to update the read
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2536,12 +2536,12 @@ func TestDistributedTxnCleanup(t *testing.T) {
// normal or min priority txn.
if force {
ba := roachpb.BatchRequest{}
ba.Timestamp = store.Clock().Now()
ba.RangeID = lhs.RangeID
ba.Add(&roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: proto.Key,
},
Now: store.Clock().Now(),
PusheeTxn: proto.TxnMeta,
PushType: roachpb.PUSH_ABORT,
Force: true,
Expand Down
11 changes: 6 additions & 5 deletions pkg/storage/intentresolver/contention_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,19 +257,20 @@ func (cq *contentionQueue) add(
log.VEventf(ctx, 3, "%s at front of queue; breaking from loop", txnID(curPusher.txn))
break Loop
}
pushReq := &roachpb.PushTxnRequest{

b := &client.Batch{}
b.Header.Timestamp = cq.clock.Now()
b.AddRawRequest(&roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: pusheeTxn.Key,
},
PusherTxn: getPusherTxn(h),
PusheeTxn: *pusheeTxn,
PushTo: h.Timestamp.Next(),
InclusivePushTo: true,
Now: cq.clock.Now(),
DeprecatedNow: b.Header.Timestamp,
PushType: roachpb.PUSH_ABORT,
}
b := &client.Batch{}
b.AddRawRequest(pushReq)
})
log.VEventf(ctx, 3, "%s pushing %s to detect dependency cycles", txnID(curPusher.txn), pusheeTxn.ID.Short())
if err := cq.db.Run(ctx, b); err != nil {
log.VErrEventf(ctx, 2, "while waiting in push contention queue to push %s: %s", pusheeTxn.ID.Short(), b.MustPErr())
Expand Down
33 changes: 12 additions & 21 deletions pkg/storage/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,47 +422,37 @@ func (ir *IntentResolver) MaybePushTransactions(
return nil, nil
}

pusherTxn := getPusherTxn(h)
log.Eventf(ctx, "pushing %d transaction(s)", len(pushTxns))

// Attempt to push the transaction(s).
now := ir.clock.Now()
pusherTxn := getPusherTxn(h)
var pushReqs []roachpb.Request
b := &client.Batch{}
b.Header.Timestamp = ir.clock.Now()
for _, pushTxn := range pushTxns {
pushReqs = append(pushReqs, &roachpb.PushTxnRequest{
b.AddRawRequest(&roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: pushTxn.Key,
},
PusherTxn: pusherTxn,
PusheeTxn: pushTxn,
PushTo: h.Timestamp.Next(),
InclusivePushTo: true,
// The timestamp is used by PushTxn for figuring out whether the
// transaction is abandoned. If we used the argument's timestamp
// here, we would run into busy loops because that timestamp
// usually stays fixed among retries, so it will never realize
// that a transaction has timed out. See #877.
Now: now,
PushType: pushType,
DeprecatedNow: b.Header.Timestamp,
PushType: pushType,
})
}
b := &client.Batch{}
b.AddRawRequest(pushReqs...)
var pErr *roachpb.Error
if err := ir.db.Run(ctx, b); err != nil {
pErr = b.MustPErr()
}
err := ir.db.Run(ctx, b)
cleanupInFlightPushes()
if pErr != nil {
return nil, pErr
if err != nil {
return nil, b.MustPErr()
}

br := b.RawResponse()
pushedTxns := map[uuid.UUID]roachpb.Transaction{}
for _, resp := range br.Responses {
txn := resp.GetInner().(*roachpb.PushTxnResponse).PusheeTxn
if _, ok := pushedTxns[txn.ID]; ok {
log.Fatalf(ctx, "have two PushTxn responses for %s\nreqs: %+v", txn.ID, pushReqs)
log.Fatalf(ctx, "have two PushTxn responses for %s", txn.ID)
}
pushedTxns[txn.ID] = txn
log.Eventf(ctx, "%s is now %s", txn.ID, txn.Status)
Expand Down Expand Up @@ -700,13 +690,14 @@ func (ir *IntentResolver) CleanupTxnIntentsOnGCAsync(
return
}
b := &client.Batch{}
b.Header.Timestamp = now
b.AddRawRequest(&roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{Key: txn.Key},
PusherTxn: roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{Priority: roachpb.MaxTxnPriority},
},
PusheeTxn: txn.TxnMeta,
Now: now,
DeprecatedNow: b.Header.Timestamp,
PushType: roachpb.PUSH_ABORT,
InclusivePushTo: true,
})
Expand Down
12 changes: 7 additions & 5 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1372,30 +1372,32 @@ func (r *Replica) maybeWatchForMerge(ctx context.Context) error {
// roachpb.PUSH_TOUCH, though it might appear more semantically correct,
// returns immediately and causes us to spin hot, whereas
// roachpb.PUSH_ABORT efficiently blocks until the transaction completes.
res, pErr := client.SendWrapped(ctx, r.DB().NonTransactionalSender(), &roachpb.PushTxnRequest{
b := &client.Batch{}
b.Header.Timestamp = r.Clock().Now()
b.AddRawRequest(&roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{Key: intent.Txn.Key},
PusherTxn: roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{Priority: roachpb.MinTxnPriority},
},
PusheeTxn: intent.Txn,
Now: r.Clock().Now(),
DeprecatedNow: b.Header.Timestamp,
PushType: roachpb.PUSH_ABORT,
InclusivePushTo: true,
})
if pErr != nil {
if err := r.DB().Run(ctx, b); err != nil {
select {
case <-r.store.stopper.ShouldQuiesce():
// The server is shutting down. The error while pushing the
// transaction was probably caused by the shutdown, so ignore it.
return
default:
log.Warningf(ctx, "error while watching for merge to complete: PushTxn: %s", pErr)
log.Warningf(ctx, "error while watching for merge to complete: PushTxn: %s", err)
// We can't safely unblock traffic until we can prove that the merge
// transaction is committed or aborted. Nothing to do but try again.
continue
}
}
pushTxnRes = res.(*roachpb.PushTxnResponse)
pushTxnRes = b.RawResponse().Responses[0].GetInner().(*roachpb.PushTxnResponse)
break
}

Expand Down
38 changes: 13 additions & 25 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1572,7 +1572,6 @@ func pushTxnArgs(
RequestHeader: roachpb.RequestHeader{
Key: pushee.Key,
},
Now: pusher.Timestamp,
PushTo: pusher.Timestamp.Next(),
PusherTxn: *pusher,
PusheeTxn: pushee.TxnMeta,
Expand Down Expand Up @@ -3227,7 +3226,6 @@ func TestSerializableDeadline(t *testing.T) {
pusher := newTransaction(
"test pusher", key, roachpb.MaxUserPriority, tc.Clock())
pushReq := pushTxnArgs(pusher, txn, roachpb.PUSH_TIMESTAMP)
pushReq.Now = tc.Clock().Now()
resp, pErr := tc.SendWrapped(&pushReq)
if pErr != nil {
t.Fatal(pErr)
Expand Down Expand Up @@ -3281,7 +3279,6 @@ func TestTxnRecordUnderTxnSpanGCThreshold(t *testing.T) {
// will be aborted before it even tries.
pushee := newTransaction("pushee", key, 1, tc.Clock())
pushReq := pushTxnArgs(pusher, pushee, roachpb.PUSH_ABORT)
pushReq.Now = tc.Clock().Now()
pushReq.Force = true
resp, pErr := tc.SendWrapped(&pushReq)
if pErr != nil {
Expand Down Expand Up @@ -4992,10 +4989,9 @@ func TestPushTxnHeartbeatTimeout(t *testing.T) {

// Now, attempt to push the transaction with PushTo and the batch header timestamp set to the txn start time + offset.
args := pushTxnArgs(pusher, pushee, test.pushType)
args.Now = pushee.OrigTimestamp.Add(test.timeOffset, 0)
args.PushTo = args.Now
args.PushTo = pushee.OrigTimestamp.Add(test.timeOffset, 0)

reply, pErr := tc.SendWrapped(&args)
reply, pErr := tc.SendWrappedWith(roachpb.Header{Timestamp: args.PushTo}, &args)

if test.expSuccess != (pErr == nil) {
t.Fatalf("%d: expSuccess=%t; got pErr %s, args=%+v, reply=%+v", i,
Expand Down Expand Up @@ -5035,21 +5031,23 @@ func TestResolveIntentPushTxnReplyTxn(t *testing.T) {
var rra roachpb.ResolveIntentRangeRequest

ctx := context.Background()
h := roachpb.Header{Txn: txn, Timestamp: tc.Clock().Now()}
// Should not be able to push or resolve in a transaction.
if _, err := batcheval.PushTxn(ctx, b, batcheval.CommandArgs{Stats: &ms, Header: roachpb.Header{Txn: txn}, Args: &pa}, &roachpb.PushTxnResponse{}); !testutils.IsError(err, batcheval.ErrTransactionUnsupported.Error()) {
if _, err := batcheval.PushTxn(ctx, b, batcheval.CommandArgs{Stats: &ms, Header: h, Args: &pa}, &roachpb.PushTxnResponse{}); !testutils.IsError(err, batcheval.ErrTransactionUnsupported.Error()) {
t.Fatalf("transactional PushTxn returned unexpected error: %v", err)
}
if _, err := batcheval.ResolveIntent(ctx, b, batcheval.CommandArgs{Stats: &ms, Header: roachpb.Header{Txn: txn}, Args: &ra}, &roachpb.ResolveIntentResponse{}); !testutils.IsError(err, batcheval.ErrTransactionUnsupported.Error()) {
if _, err := batcheval.ResolveIntent(ctx, b, batcheval.CommandArgs{Stats: &ms, Header: h, Args: &ra}, &roachpb.ResolveIntentResponse{}); !testutils.IsError(err, batcheval.ErrTransactionUnsupported.Error()) {
t.Fatalf("transactional ResolveIntent returned unexpected error: %v", err)
}
if _, err := batcheval.ResolveIntentRange(ctx, b, batcheval.CommandArgs{Stats: &ms, Header: roachpb.Header{Txn: txn}, Args: &rra}, &roachpb.ResolveIntentRangeResponse{}); !testutils.IsError(err, batcheval.ErrTransactionUnsupported.Error()) {
if _, err := batcheval.ResolveIntentRange(ctx, b, batcheval.CommandArgs{Stats: &ms, Header: h, Args: &rra}, &roachpb.ResolveIntentRangeResponse{}); !testutils.IsError(err, batcheval.ErrTransactionUnsupported.Error()) {
t.Fatalf("transactional ResolveIntentRange returned unexpected error: %v", err)
}

// Should not get a transaction back from PushTxn. It used to erroneously
// return args.PusherTxn.
h = roachpb.Header{Timestamp: tc.Clock().Now()}
var reply roachpb.PushTxnResponse
if _, err := batcheval.PushTxn(ctx, b, batcheval.CommandArgs{EvalCtx: tc.repl, Stats: &ms, Args: &pa}, &reply); err != nil {
if _, err := batcheval.PushTxn(ctx, b, batcheval.CommandArgs{EvalCtx: tc.repl, Stats: &ms, Header: h, Args: &pa}, &reply); err != nil {
t.Fatal(err)
} else if reply.Txn != nil {
t.Fatalf("expected nil response txn, but got %s", reply.Txn)
Expand Down Expand Up @@ -5121,7 +5119,7 @@ func TestPushTxnPriorities(t *testing.T) {
// Now, attempt to push the transaction with intent epoch set appropriately.
args := pushTxnArgs(pusher, pushee, test.pushType)

_, pErr := tc.SendWrapped(&args)
_, pErr := tc.SendWrappedWith(roachpb.Header{Timestamp: args.PushTo}, &args)

if test.expSuccess != (pErr == nil) {
t.Errorf("expected success on trial %d? %t; got err %s", i, test.expSuccess, pErr)
Expand Down Expand Up @@ -5162,9 +5160,9 @@ func TestPushTxnPushTimestamp(t *testing.T) {
// Now, push the transaction using a PUSH_TIMESTAMP push request.
args := pushTxnArgs(pusher, pushee, roachpb.PUSH_TIMESTAMP)

resp, pErr := tc.SendWrapped(&args)
resp, pErr := tc.SendWrappedWith(roachpb.Header{Timestamp: args.PushTo}, &args)
if pErr != nil {
t.Errorf("unexpected error on push: %s", pErr)
t.Fatalf("unexpected error on push: %s", pErr)
}
expTS := pusher.Timestamp
expTS.Logical++
Expand Down Expand Up @@ -5204,9 +5202,9 @@ func TestPushTxnPushTimestampAlreadyPushed(t *testing.T) {
// Now, push the transaction using a PUSH_TIMESTAMP push request.
args := pushTxnArgs(pusher, pushee, roachpb.PUSH_TIMESTAMP)

resp, pErr := tc.SendWrapped(&args)
resp, pErr := tc.SendWrappedWith(roachpb.Header{Timestamp: args.PushTo}, &args)
if pErr != nil {
t.Errorf("unexpected pError on push: %s", pErr)
t.Fatalf("unexpected pError on push: %s", pErr)
}
reply := resp.(*roachpb.PushTxnResponse)
if reply.PusheeTxn.Timestamp != pushee.Timestamp {
Expand Down Expand Up @@ -8247,7 +8245,6 @@ func TestNoopRequestsNotProposed(t *testing.T) {
Key: txn.TxnMeta.Key,
},
PusheeTxn: txn.TxnMeta,
Now: cfg.Clock.Now(),
PushType: roachpb.PUSH_ABORT,
Force: true,
}
Expand Down Expand Up @@ -10276,7 +10273,6 @@ func TestCreateTxnRecord(t *testing.T) {
setup: func(txn *roachpb.Transaction, now hlc.Timestamp) error {
pt := pushTxnArgs(pusher, txn, roachpb.PUSH_TIMESTAMP)
pt.PushTo = now
pt.Now = now
return sendWrappedWithErr(roachpb.Header{}, &pt)
},
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
Expand All @@ -10294,7 +10290,6 @@ func TestCreateTxnRecord(t *testing.T) {
setup: func(txn *roachpb.Transaction, now hlc.Timestamp) error {
pt := pushTxnArgs(pusher, txn, roachpb.PUSH_TIMESTAMP)
pt.PushTo = now
pt.Now = now
return sendWrappedWithErr(roachpb.Header{}, &pt)
},
run: func(txn *roachpb.Transaction, now hlc.Timestamp) error {
Expand All @@ -10313,7 +10308,6 @@ func TestCreateTxnRecord(t *testing.T) {
setup: func(txn *roachpb.Transaction, now hlc.Timestamp) error {
pt := pushTxnArgs(pusher, txn, roachpb.PUSH_TIMESTAMP)
pt.PushTo = now
pt.Now = now
return sendWrappedWithErr(roachpb.Header{}, &pt)
},
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
Expand All @@ -10329,7 +10323,6 @@ func TestCreateTxnRecord(t *testing.T) {
setup: func(txn *roachpb.Transaction, now hlc.Timestamp) error {
pt := pushTxnArgs(pusher, txn, roachpb.PUSH_TIMESTAMP)
pt.PushTo = now
pt.Now = now
return sendWrappedWithErr(roachpb.Header{}, &pt)
},
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
Expand All @@ -10345,7 +10338,6 @@ func TestCreateTxnRecord(t *testing.T) {
name: "begin transaction after push abort",
setup: func(txn *roachpb.Transaction, now hlc.Timestamp) error {
pt := pushTxnArgs(pusher, txn, roachpb.PUSH_ABORT)
pt.Now = now
return sendWrappedWithErr(roachpb.Header{}, &pt)
},
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
Expand All @@ -10359,7 +10351,6 @@ func TestCreateTxnRecord(t *testing.T) {
name: "heartbeat transaction after push abort",
setup: func(txn *roachpb.Transaction, now hlc.Timestamp) error {
pt := pushTxnArgs(pusher, txn, roachpb.PUSH_ABORT)
pt.Now = now
return sendWrappedWithErr(roachpb.Header{}, &pt)
},
run: func(txn *roachpb.Transaction, now hlc.Timestamp) error {
Expand All @@ -10373,7 +10364,6 @@ func TestCreateTxnRecord(t *testing.T) {
name: "heartbeat transaction after push abort and restart",
setup: func(txn *roachpb.Transaction, now hlc.Timestamp) error {
pt := pushTxnArgs(pusher, txn, roachpb.PUSH_ABORT)
pt.Now = now
return sendWrappedWithErr(roachpb.Header{}, &pt)
},
run: func(txn *roachpb.Transaction, now hlc.Timestamp) error {
Expand All @@ -10393,7 +10383,6 @@ func TestCreateTxnRecord(t *testing.T) {
name: "end transaction (abort) after push abort",
setup: func(txn *roachpb.Transaction, now hlc.Timestamp) error {
pt := pushTxnArgs(pusher, txn, roachpb.PUSH_ABORT)
pt.Now = now
return sendWrappedWithErr(roachpb.Header{}, &pt)
},
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
Expand All @@ -10408,7 +10397,6 @@ func TestCreateTxnRecord(t *testing.T) {
name: "end transaction (commit) after push abort",
setup: func(txn *roachpb.Transaction, now hlc.Timestamp) error {
pt := pushTxnArgs(pusher, txn, roachpb.PUSH_ABORT)
pt.Now = now
return sendWrappedWithErr(roachpb.Header{}, &pt)
},
run: func(txn *roachpb.Transaction, _ hlc.Timestamp) error {
Expand Down
Loading

0 comments on commit 8c81edd

Please sign in to comment.