Skip to content

Commit

Permalink
kv: teach rangefeed about isolation levels
Browse files Browse the repository at this point in the history
This commit adds in the necessary plumbing to rangefeed to track transactions
with their corresponding isolation level and then push transactions with that
isolation level. The plumbing starts in `MVCCLogicalOp.MVCCWriteIntentOp`,
through `resolvedTimestamp.unresolvedIntentQueue`, to `TxnMeta`, and finally to
`TxnPusher.PushTxns`.

This will be important if we have different txn push behavior depending on the
pushee's isolation level. Even if we don't, pushing with an accurate isolation
level avoids confusion.

Release note: None
  • Loading branch information
nvanbenschoten committed Mar 31, 2023
1 parent 9a025ba commit faad9dd
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 37 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
deps = [
"//pkg/keys",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/concurrency/isolation",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/storage",
Expand Down Expand Up @@ -60,6 +61,7 @@ go_test(
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/concurrency/isolation",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/storage",
Expand Down
32 changes: 18 additions & 14 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"unsafe"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand Down Expand Up @@ -60,22 +61,25 @@ func writeValueOp(ts hlc.Timestamp) enginepb.MVCCLogicalOp {
}

func writeIntentOpWithDetails(
txnID uuid.UUID, key []byte, minTS, ts hlc.Timestamp,
txnID uuid.UUID, key []byte, iso isolation.Level, minTS, ts hlc.Timestamp,
) enginepb.MVCCLogicalOp {
return makeLogicalOp(&enginepb.MVCCWriteIntentOp{
TxnID: txnID,
TxnKey: key,
TxnIsoLevel: iso,
TxnMinTimestamp: minTS,
Timestamp: ts,
})
}

func writeIntentOpWithKey(txnID uuid.UUID, key []byte, ts hlc.Timestamp) enginepb.MVCCLogicalOp {
return writeIntentOpWithDetails(txnID, key, ts /* minTS */, ts)
func writeIntentOpWithKey(
txnID uuid.UUID, key []byte, iso isolation.Level, ts hlc.Timestamp,
) enginepb.MVCCLogicalOp {
return writeIntentOpWithDetails(txnID, key, iso, ts /* minTS */, ts)
}

func writeIntentOp(txnID uuid.UUID, ts hlc.Timestamp) enginepb.MVCCLogicalOp {
return writeIntentOpWithKey(txnID, nil /* key */, ts)
return writeIntentOpWithKey(txnID, nil /* key */, 0, ts)
}

func updateIntentOp(txnID uuid.UUID, ts hlc.Timestamp) enginepb.MVCCLogicalOp {
Expand Down Expand Up @@ -791,25 +795,25 @@ func TestProcessorTxnPushAttempt(t *testing.T) {

// Create a set of transactions.
txn1, txn2, txn3 := uuid.MakeV4(), uuid.MakeV4(), uuid.MakeV4()
txn1Meta := enginepb.TxnMeta{ID: txn1, Key: keyA, WriteTimestamp: ts10, MinTimestamp: ts10}
txn2Meta := enginepb.TxnMeta{ID: txn2, Key: keyB, WriteTimestamp: ts20, MinTimestamp: ts20}
txn3Meta := enginepb.TxnMeta{ID: txn3, Key: keyC, WriteTimestamp: ts30, MinTimestamp: ts30}
txn1Meta := enginepb.TxnMeta{ID: txn1, Key: keyA, IsoLevel: isolation.Serializable, WriteTimestamp: ts10, MinTimestamp: ts10}
txn2Meta := enginepb.TxnMeta{ID: txn2, Key: keyB, IsoLevel: isolation.Snapshot, WriteTimestamp: ts20, MinTimestamp: ts20}
txn3Meta := enginepb.TxnMeta{ID: txn3, Key: keyC, IsoLevel: isolation.ReadCommitted, WriteTimestamp: ts30, MinTimestamp: ts30}
txn1Proto := &roachpb.Transaction{TxnMeta: txn1Meta, Status: roachpb.PENDING}
txn2Proto := &roachpb.Transaction{TxnMeta: txn2Meta, Status: roachpb.PENDING}
txn3Proto := &roachpb.Transaction{TxnMeta: txn3Meta, Status: roachpb.PENDING}

// Modifications for test 2.
txn1MetaT2Pre := enginepb.TxnMeta{ID: txn1, Key: keyA, WriteTimestamp: ts25, MinTimestamp: ts10}
txn1MetaT2Post := enginepb.TxnMeta{ID: txn1, Key: keyA, WriteTimestamp: ts50, MinTimestamp: ts10}
txn2MetaT2Post := enginepb.TxnMeta{ID: txn2, Key: keyB, WriteTimestamp: ts60, MinTimestamp: ts20}
txn3MetaT2Post := enginepb.TxnMeta{ID: txn3, Key: keyC, WriteTimestamp: ts70, MinTimestamp: ts30}
txn1MetaT2Pre := enginepb.TxnMeta{ID: txn1, Key: keyA, IsoLevel: isolation.Serializable, WriteTimestamp: ts25, MinTimestamp: ts10}
txn1MetaT2Post := enginepb.TxnMeta{ID: txn1, Key: keyA, IsoLevel: isolation.Serializable, WriteTimestamp: ts50, MinTimestamp: ts10}
txn2MetaT2Post := enginepb.TxnMeta{ID: txn2, Key: keyB, IsoLevel: isolation.Snapshot, WriteTimestamp: ts60, MinTimestamp: ts20}
txn3MetaT2Post := enginepb.TxnMeta{ID: txn3, Key: keyC, IsoLevel: isolation.ReadCommitted, WriteTimestamp: ts70, MinTimestamp: ts30}
txn1ProtoT2 := &roachpb.Transaction{TxnMeta: txn1MetaT2Post, Status: roachpb.COMMITTED}
txn2ProtoT2 := &roachpb.Transaction{TxnMeta: txn2MetaT2Post, Status: roachpb.PENDING}
txn3ProtoT2 := &roachpb.Transaction{TxnMeta: txn3MetaT2Post, Status: roachpb.PENDING}

// Modifications for test 3.
txn2MetaT3Post := enginepb.TxnMeta{ID: txn2, Key: keyB, WriteTimestamp: ts60, MinTimestamp: ts20}
txn3MetaT3Post := enginepb.TxnMeta{ID: txn3, Key: keyC, WriteTimestamp: ts90, MinTimestamp: ts30}
txn2MetaT3Post := enginepb.TxnMeta{ID: txn2, Key: keyB, IsoLevel: isolation.Snapshot, WriteTimestamp: ts60, MinTimestamp: ts20}
txn3MetaT3Post := enginepb.TxnMeta{ID: txn3, Key: keyC, IsoLevel: isolation.ReadCommitted, WriteTimestamp: ts90, MinTimestamp: ts30}
txn2ProtoT3 := &roachpb.Transaction{TxnMeta: txn2MetaT3Post, Status: roachpb.ABORTED}
txn3ProtoT3 := &roachpb.Transaction{TxnMeta: txn3MetaT3Post, Status: roachpb.PENDING}

Expand Down Expand Up @@ -885,7 +889,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) {

// Add a few intents and move the closed timestamp forward.
writeIntentOpFromMeta := func(txn enginepb.TxnMeta) enginepb.MVCCLogicalOp {
return writeIntentOpWithDetails(txn.ID, txn.Key, txn.MinTimestamp, txn.WriteTimestamp)
return writeIntentOpWithDetails(txn.ID, txn.Key, txn.IsoLevel, txn.MinTimestamp, txn.WriteTimestamp)
}
p.ConsumeLogicalOps(ctx,
writeIntentOpFromMeta(txn1Meta),
Expand Down
28 changes: 18 additions & 10 deletions pkg/kv/kvserver/rangefeed/resolved_timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"container/heap"
"fmt"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -148,7 +149,7 @@ func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool {

case *enginepb.MVCCWriteIntentOp:
rts.assertOpAboveRTS(op, t.Timestamp)
return rts.intentQ.IncRef(t.TxnID, t.TxnKey, t.TxnMinTimestamp, t.Timestamp)
return rts.intentQ.IncRef(t.TxnID, t.TxnKey, t.TxnIsoLevel, t.TxnMinTimestamp, t.Timestamp)

case *enginepb.MVCCUpdateIntentOp:
return rts.intentQ.UpdateTS(t.TxnID, t.Timestamp)
Expand Down Expand Up @@ -300,8 +301,9 @@ func (rts *resolvedTimestamp) assertOpAboveRTS(op enginepb.MVCCLogicalOp, opTS h
// the transaction on a given range.
type unresolvedTxn struct {
txnID uuid.UUID
txnKey roachpb.Key // unset if refCount < 0
txnMinTimestamp hlc.Timestamp // unset if refCount < 0
txnKey roachpb.Key // unset if refCount < 0
txnIsoLevel isolation.Level // unset if refCount < 0
txnMinTimestamp hlc.Timestamp // unset if refCount < 0
timestamp hlc.Timestamp
refCount int // count of unresolved intents

Expand All @@ -314,13 +316,14 @@ type unresolvedTxn struct {
func (t *unresolvedTxn) asTxnMeta() enginepb.TxnMeta {
if t.refCount <= 0 {
// An unresolvedTxn with a non-positive reference count may have an
// uninitialized txnKey and txnMinTimestamp. When in this state, we disallow
// the construction of a TxnMeta.
// uninitialized txnKey, txnIsoLevel, and txnMinTimestamp. When in this
// state, we disallow the construction of a TxnMeta.
panic("asTxnMeta called on unresolvedTxn with negative reference count")
}
return enginepb.TxnMeta{
ID: t.txnID,
Key: t.txnKey,
IsoLevel: t.txnIsoLevel,
MinTimestamp: t.txnMinTimestamp,
WriteTimestamp: t.timestamp,
}
Expand Down Expand Up @@ -428,27 +431,31 @@ func (uiq *unresolvedIntentQueue) Before(ts hlc.Timestamp) []*unresolvedTxn {
// returns whether the update advanced the timestamp of the oldest transaction
// in the queue.
func (uiq *unresolvedIntentQueue) IncRef(
txnID uuid.UUID, txnKey roachpb.Key, txnMinTS, ts hlc.Timestamp,
txnID uuid.UUID, txnKey roachpb.Key, txnIsoLevel isolation.Level, txnMinTS, ts hlc.Timestamp,
) bool {
return uiq.updateTxn(txnID, txnKey, txnMinTS, ts, +1)
return uiq.updateTxn(txnID, txnKey, txnIsoLevel, txnMinTS, ts, +1)
}

// DecrRef decrements the reference count of the specified transaction. It
// returns whether the update advanced the timestamp of the oldest transaction
// in the queue.
func (uiq *unresolvedIntentQueue) DecrRef(txnID uuid.UUID, ts hlc.Timestamp) bool {
return uiq.updateTxn(txnID, nil, hlc.Timestamp{}, ts, -1)
return uiq.updateTxn(txnID, nil, 0, hlc.Timestamp{}, ts, -1)
}

// UpdateTS updates the timestamp of the specified transaction without modifying
// its intent reference count. It returns whether the update advanced the
// timestamp of the oldest transaction in the queue.
func (uiq *unresolvedIntentQueue) UpdateTS(txnID uuid.UUID, ts hlc.Timestamp) bool {
return uiq.updateTxn(txnID, nil, hlc.Timestamp{}, ts, 0)
return uiq.updateTxn(txnID, nil, 0, hlc.Timestamp{}, ts, 0)
}

func (uiq *unresolvedIntentQueue) updateTxn(
txnID uuid.UUID, txnKey roachpb.Key, txnMinTS, ts hlc.Timestamp, delta int,
txnID uuid.UUID,
txnKey roachpb.Key,
txnIsoLevel isolation.Level,
txnMinTS, ts hlc.Timestamp,
delta int,
) bool {
txn, ok := uiq.txns[txnID]
if !ok {
Expand All @@ -461,6 +468,7 @@ func (uiq *unresolvedIntentQueue) updateTxn(
txn = &unresolvedTxn{
txnID: txnID,
txnKey: txnKey,
txnIsoLevel: txnIsoLevel,
txnMinTimestamp: txnMinTS,
timestamp: ts,
refCount: delta,
Expand Down
13 changes: 8 additions & 5 deletions pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package rangefeed
import (
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -34,13 +35,15 @@ func TestUnresolvedIntentQueue(t *testing.T) {
// Increment a non-existent txn.
txn1 := uuid.MakeV4()
txn1Key := roachpb.Key("key1")
txn1IsoLevel := isolation.Snapshot
txn1TS := hlc.Timestamp{WallTime: 1}
txn1MinTS := hlc.Timestamp{WallTime: 0, Logical: 4}
adv := uiq.IncRef(txn1, txn1Key, txn1MinTS, txn1TS)
adv := uiq.IncRef(txn1, txn1Key, txn1IsoLevel, txn1MinTS, txn1TS)
require.False(t, adv)
require.Equal(t, 1, uiq.Len())
require.Equal(t, txn1, uiq.Oldest().txnID)
require.Equal(t, txn1Key, uiq.Oldest().txnKey)
require.Equal(t, txn1IsoLevel, uiq.Oldest().txnIsoLevel)
require.Equal(t, txn1MinTS, uiq.Oldest().txnMinTimestamp)
require.Equal(t, txn1TS, uiq.Oldest().timestamp)
require.Equal(t, 1, uiq.Oldest().refCount)
Expand Down Expand Up @@ -115,7 +118,7 @@ func TestUnresolvedIntentQueue(t *testing.T) {

// Increase txn1's ref count while increasing timestamp.
newTxn1TS = hlc.Timestamp{WallTime: 5}
adv = uiq.IncRef(txn1, txn1Key, txn1MinTS, newTxn1TS)
adv = uiq.IncRef(txn1, txn1Key, txn1IsoLevel, txn1MinTS, newTxn1TS)
require.False(t, adv)
require.Equal(t, 2, uiq.Len())
require.Equal(t, 2, uiq.txns[txn1].refCount)
Expand All @@ -132,7 +135,7 @@ func TestUnresolvedIntentQueue(t *testing.T) {
// Add new txn at much higher timestamp. Immediately delete.
txn5 := uuid.MakeV4()
txn5TS := hlc.Timestamp{WallTime: 10}
adv = uiq.IncRef(txn5, nil, txn5TS, txn5TS)
adv = uiq.IncRef(txn5, nil, 0, txn5TS, txn5TS)
require.False(t, adv)
require.Equal(t, 3, uiq.Len())
require.Equal(t, txn2, uiq.Oldest().txnID)
Expand All @@ -141,7 +144,7 @@ func TestUnresolvedIntentQueue(t *testing.T) {
require.Equal(t, 2, uiq.Len())

// Increase txn2's ref count, which results in deletion. txn1 new oldest.
adv = uiq.IncRef(txn2, nil, txn2TS, txn2TS)
adv = uiq.IncRef(txn2, nil, 0, txn2TS, txn2TS)
require.True(t, adv)
require.Equal(t, 1, uiq.Len())
require.Equal(t, txn1, uiq.Oldest().txnID)
Expand All @@ -156,7 +159,7 @@ func TestUnresolvedIntentQueue(t *testing.T) {
// Add new txn. Immediately decrement ref count. Should be empty again.
txn6 := uuid.MakeV4()
txn6TS := hlc.Timestamp{WallTime: 20}
adv = uiq.IncRef(txn6, nil, txn6TS, txn6TS)
adv = uiq.IncRef(txn6, nil, 0, txn6TS, txn6TS)
require.False(t, adv)
require.Equal(t, 1, uiq.Len())
require.Equal(t, txn6, uiq.Oldest().txnID)
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/rangefeed/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func (s *SeparatedIntentScanner) ConsumeIntents(
consumer(enginepb.MVCCWriteIntentOp{
TxnID: meta.Txn.ID,
TxnKey: meta.Txn.Key,
TxnIsoLevel: meta.Txn.IsoLevel,
TxnMinTimestamp: meta.Txn.MinTimestamp,
Timestamp: meta.Txn.WriteTimestamp,
})
Expand Down Expand Up @@ -208,6 +209,7 @@ func (l *LegacyIntentScanner) ConsumeIntents(
consumer(enginepb.MVCCWriteIntentOp{
TxnID: meta.Txn.ID,
TxnKey: meta.Txn.Key,
TxnIsoLevel: meta.Txn.IsoLevel,
TxnMinTimestamp: meta.Txn.MinTimestamp,
Timestamp: meta.Txn.WriteTimestamp,
})
Expand Down
14 changes: 8 additions & 6 deletions pkg/kv/kvserver/rangefeed/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand Down Expand Up @@ -229,10 +230,11 @@ func TestInitResolvedTSScan(t *testing.T) {
startKey := roachpb.RKey("d")
endKey := roachpb.RKey("w")

makeTxn := func(key string, id uuid.UUID, ts hlc.Timestamp) roachpb.Transaction {
makeTxn := func(key string, id uuid.UUID, iso isolation.Level, ts hlc.Timestamp) roachpb.Transaction {
txnMeta := enginepb.TxnMeta{
Key: []byte(key),
ID: id,
IsoLevel: iso,
Epoch: 1,
WriteTimestamp: ts,
MinTimestamp: ts,
Expand All @@ -246,12 +248,12 @@ func TestInitResolvedTSScan(t *testing.T) {
txn1ID := uuid.MakeV4()
txn1TS := hlc.Timestamp{WallTime: 15}
txn1Key := "txnKey1"
txn1 := makeTxn(txn1Key, txn1ID, txn1TS)
txn1 := makeTxn(txn1Key, txn1ID, isolation.Serializable, txn1TS)

txn2ID := uuid.MakeV4()
txn2TS := hlc.Timestamp{WallTime: 21}
txn2Key := "txnKey2"
txn2 := makeTxn(txn2Key, txn2ID, txn2TS)
txn2 := makeTxn(txn2Key, txn2ID, isolation.ReadCommitted, txn2TS)

type op struct {
kv storage.MVCCKeyValue
Expand Down Expand Up @@ -305,13 +307,13 @@ func TestInitResolvedTSScan(t *testing.T) {

expEvents := []*event{
{ops: []enginepb.MVCCLogicalOp{
writeIntentOpWithKey(txn2ID, []byte("txnKey2"), hlc.Timestamp{WallTime: 21}),
writeIntentOpWithKey(txn2ID, []byte("txnKey2"), isolation.ReadCommitted, hlc.Timestamp{WallTime: 21}),
}},
{ops: []enginepb.MVCCLogicalOp{
writeIntentOpWithKey(txn1ID, []byte("txnKey1"), hlc.Timestamp{WallTime: 15}),
writeIntentOpWithKey(txn1ID, []byte("txnKey1"), isolation.Serializable, hlc.Timestamp{WallTime: 15}),
}},
{ops: []enginepb.MVCCLogicalOp{
writeIntentOpWithKey(txn1ID, []byte("txnKey1"), hlc.Timestamp{WallTime: 15}),
writeIntentOpWithKey(txn1ID, []byte("txnKey1"), isolation.Serializable, hlc.Timestamp{WallTime: 15}),
}},
{initRTS: true},
}
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ go_test(
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/concurrency/isolation",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/diskmap",
"//pkg/kv/kvserver/spanset",
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/enginepb/mvcc3.proto
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ message MVCCWriteIntentOp {
(gogoproto.customname) = "TxnID",
(gogoproto.nullable) = false];
bytes txn_key = 2;
cockroach.kv.kvserver.concurrency.isolation.Level txn_iso_level = 5;
util.hlc.Timestamp txn_min_timestamp = 4 [(gogoproto.nullable) = false];
util.hlc.Timestamp timestamp = 3 [(gogoproto.nullable) = false];
}
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/mvcc_logical_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (ol *OpLoggerBatch) logLogicalOp(op MVCCLogicalOpType, details MVCCLogicalO
ol.recordOp(&enginepb.MVCCWriteIntentOp{
TxnID: details.Txn.ID,
TxnKey: details.Txn.Key,
TxnIsoLevel: details.Txn.IsoLevel,
TxnMinTimestamp: details.Txn.MinTimestamp,
Timestamp: details.Timestamp,
})
Expand Down
9 changes: 7 additions & 2 deletions pkg/storage/mvcc_logical_ops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -88,8 +89,9 @@ func TestMVCCOpLogWriter(t *testing.T) {
}

// Write another intent, push it, then abort it.
txn2ts := makeTxn(*txn2, hlc.Timestamp{Logical: 5})
if err := MVCCPut(ctx, ol, nil, testKey3, txn2ts.ReadTimestamp, hlc.ClockTimestamp{}, value4, txn2ts); err != nil {
txn2 := makeTxn(*txn2, hlc.Timestamp{Logical: 5})
txn2.IsoLevel = isolation.ReadCommitted
if err := MVCCPut(ctx, ol, nil, testKey3, txn2.ReadTimestamp, hlc.ClockTimestamp{}, value4, txn2); err != nil {
t.Fatal(err)
}
txn2Pushed := *txn2
Expand Down Expand Up @@ -128,6 +130,7 @@ func TestMVCCOpLogWriter(t *testing.T) {
makeOp(&enginepb.MVCCWriteIntentOp{
TxnID: txn1.ID,
TxnKey: txn1.Key,
TxnIsoLevel: txn1.IsoLevel,
TxnMinTimestamp: txn1.MinTimestamp,
Timestamp: hlc.Timestamp{Logical: 2},
}),
Expand All @@ -138,6 +141,7 @@ func TestMVCCOpLogWriter(t *testing.T) {
makeOp(&enginepb.MVCCWriteIntentOp{
TxnID: txn1.ID,
TxnKey: txn1.Key,
TxnIsoLevel: txn1.IsoLevel,
TxnMinTimestamp: txn1.MinTimestamp,
Timestamp: hlc.Timestamp{Logical: 4},
}),
Expand All @@ -154,6 +158,7 @@ func TestMVCCOpLogWriter(t *testing.T) {
makeOp(&enginepb.MVCCWriteIntentOp{
TxnID: txn2.ID,
TxnKey: txn2.Key,
TxnIsoLevel: txn2.IsoLevel,
TxnMinTimestamp: txn2.MinTimestamp,
Timestamp: hlc.Timestamp{Logical: 5},
}),
Expand Down

0 comments on commit faad9dd

Please sign in to comment.