Skip to content

Commit

Permalink
make the InternalPushTxnRequest.Now field required
Browse files Browse the repository at this point in the history
  • Loading branch information
tbg committed May 26, 2015
1 parent 8132eaf commit e9a9ca1
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 11 deletions.
1 change: 1 addition & 0 deletions kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ func TestTxnCoordSenderCleanupOnAborted(t *testing.T) {
Key: txn.Key,
Txn: txn2,
},
Now: s.Clock.Now(),
PusheeTxn: *txn,
PushType: proto.ABORT_TXN,
}
Expand Down
9 changes: 4 additions & 5 deletions proto/internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,10 @@ enum PushTxnType {
message InternalPushTxnRequest {
optional RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
optional Transaction pushee_txn = 2 [(gogoproto.nullable) = false];
// Now optionally holds the timestamp used to compare the last heartbeat
// of the pushee against. In its absence, the request header's timestamp
// is used, but in some situations that timestamp will not advance with
// the node clock across retries and hence cannot detect abandoned
// transactions.
// Now holds the timestamp used to compare the last heartbeat of the pushee
// against. This is necessary since the request header's timestamp does not
// necessarily advance with the node clock across retries and hence cannot
// detect abandoned transactions.
optional Timestamp now = 3 [(gogoproto.nullable) = false];
// Readers set this to PUSH_TIMESTAMP to move PusheeTxn's commit
// timestamp forward. Writers set this to ABORT_TXN to request that
Expand Down
1 change: 1 addition & 0 deletions storage/gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ func (gcq *gcQueue) resolveIntent(rng *Range, key proto.Key, meta *proto.MVCCMet
UserPriority: gogoproto.Int32(proto.MaxPriority),
Txn: nil,
},
Now: now,
PusheeTxn: *meta.Txn,
PushType: proto.ABORT_TXN,
}
Expand Down
8 changes: 6 additions & 2 deletions storage/range_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,9 +568,13 @@ func (r *Range) InternalPushTxn(batch engine.Engine, ms *proto.MVCCStats, args *
if reply.PusheeTxn.LastHeartbeat == nil {
reply.PusheeTxn.LastHeartbeat = &reply.PusheeTxn.Timestamp
}
if args.Now.Equal(proto.ZeroTimestamp) {
reply.SetGoError(util.Error("the field Now must be provided"))
return
}
// Compute heartbeat expiration (all replicas must see the same result).
expiry := args.Now // caller can set this to his wall time.
expiry.Forward(args.Timestamp) // if Now is not set, fallback
expiry := args.Now
expiry.Forward(args.Timestamp) // if Timestamp is ahead, use that
expiry.WallTime -= 2 * DefaultHeartbeatInterval.Nanoseconds()

if reply.PusheeTxn.LastHeartbeat.Less(expiry) {
Expand Down
10 changes: 6 additions & 4 deletions storage/range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,7 @@ func pushTxnArgs(pusher, pushee *proto.Transaction, pushType proto.PushTxnType,
Replica: proto.Replica{StoreID: storeID},
Txn: pusher,
},
Now: pusher.Timestamp,
PusheeTxn: *pushee,
PushType: pushType,
}
Expand Down Expand Up @@ -1766,9 +1767,9 @@ func TestInternalPushTxnHeartbeatTimeout(t *testing.T) {
pushType proto.PushTxnType
expSuccess bool
}{
{nil, 0, proto.PUSH_TIMESTAMP, false},
{nil, 0, proto.ABORT_TXN, false},
{nil, 0, proto.CLEANUP_TXN, false},
{nil, 1, proto.PUSH_TIMESTAMP, false}, // using 0 is awkward
{nil, 1, proto.ABORT_TXN, false},
{nil, 1, proto.CLEANUP_TXN, false},
{nil, ns, proto.PUSH_TIMESTAMP, false},
{nil, ns, proto.ABORT_TXN, false},
{nil, ns, proto.CLEANUP_TXN, false},
Expand Down Expand Up @@ -1807,7 +1808,8 @@ func TestInternalPushTxnHeartbeatTimeout(t *testing.T) {
// Now, attempt to push the transaction with clock set to "currentTime".
tc.manualClock.Set(test.currentTime)
args, reply := pushTxnArgs(pusher, pushee, test.pushType, 1, tc.store.StoreID())
args.Timestamp = tc.clock.Now()
args.Timestamp = proto.Timestamp{WallTime: test.currentTime} // avoid logical ticks
args.Now = args.Timestamp
args.Timestamp.Logical = 0

err := tc.rng.AddCmd(tc.rng.context(), client.Call{Args: args, Reply: reply}, true)
Expand Down

0 comments on commit e9a9ca1

Please sign in to comment.