From e9a9ca182ad5b813ea402c6b234f87249d94f084 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Tue, 26 May 2015 15:26:53 -0400 Subject: [PATCH] make the InternalPushTxnRequest.Now field required --- kv/txn_coord_sender_test.go | 1 + proto/internal.proto | 9 ++++----- storage/gc_queue.go | 1 + storage/range_command.go | 8 ++++++-- storage/range_test.go | 10 ++++++---- 5 files changed, 18 insertions(+), 11 deletions(-) diff --git a/kv/txn_coord_sender_test.go b/kv/txn_coord_sender_test.go index c6c33c6e2890..c56c6a888eb0 100644 --- a/kv/txn_coord_sender_test.go +++ b/kv/txn_coord_sender_test.go @@ -396,6 +396,7 @@ func TestTxnCoordSenderCleanupOnAborted(t *testing.T) { Key: txn.Key, Txn: txn2, }, + Now: s.Clock.Now(), PusheeTxn: *txn, PushType: proto.ABORT_TXN, } diff --git a/proto/internal.proto b/proto/internal.proto index 5851532c8ca6..a599909377b1 100644 --- a/proto/internal.proto +++ b/proto/internal.proto @@ -121,11 +121,10 @@ enum PushTxnType { message InternalPushTxnRequest { optional RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; optional Transaction pushee_txn = 2 [(gogoproto.nullable) = false]; - // Now optionally holds the timestamp used to compare the last heartbeat - // of the pushee against. In its absence, the request header's timestamp - // is used, but in some situations that timestamp will not advance with - // the node clock across retries and hence cannot detect abandoned - // transactions. + // Now holds the timestamp used to compare the last heartbeat of the pushee + // against. This is necessary since the request header's timestamp does not + // necessarily advance with the node clock across retries and hence cannot + // detect abandoned transactions. optional Timestamp now = 3 [(gogoproto.nullable) = false]; // Readers set this to PUSH_TIMESTAMP to move PusheeTxn's commit // timestamp forward. Writers set this to ABORT_TXN to request that diff --git a/storage/gc_queue.go b/storage/gc_queue.go index 21a21b2807f6..426de290a941 100644 --- a/storage/gc_queue.go +++ b/storage/gc_queue.go @@ -267,6 +267,7 @@ func (gcq *gcQueue) resolveIntent(rng *Range, key proto.Key, meta *proto.MVCCMet UserPriority: gogoproto.Int32(proto.MaxPriority), Txn: nil, }, + Now: now, PusheeTxn: *meta.Txn, PushType: proto.ABORT_TXN, } diff --git a/storage/range_command.go b/storage/range_command.go index f59512e810f9..bbcb71503e0b 100644 --- a/storage/range_command.go +++ b/storage/range_command.go @@ -568,9 +568,13 @@ func (r *Range) InternalPushTxn(batch engine.Engine, ms *proto.MVCCStats, args * if reply.PusheeTxn.LastHeartbeat == nil { reply.PusheeTxn.LastHeartbeat = &reply.PusheeTxn.Timestamp } + if args.Now.Equal(proto.ZeroTimestamp) { + reply.SetGoError(util.Error("the field Now must be provided")) + return + } // Compute heartbeat expiration (all replicas must see the same result). - expiry := args.Now // caller can set this to his wall time. - expiry.Forward(args.Timestamp) // if Now is not set, fallback + expiry := args.Now + expiry.Forward(args.Timestamp) // if Timestamp is ahead, use that expiry.WallTime -= 2 * DefaultHeartbeatInterval.Nanoseconds() if reply.PusheeTxn.LastHeartbeat.Less(expiry) { diff --git a/storage/range_test.go b/storage/range_test.go index 44f64782ca62..e7ecf47385f0 100644 --- a/storage/range_test.go +++ b/storage/range_test.go @@ -809,6 +809,7 @@ func pushTxnArgs(pusher, pushee *proto.Transaction, pushType proto.PushTxnType, Replica: proto.Replica{StoreID: storeID}, Txn: pusher, }, + Now: pusher.Timestamp, PusheeTxn: *pushee, PushType: pushType, } @@ -1766,9 +1767,9 @@ func TestInternalPushTxnHeartbeatTimeout(t *testing.T) { pushType proto.PushTxnType expSuccess bool }{ - {nil, 0, proto.PUSH_TIMESTAMP, false}, - {nil, 0, proto.ABORT_TXN, false}, - {nil, 0, proto.CLEANUP_TXN, false}, + {nil, 1, proto.PUSH_TIMESTAMP, false}, // using 0 is awkward + {nil, 1, proto.ABORT_TXN, false}, + {nil, 1, proto.CLEANUP_TXN, false}, {nil, ns, proto.PUSH_TIMESTAMP, false}, {nil, ns, proto.ABORT_TXN, false}, {nil, ns, proto.CLEANUP_TXN, false}, @@ -1807,7 +1808,8 @@ func TestInternalPushTxnHeartbeatTimeout(t *testing.T) { // Now, attempt to push the transaction with clock set to "currentTime". tc.manualClock.Set(test.currentTime) args, reply := pushTxnArgs(pusher, pushee, test.pushType, 1, tc.store.StoreID()) - args.Timestamp = tc.clock.Now() + args.Timestamp = proto.Timestamp{WallTime: test.currentTime} // avoid logical ticks + args.Now = args.Timestamp args.Timestamp.Logical = 0 err := tc.rng.AddCmd(tc.rng.context(), client.Call{Args: args, Reply: reply}, true)