From 20e84f37df5c4f1c8c6c51373b7b5b8b0fa83e6b Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Tue, 6 Jun 2017 13:55:05 +0400 Subject: [PATCH 1/3] run 'go fmt' using go 1.8 --- txn/sim_test.go | 2 +- txn/tarjan_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/txn/sim_test.go b/txn/sim_test.go index a369ded7c..37b2799be 100644 --- a/txn/sim_test.go +++ b/txn/sim_test.go @@ -2,11 +2,11 @@ package txn_test import ( "flag" + . "gopkg.in/check.v1" "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" "gopkg.in/mgo.v2/dbtest" "gopkg.in/mgo.v2/txn" - . "gopkg.in/check.v1" "math/rand" "time" ) diff --git a/txn/tarjan_test.go b/txn/tarjan_test.go index 79745c39b..e655c2b93 100644 --- a/txn/tarjan_test.go +++ b/txn/tarjan_test.go @@ -2,8 +2,8 @@ package txn import ( "fmt" - "gopkg.in/mgo.v2/bson" . "gopkg.in/check.v1" + "gopkg.in/mgo.v2/bson" ) type TarjanSuite struct{} From 2eb5d1c1695c46dfbd0f888ca65f2f3237858339 Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Tue, 6 Jun 2017 14:11:47 +0400 Subject: [PATCH 2/3] Add the test cases that show O(N^2) performance --- txn/txn_test.go | 116 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) diff --git a/txn/txn_test.go b/txn/txn_test.go index 12923ca12..9421c3e3e 100644 --- a/txn/txn_test.go +++ b/txn/txn_test.go @@ -703,6 +703,8 @@ func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) { } var flaky = flag.Bool("flaky", false, "Include flaky tests") +var txnQueueLength = flag.Int("qlength", 100, "txn-queue length for tests") + func (s *S) TestTxnQueueStressTest(c *C) { // This fails about 20% of the time on Mongo 3.2 (I haven't tried @@ -776,3 +778,117 @@ func (s *S) TestTxnQueueStressTest(c *C) { } } } + +type txnQueue struct { + Queue []string `bson:"txn-queue"` +} + +func (s *S) TestTxnQueueAssertionGrowth(c *C) { + txn.SetDebug(false) // too much spam + err := s.accounts.Insert(M{"_id": 0, "balance": 0}) + c.Assert(err, IsNil) + // Create many assertion only transactions. + t := time.Now() + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Assert: M{"balance": 0}, + }} + for n := 0; n < *txnQueueLength; n++ { + err = s.runner.Run(ops, "", nil) + c.Assert(err, IsNil) + } + var qdoc txnQueue + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, *txnQueueLength) + c.Logf("%8.3fs to set up %d assertions", time.Since(t).Seconds(), *txnQueueLength) + t = time.Now() + txn.SetChaos(txn.Chaos{}) + ops = []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$inc": M{"balance": 100}}, + }} + err = s.runner.Run(ops, "", nil) + c.Logf("%8.3fs to clear N=%d assertions and add one more txn", + time.Since(t).Seconds(), *txnQueueLength) + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, 1) +} + +func (s *S) TestTxnQueueBrokenPrepared(c *C) { + txn.SetDebug(false) // too much spam + badTxnToken := "123456789012345678901234_deadbeef" + err := s.accounts.Insert(M{"_id": 0, "balance": 0, "txn-queue": []string{badTxnToken}}) + c.Assert(err, IsNil) + t := time.Now() + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$set": M{"balance": 0}}, + }} + errString := `cannot find transaction ObjectIdHex("123456789012345678901234")` + for n := 0; n < *txnQueueLength; n++ { + err = s.runner.Run(ops, "", nil) + c.Assert(err.Error(), Equals, errString) + } + var qdoc txnQueue + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, *txnQueueLength+1) + c.Logf("%8.3fs to set up %d 'prepared' txns", time.Since(t).Seconds(), *txnQueueLength) + t = time.Now() + s.accounts.UpdateId(0, bson.M{"$pullAll": bson.M{"txn-queue": []string{badTxnToken}}}) + ops = []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$inc": M{"balance": 100}}, + }} + err = s.runner.ResumeAll() + c.Assert(err, IsNil) + c.Logf("%8.3fs to ResumeAll N=%d 'prepared' txns", + time.Since(t).Seconds(), *txnQueueLength) + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, 1) +} + +func (s *S) TestTxnQueuePreparing(c *C) { + txn.SetDebug(false) // too much spam + err := s.accounts.Insert(M{"_id": 0, "balance": 0, "txn-queue": []string{}}) + c.Assert(err, IsNil) + t := time.Now() + txn.SetChaos(txn.Chaos{ + KillChance: 1.0, + Breakpoint: "set-prepared", + }) + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$set": M{"balance": 0}}, + }} + for n := 0; n < *txnQueueLength; n++ { + err = s.runner.Run(ops, "", nil) + c.Assert(err, Equals, txn.ErrChaos) + } + var qdoc txnQueue + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, *txnQueueLength) + c.Logf("%8.3fs to set up %d 'preparing' txns", time.Since(t).Seconds(), *txnQueueLength) + txn.SetChaos(txn.Chaos{}) + t = time.Now() + err = s.runner.ResumeAll() + c.Logf("%8.3fs to ResumeAll N=%d 'preparing' txns", + time.Since(t).Seconds(), *txnQueueLength) + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + expectedCount := 100 + if *txnQueueLength <= expectedCount { + expectedCount = *txnQueueLength - 1 + } + c.Check(len(qdoc.Queue), Equals, expectedCount) +} + From 94280954e0207c30d360392c517b38d6a024c7a7 Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Tue, 6 Jun 2017 14:12:11 +0400 Subject: [PATCH 3/3] Cache conversion from token to TXN ObjectId. When walking graphs (hasPreReq), we can actually spend a lot of time doing the conversion from a 'hex+nonce' token string back to a binary ObjectId. Cache them in the flusher. --- cluster_test.go | 1 - txn/flusher.go | 47 +++++++++++++++++++++++++++++++++++++---------- 2 files changed, 37 insertions(+), 11 deletions(-) diff --git a/cluster_test.go b/cluster_test.go index 54ec86762..660830c8a 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1477,7 +1477,6 @@ func (s *S) TestSecondaryModeWithMongosInsert(c *C) { c.Assert(result.A, Equals, 1) } - func (s *S) TestRemovalOfClusterMember(c *C) { if *fast { c.Skip("-fast") diff --git a/txn/flusher.go b/txn/flusher.go index f640a4380..64b06c3ec 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -12,7 +12,7 @@ func flush(r *Runner, t *transaction) error { Runner: r, goal: t, goalKeys: make(map[docKey]bool), - queue: make(map[docKey][]token), + queue: make(map[docKey][]tokenAndId), debugId: debugPrefix(), } for _, dkey := range f.goal.docKeys() { @@ -25,10 +25,36 @@ type flusher struct { *Runner goal *transaction goalKeys map[docKey]bool - queue map[docKey][]token + queue map[docKey][]tokenAndId debugId string } +type tokenAndId struct { + tt token + bid bson.ObjectId +} + +func (ti tokenAndId) id() bson.ObjectId { + return ti.bid +} + +func (ti tokenAndId) nonce() string { + return ti.tt.nonce() +} + +func (ti tokenAndId) String() string { + return string(ti.tt) +} + +func tokensWithIds(q []token) []tokenAndId { + out := make([]tokenAndId, len(q)) + for i, tt := range q { + out[i].tt = tt + out[i].bid = tt.id() + } + return out +} + func (f *flusher) run() (err error) { if chaosEnabled { defer f.handleChaos(&err) @@ -247,7 +273,7 @@ NextDoc: if info.Remove == "" { // Fast path, unless workload is insert/remove heavy. revno[dkey] = info.Revno - f.queue[dkey] = info.Queue + f.queue[dkey] = tokensWithIds(info.Queue) f.debugf("[A] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue) continue NextDoc } else { @@ -309,7 +335,7 @@ NextDoc: f.debugf("[B] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue) } revno[dkey] = info.Revno - f.queue[dkey] = info.Queue + f.queue[dkey] = tokensWithIds(info.Queue) continue NextDoc } } @@ -451,7 +477,7 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error) break } } - f.queue[dkey] = info.Queue + f.queue[dkey] = tokensWithIds(info.Queue) if !found { // Rescanned transaction id was not in the queue. This could mean one // of three things: @@ -515,12 +541,13 @@ func assembledRevnos(ops []Op, revno map[docKey]int64) []int64 { func (f *flusher) hasPreReqs(tt token, dkeys docKeys) (prereqs, found bool) { found = true + ttId := tt.id() NextDoc: for _, dkey := range dkeys { for _, dtt := range f.queue[dkey] { - if dtt == tt { + if dtt.tt == tt { continue NextDoc - } else if dtt.id() != tt.id() { + } else if dtt.id() != ttId { prereqs = true } } @@ -908,17 +935,17 @@ func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) err return nil } -func tokensToPull(dqueue []token, pull map[bson.ObjectId]*transaction, dontPull token) []token { +func tokensToPull(dqueue []tokenAndId, pull map[bson.ObjectId]*transaction, dontPull token) []token { var result []token for j := len(dqueue) - 1; j >= 0; j-- { dtt := dqueue[j] - if dtt == dontPull { + if dtt.tt == dontPull { continue } if _, ok := pull[dtt.id()]; ok { // It was handled before and this is a leftover invalid // nonce in the queue. Cherry-pick it out. - result = append(result, dtt) + result = append(result, dtt.tt) } } return result