From e18f1ed32a431c25c514ef9d72c4ed238b29c20d Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 19 Oct 2018 19:57:40 -0700 Subject: [PATCH 1/6] Fix to ensure that Zero sends out all commit timestamps in order. --- conn/node.go | 4 +- contrib/integration/testhas/main.go | 242 ++++++++++++++++++++++++++++ dgraph/cmd/zero/oracle.go | 10 ++ dgraph/docker-compose.yml | 112 ++++++------- worker/draft.go | 6 + worker/groups.go | 2 +- 6 files changed, 317 insertions(+), 59 deletions(-) create mode 100644 contrib/integration/testhas/main.go diff --git a/conn/node.go b/conn/node.go index b7d02ca317e..53b0a8fccb3 100644 --- a/conn/node.go +++ b/conn/node.go @@ -75,8 +75,8 @@ type Node struct { type raftLogger struct { } -func (rl *raftLogger) Debug(v ...interface{}) { glog.V(1).Info(v...) } -func (rl *raftLogger) Debugf(format string, v ...interface{}) { glog.V(1).Infof(format, v...) } +func (rl *raftLogger) Debug(v ...interface{}) { glog.V(3).Info(v...) } +func (rl *raftLogger) Debugf(format string, v ...interface{}) { glog.V(3).Infof(format, v...) } func (rl *raftLogger) Error(v ...interface{}) { glog.Error(v...) } func (rl *raftLogger) Errorf(format string, v ...interface{}) { glog.Errorf(format, v...) } func (rl *raftLogger) Info(v ...interface{}) { glog.Info(v...) } diff --git a/contrib/integration/testhas/main.go b/contrib/integration/testhas/main.go new file mode 100644 index 00000000000..6bb415be523 --- /dev/null +++ b/contrib/integration/testhas/main.go @@ -0,0 +1,242 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "log" + "math/rand" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/dgraph-io/dgo" + "github.com/dgraph-io/dgo/protos/api" + "github.com/dgraph-io/dgo/x" + "github.com/dgraph-io/dgo/y" + "google.golang.org/grpc" +) + +var ( + addr = flag.String("addr", "localhost:9080", "dgraph address") + concurr = flag.Int("c", 3, "number of concurrent upserts per account") +) + +var ( + firsts = []string{"Paul", "Eric", "Jack", "John", "Martin"} + lasts = []string{"Brown", "Smith", "Robinson", "Waters", "Taylor"} + types = []string{"CEO", "COO", "CTO", "CFO"} +) + +type account struct { + first string + last string + age int +} + +var accounts []account + +func init() { + for _, first := range firsts { + for _, last := range lasts { + for i := 0; i < 1000; i++ { + accounts = append(accounts, account{ + first: first, + last: last, + age: i, + }) + } + } + } +} + +func main() { + flag.Parse() + c := newClient() + setup(c) + fmt.Println("Doing upserts") + doUpserts(c) + fmt.Println("Checking integrity") + checkIntegrity(c) +} + +func newClient() *dgo.Dgraph { + d, err := grpc.Dial(*addr, grpc.WithInsecure()) + x.Check(err) + return dgo.NewDgraphClient( + api.NewDgraphClient(d), + ) +} + +func setup(c *dgo.Dgraph) { + ctx := context.Background() + x.Check(c.Alter(ctx, &api.Operation{ + DropAll: true, + })) + x.Check(c.Alter(ctx, &api.Operation{ + Schema: ` + first: string @index(term) . + last: string @index(hash) . + age: int @index(int) . + when: int . 
+ `, + })) +} + +func doUpserts(c *dgo.Dgraph) { + var wg sync.WaitGroup + inputCh := make(chan account, 1000) + go func() { + for _, acct := range accounts { + inputCh <- acct + } + close(inputCh) + }() + for i := 0; i < 8; i++ { + wg.Add(1) + go func() { + for acct := range inputCh { + upsert(c, acct) + } + wg.Done() + }() + } + wg.Wait() +} + +var ( + successCount uint64 + retryCount uint64 + totalCount uint64 +) + +func upsert(c *dgo.Dgraph, acc account) { + for { + if atomic.AddUint64(&totalCount, 1)%100 == 0 { + fmt.Printf("[%s] Success: %d Retries: %d Account: %v\n", time.Now().Format(time.Stamp), + atomic.LoadUint64(&successCount), atomic.LoadUint64(&retryCount), acc) + } + err := tryUpsert(c, acc) + if err == nil { + atomic.AddUint64(&successCount, 1) + return + } else if err == y.ErrAborted { + // pass + } else { + fmt.Printf("Error: %v\n", err) + } + atomic.AddUint64(&retryCount, 1) + } +} + +func tryUpsert(c *dgo.Dgraph, acc account) error { + ctx := context.Background() + + txn := c.NewTxn() + defer txn.Discard(ctx) + q := fmt.Sprintf(` + { + get(func: eq(first, %q)) @filter(eq(last, %q) AND eq(age, %d)) { + uid expand(_all_) {uid} + } + } + `, acc.first, acc.last, acc.age) + +retry: + resp, err := txn.Query(ctx, q) + if err != nil { + log.Printf("Got error while querying: %v. Retrying...\n", err) + goto retry + } + + decode := struct { + Get []struct { + Uid *string + } + }{} + x.Check(json.Unmarshal(resp.GetJson(), &decode)) + + x.AssertTrue(len(decode.Get) <= 1) + s := rand.NewSource(time.Now().Unix()) + r := rand.New(s) // initialize local pseudorandom generator + t := r.Intn(len(types)) + + var uid string + if len(decode.Get) == 1 { + x.AssertTrue(decode.Get[0].Uid != nil) + uid = *decode.Get[0].Uid + } else { + nqs := fmt.Sprintf(` + _:acct %q . + _:acct %q . + _:acct "%d"^^ . + _:acct %q . + _:acct <%s> "" . + _:acct "" . + `, + acc.first, acc.last, acc.age, types[t], types[t], + ) + mu := &api.Mutation{SetNquads: []byte(nqs)} + assigned, err := txn.Mutate(ctx, mu) + if err != nil { + return err + } + uid = assigned.GetUids()["acct"] + x.AssertTrue(uid != "") + } + + nq := fmt.Sprintf(` + <%s> "%d"^^ . + `, + uid, time.Now().Nanosecond(), + ) + mu := &api.Mutation{SetNquads: []byte(nq)} + if _, err = txn.Mutate(ctx, mu); err != nil { + return err + } + + return txn.Commit(ctx) +} + +func checkIntegrity(c *dgo.Dgraph) { + ctx := context.Background() + + q := fmt.Sprintf(` + { + all(func: anyofterms(first, %q)) { + first + last + age + } + } + `, strings.Join(firsts, " ")) + resp, err := c.NewTxn().Query(ctx, q) + x.Check(err) + + decode := struct { + All []struct { + First *string + Last *string + Age *int + } + }{} + x.Check(json.Unmarshal(resp.GetJson(), &decode)) + + // Make sure there is exactly one of each account. 
+ accountSet := make(map[string]struct{}) + for _, record := range decode.All { + x.AssertTrue(record.First != nil) + x.AssertTrue(record.Last != nil) + x.AssertTrue(record.Age != nil) + str := fmt.Sprintf("%s_%s_%d", *record.First, *record.Last, *record.Age) + accountSet[str] = struct{}{} + } + x.AssertTrue(len(accountSet) == len(accounts)) + for _, acct := range accounts { + str := fmt.Sprintf("%s_%s_%d", acct.first, acct.last, acct.age) + _, ok := accountSet[str] + x.AssertTrue(ok) + } +} diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index 7a382b98874..bd004575458 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -176,6 +176,16 @@ func (o *Oracle) sendDeltasToSubscribers() { } } sortTxns(delta) // Sort them in increasing order of CommitTs. + // Let's ensure that we have all the commits up until the max here. + // Otherwise, we'll be sending commit timestamps out of order, which + // would cause Alphas to ignore them, during writes to Badger. + if len(delta.Txns) > 0 { + maxTs := delta.Txns[len(delta.Txns)-1].CommitTs + if o.doneUntil.DoneUntil() < maxTs { + // Don't send it yet. Keep on picking up more. + continue // the outer for loop. + } + } o.Lock() for id, ch := range o.subscribers { select { diff --git a/dgraph/docker-compose.yml b/dgraph/docker-compose.yml index eb95134842e..0f7a0bcdd05 100644 --- a/dgraph/docker-compose.yml +++ b/dgraph/docker-compose.yml @@ -21,31 +21,31 @@ services: # source: "${DATA}" # target: /data - zero2: - image: dgraph/dgraph:latest - container_name: bank-dg0.2 - command: /gobin/dgraph zero -o 2 --my=zero2:5082 --replicas 3 --peer=zero1:5080 --idx 2 --logtostderr -v=2 - ports: - - 5082:5082 - - 6082:6082 - volumes: - - type: bind - source: $GOPATH/bin - target: /gobin - read_only: true + # zero2: + # image: dgraph/dgraph:latest + # container_name: bank-dg0.2 + # command: /gobin/dgraph zero -o 2 --my=zero2:5082 --replicas 3 --peer=zero1:5080 --idx 2 --logtostderr -v=2 + # ports: + # - 5082:5082 + # - 6082:6082 + # volumes: + # - type: bind + # source: $GOPATH/bin + # target: /gobin + # read_only: true - zero3: - image: dgraph/dgraph:latest - container_name: bank-dg0.3 - command: /gobin/dgraph zero -o 3 --my=zero3:5083 --replicas 3 --peer=zero1:5080 --idx 3 --logtostderr -v=2 - ports: - - 5083:5083 - - 6083:6083 - volumes: - - type: bind - source: $GOPATH/bin - target: /gobin - read_only: true + # zero3: + # image: dgraph/dgraph:latest + # container_name: bank-dg0.3 + # command: /gobin/dgraph zero -o 3 --my=zero3:5083 --replicas 3 --peer=zero1:5080 --idx 3 --logtostderr -v=2 + # ports: + # - 5083:5083 + # - 6083:6083 + # volumes: + # - type: bind + # source: $GOPATH/bin + # target: /gobin + # read_only: true # zero4: # image: debian:latest @@ -86,39 +86,39 @@ services: - 9180:9180 command: /gobin/dgraph alpha --my=dg1:7180 --lru_mb=1024 --zero=zero1:5080 -o 100 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=2 - dg2: - image: dgraph/dgraph:latest - container_name: bank-dg2 - working_dir: /data/dg2 - volumes: - - type: bind - source: $GOPATH/bin - target: /gobin - read_only: true - # - type: bind - # source: "${DATA}" - # target: /data - ports: - - 8182:8182 - - 9182:9182 - command: /gobin/dgraph alpha --my=dg2:7182 --lru_mb=1024 --zero=zero1:5080 -o 102 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=2 + # dg2: + # image: dgraph/dgraph:latest + # container_name: bank-dg2 + # working_dir: /data/dg2 + # volumes: + # - type: bind + # source: 
$GOPATH/bin + # target: /gobin + # read_only: true + # # - type: bind + # # source: "${DATA}" + # # target: /data + # ports: + # - 8182:8182 + # - 9182:9182 + # command: /gobin/dgraph alpha --my=dg2:7182 --lru_mb=1024 --zero=zero1:5080 -o 102 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=2 - dg3: - image: dgraph/dgraph:latest - container_name: bank-dg3 - working_dir: /data/dg3 - volumes: - - type: bind - source: $GOPATH/bin - target: /gobin - read_only: true - # - type: bind - # source: "${DATA}" - # target: /data - ports: - - 8183:8183 - - 9183:9183 - command: /gobin/dgraph alpha --my=dg3:7183 --lru_mb=1024 --zero=zero1:5080 -o 103 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=2 + # dg3: + # image: dgraph/dgraph:latest + # container_name: bank-dg3 + # working_dir: /data/dg3 + # volumes: + # - type: bind + # source: $GOPATH/bin + # target: /gobin + # read_only: true + # # - type: bind + # # source: "${DATA}" + # # target: /data + # ports: + # - 8183:8183 + # - 9183:9183 + # command: /gobin/dgraph alpha --my=dg3:7183 --lru_mb=1024 --zero=zero1:5080 -o 103 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=2 # dg4: # image: debian:latest diff --git a/worker/draft.go b/worker/draft.go index 34c60187ca1..ed4ace3d458 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -59,6 +59,8 @@ type node struct { gid uint32 closer *y.Closer + lastCommitTs uint64 // Only used to ensure that our commit Ts is monotonically increasing. + streaming int32 // Used to avoid calculating snapshot canCampaign bool @@ -450,7 +452,11 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error { } } for _, status := range delta.Txns { + if status.CommitTs > 0 && status.CommitTs < n.lastCommitTs { + glog.Fatalf("lastcommit %d > commit now %d", n.lastCommitTs, status.CommitTs) + } toDisk(status.StartTs, status.CommitTs) + n.lastCommitTs = status.CommitTs } if err := writer.Flush(); err != nil { x.Errorf("Error while flushing to disk: %v", err) diff --git a/worker/groups.go b/worker/groups.go index 60f1e8a419e..9d150bb8a15 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -801,7 +801,7 @@ func (g *groupi) processOracleDeltaStream() { elog.Errorf("No longer the leader of group %d. Exiting", g.groupId()) return } - elog.Printf("Batched %d updates. Proposing Delta: %v.", batch, delta) + glog.V(2).Infof("Batched %d updates. Proposing Delta: %v.", batch, delta) for { // Block forever trying to propose this. 
err := g.Node.proposeAndWait(context.Background(), &pb.Proposal{Delta: delta}) From 9d8717900c4b496755a9aa088d435bcd017525c8 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Mon, 22 Oct 2018 14:10:49 -0700 Subject: [PATCH 2/6] Revert some other changes --- contrib/integration/testhas/main.go | 242 ---------------------------- dgraph/docker-compose.yml | 112 ++++++------- 2 files changed, 56 insertions(+), 298 deletions(-) delete mode 100644 contrib/integration/testhas/main.go diff --git a/contrib/integration/testhas/main.go b/contrib/integration/testhas/main.go deleted file mode 100644 index 6bb415be523..00000000000 --- a/contrib/integration/testhas/main.go +++ /dev/null @@ -1,242 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "flag" - "fmt" - "log" - "math/rand" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/dgraph-io/dgo" - "github.com/dgraph-io/dgo/protos/api" - "github.com/dgraph-io/dgo/x" - "github.com/dgraph-io/dgo/y" - "google.golang.org/grpc" -) - -var ( - addr = flag.String("addr", "localhost:9080", "dgraph address") - concurr = flag.Int("c", 3, "number of concurrent upserts per account") -) - -var ( - firsts = []string{"Paul", "Eric", "Jack", "John", "Martin"} - lasts = []string{"Brown", "Smith", "Robinson", "Waters", "Taylor"} - types = []string{"CEO", "COO", "CTO", "CFO"} -) - -type account struct { - first string - last string - age int -} - -var accounts []account - -func init() { - for _, first := range firsts { - for _, last := range lasts { - for i := 0; i < 1000; i++ { - accounts = append(accounts, account{ - first: first, - last: last, - age: i, - }) - } - } - } -} - -func main() { - flag.Parse() - c := newClient() - setup(c) - fmt.Println("Doing upserts") - doUpserts(c) - fmt.Println("Checking integrity") - checkIntegrity(c) -} - -func newClient() *dgo.Dgraph { - d, err := grpc.Dial(*addr, grpc.WithInsecure()) - x.Check(err) - return dgo.NewDgraphClient( - api.NewDgraphClient(d), - ) -} - -func setup(c *dgo.Dgraph) { - ctx := context.Background() - x.Check(c.Alter(ctx, &api.Operation{ - DropAll: true, - })) - x.Check(c.Alter(ctx, &api.Operation{ - Schema: ` - first: string @index(term) . - last: string @index(hash) . - age: int @index(int) . - when: int . 
- `, - })) -} - -func doUpserts(c *dgo.Dgraph) { - var wg sync.WaitGroup - inputCh := make(chan account, 1000) - go func() { - for _, acct := range accounts { - inputCh <- acct - } - close(inputCh) - }() - for i := 0; i < 8; i++ { - wg.Add(1) - go func() { - for acct := range inputCh { - upsert(c, acct) - } - wg.Done() - }() - } - wg.Wait() -} - -var ( - successCount uint64 - retryCount uint64 - totalCount uint64 -) - -func upsert(c *dgo.Dgraph, acc account) { - for { - if atomic.AddUint64(&totalCount, 1)%100 == 0 { - fmt.Printf("[%s] Success: %d Retries: %d Account: %v\n", time.Now().Format(time.Stamp), - atomic.LoadUint64(&successCount), atomic.LoadUint64(&retryCount), acc) - } - err := tryUpsert(c, acc) - if err == nil { - atomic.AddUint64(&successCount, 1) - return - } else if err == y.ErrAborted { - // pass - } else { - fmt.Printf("Error: %v\n", err) - } - atomic.AddUint64(&retryCount, 1) - } -} - -func tryUpsert(c *dgo.Dgraph, acc account) error { - ctx := context.Background() - - txn := c.NewTxn() - defer txn.Discard(ctx) - q := fmt.Sprintf(` - { - get(func: eq(first, %q)) @filter(eq(last, %q) AND eq(age, %d)) { - uid expand(_all_) {uid} - } - } - `, acc.first, acc.last, acc.age) - -retry: - resp, err := txn.Query(ctx, q) - if err != nil { - log.Printf("Got error while querying: %v. Retrying...\n", err) - goto retry - } - - decode := struct { - Get []struct { - Uid *string - } - }{} - x.Check(json.Unmarshal(resp.GetJson(), &decode)) - - x.AssertTrue(len(decode.Get) <= 1) - s := rand.NewSource(time.Now().Unix()) - r := rand.New(s) // initialize local pseudorandom generator - t := r.Intn(len(types)) - - var uid string - if len(decode.Get) == 1 { - x.AssertTrue(decode.Get[0].Uid != nil) - uid = *decode.Get[0].Uid - } else { - nqs := fmt.Sprintf(` - _:acct %q . - _:acct %q . - _:acct "%d"^^ . - _:acct %q . - _:acct <%s> "" . - _:acct "" . - `, - acc.first, acc.last, acc.age, types[t], types[t], - ) - mu := &api.Mutation{SetNquads: []byte(nqs)} - assigned, err := txn.Mutate(ctx, mu) - if err != nil { - return err - } - uid = assigned.GetUids()["acct"] - x.AssertTrue(uid != "") - } - - nq := fmt.Sprintf(` - <%s> "%d"^^ . - `, - uid, time.Now().Nanosecond(), - ) - mu := &api.Mutation{SetNquads: []byte(nq)} - if _, err = txn.Mutate(ctx, mu); err != nil { - return err - } - - return txn.Commit(ctx) -} - -func checkIntegrity(c *dgo.Dgraph) { - ctx := context.Background() - - q := fmt.Sprintf(` - { - all(func: anyofterms(first, %q)) { - first - last - age - } - } - `, strings.Join(firsts, " ")) - resp, err := c.NewTxn().Query(ctx, q) - x.Check(err) - - decode := struct { - All []struct { - First *string - Last *string - Age *int - } - }{} - x.Check(json.Unmarshal(resp.GetJson(), &decode)) - - // Make sure there is exactly one of each account. 
- accountSet := make(map[string]struct{}) - for _, record := range decode.All { - x.AssertTrue(record.First != nil) - x.AssertTrue(record.Last != nil) - x.AssertTrue(record.Age != nil) - str := fmt.Sprintf("%s_%s_%d", *record.First, *record.Last, *record.Age) - accountSet[str] = struct{}{} - } - x.AssertTrue(len(accountSet) == len(accounts)) - for _, acct := range accounts { - str := fmt.Sprintf("%s_%s_%d", acct.first, acct.last, acct.age) - _, ok := accountSet[str] - x.AssertTrue(ok) - } -} diff --git a/dgraph/docker-compose.yml b/dgraph/docker-compose.yml index 0f7a0bcdd05..eb95134842e 100644 --- a/dgraph/docker-compose.yml +++ b/dgraph/docker-compose.yml @@ -21,31 +21,31 @@ services: # source: "${DATA}" # target: /data - # zero2: - # image: dgraph/dgraph:latest - # container_name: bank-dg0.2 - # command: /gobin/dgraph zero -o 2 --my=zero2:5082 --replicas 3 --peer=zero1:5080 --idx 2 --logtostderr -v=2 - # ports: - # - 5082:5082 - # - 6082:6082 - # volumes: - # - type: bind - # source: $GOPATH/bin - # target: /gobin - # read_only: true + zero2: + image: dgraph/dgraph:latest + container_name: bank-dg0.2 + command: /gobin/dgraph zero -o 2 --my=zero2:5082 --replicas 3 --peer=zero1:5080 --idx 2 --logtostderr -v=2 + ports: + - 5082:5082 + - 6082:6082 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true - # zero3: - # image: dgraph/dgraph:latest - # container_name: bank-dg0.3 - # command: /gobin/dgraph zero -o 3 --my=zero3:5083 --replicas 3 --peer=zero1:5080 --idx 3 --logtostderr -v=2 - # ports: - # - 5083:5083 - # - 6083:6083 - # volumes: - # - type: bind - # source: $GOPATH/bin - # target: /gobin - # read_only: true + zero3: + image: dgraph/dgraph:latest + container_name: bank-dg0.3 + command: /gobin/dgraph zero -o 3 --my=zero3:5083 --replicas 3 --peer=zero1:5080 --idx 3 --logtostderr -v=2 + ports: + - 5083:5083 + - 6083:6083 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true # zero4: # image: debian:latest @@ -86,39 +86,39 @@ services: - 9180:9180 command: /gobin/dgraph alpha --my=dg1:7180 --lru_mb=1024 --zero=zero1:5080 -o 100 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=2 - # dg2: - # image: dgraph/dgraph:latest - # container_name: bank-dg2 - # working_dir: /data/dg2 - # volumes: - # - type: bind - # source: $GOPATH/bin - # target: /gobin - # read_only: true - # # - type: bind - # # source: "${DATA}" - # # target: /data - # ports: - # - 8182:8182 - # - 9182:9182 - # command: /gobin/dgraph alpha --my=dg2:7182 --lru_mb=1024 --zero=zero1:5080 -o 102 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=2 + dg2: + image: dgraph/dgraph:latest + container_name: bank-dg2 + working_dir: /data/dg2 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + # - type: bind + # source: "${DATA}" + # target: /data + ports: + - 8182:8182 + - 9182:9182 + command: /gobin/dgraph alpha --my=dg2:7182 --lru_mb=1024 --zero=zero1:5080 -o 102 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=2 - # dg3: - # image: dgraph/dgraph:latest - # container_name: bank-dg3 - # working_dir: /data/dg3 - # volumes: - # - type: bind - # source: $GOPATH/bin - # target: /gobin - # read_only: true - # # - type: bind - # # source: "${DATA}" - # # target: /data - # ports: - # - 8183:8183 - # - 9183:9183 - # command: /gobin/dgraph alpha --my=dg3:7183 --lru_mb=1024 --zero=zero1:5080 -o 103 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 
--logtostderr -v=2 + dg3: + image: dgraph/dgraph:latest + container_name: bank-dg3 + working_dir: /data/dg3 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + # - type: bind + # source: "${DATA}" + # target: /data + ports: + - 8183:8183 + - 9183:9183 + command: /gobin/dgraph alpha --my=dg3:7183 --lru_mb=1024 --zero=zero1:5080 -o 103 --expose_trace --trace 1.0 --profile_mode block --block_rate 10 --logtostderr -v=2 # dg4: # image: debian:latest From 309ec8db79b6fe9d91976d309f0ea09503584e68 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Mon, 22 Oct 2018 14:20:57 -0700 Subject: [PATCH 3/6] Self review --- worker/draft.go | 4 +++- worker/groups.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/worker/draft.go b/worker/draft.go index ed4ace3d458..9a3afe37be6 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -451,9 +451,11 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error { time.Sleep(10 * time.Millisecond) } } + for _, status := range delta.Txns { if status.CommitTs > 0 && status.CommitTs < n.lastCommitTs { - glog.Fatalf("lastcommit %d > commit now %d", n.lastCommitTs, status.CommitTs) + glog.Errorf("lastcommit %d > current %d. This would cause some commits to be lost.", + n.lastCommitTs, status.CommitTs) } toDisk(status.StartTs, status.CommitTs) n.lastCommitTs = status.CommitTs diff --git a/worker/groups.go b/worker/groups.go index 9d150bb8a15..60f1e8a419e 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -801,7 +801,7 @@ func (g *groupi) processOracleDeltaStream() { elog.Errorf("No longer the leader of group %d. Exiting", g.groupId()) return } - glog.V(2).Infof("Batched %d updates. Proposing Delta: %v.", batch, delta) + elog.Printf("Batched %d updates. Proposing Delta: %v.", batch, delta) for { // Block forever trying to propose this. err := g.Node.proposeAndWait(context.Background(), &pb.Proposal{Delta: delta}) From 6850ea5aaa815a287a457629523d064e4e70b295 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Mon, 22 Oct 2018 15:30:43 -0700 Subject: [PATCH 4/6] Always output INFO logs as well. --- dgraph/cmd/root.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dgraph/cmd/root.go b/dgraph/cmd/root.go index 234d6a45678..0d3be153bf3 100644 --- a/dgraph/cmd/root.go +++ b/dgraph/cmd/root.go @@ -75,7 +75,12 @@ func init() { RootCmd.PersistentFlags().Bool("expose_trace", false, "Allow trace endpoint to be accessible from remote") rootConf.BindPFlags(RootCmd.PersistentFlags()) + flag.CommandLine.AddGoFlagSet(goflag.CommandLine) + // Always set stderrthreshold=0. Don't let users set it themselves. + x.Check(flag.Set("stderrthreshold", "0")) + flag.CommandLine.MarkDeprecated("stderrthreshold", + "Dgraph always sets this flag to 0. It can't be overwritten.") var subcommands = []*x.SubCommand{ &bulk.Bulk, &cert.Cert, &conv.Conv, &live.Live, &alpha.Alpha, &zero.Zero, From 0d2ebac27b1e39ad37f44e709a361bd2c13b7c32 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Mon, 22 Oct 2018 16:16:41 -0700 Subject: [PATCH 5/6] Do the sorting of txn status in Alpha, because Alpha also batches multiple updates from Zero. 
--- dgraph/cmd/zero/oracle.go | 28 +++++++++++++--------------- worker/groups.go | 9 ++++++++- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index bd004575458..bfd65e16594 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -19,7 +19,6 @@ package zero import ( "errors" "math/rand" - "sort" "time" "github.com/dgraph-io/dgo/protos/api" @@ -116,12 +115,6 @@ func (o *Oracle) commit(src *api.TxnContext) error { return nil } -func sortTxns(delta *pb.OracleDelta) { - sort.Slice(delta.Txns, func(i, j int) bool { - return delta.Txns[i].CommitTs < delta.Txns[j].CommitTs - }) -} - func (o *Oracle) currentState() *pb.OracleDelta { o.AssertRLock() resp := &pb.OracleDelta{} @@ -175,17 +168,22 @@ func (o *Oracle) sendDeltasToSubscribers() { break slurp_loop } } - sortTxns(delta) // Sort them in increasing order of CommitTs. + // No need to sort the txn updates here. Alpha would sort them before + // applying. + // Let's ensure that we have all the commits up until the max here. // Otherwise, we'll be sending commit timestamps out of order, which - // would cause Alphas to ignore them, during writes to Badger. - if len(delta.Txns) > 0 { - maxTs := delta.Txns[len(delta.Txns)-1].CommitTs - if o.doneUntil.DoneUntil() < maxTs { - // Don't send it yet. Keep on picking up more. - continue // the outer for loop. - } + // would cause Alphas to drop some of them, during writes to Badger. + waitFor := delta.MaxAssigned + for _, txn := range delta.Txns { + waitFor = x.Max(waitFor, txn.CommitTs) + } + if o.doneUntil.DoneUntil() < waitFor { + continue // The for loop doing blocking reads from o.updates. + // We need at least one entry from the updates channel to pick up a missing update. + // Don't goto slurp_loop, because it would break from select immediately. } + o.Lock() for id, ch := range o.subscribers { select { diff --git a/worker/groups.go b/worker/groups.go index 60f1e8a419e..09f8b808725 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -19,6 +19,7 @@ package worker import ( "fmt" "math" + "sort" "sync/atomic" "time" @@ -801,7 +802,13 @@ func (g *groupi) processOracleDeltaStream() { elog.Errorf("No longer the leader of group %d. Exiting", g.groupId()) return } - elog.Printf("Batched %d updates. Proposing Delta: %v.", batch, delta) + // We should always sort the txns before applying. Otherwise, we might lose some of + // these udpates, becuase we never write over a new version. + sort.Slice(delta.Txns, func(i, j int) bool { + return delta.Txns[i].CommitTs < delta.Txns[j].CommitTs + }) + + glog.V(2).Infof("Batched %d updates. Proposing Delta: %v.", batch, delta) for { // Block forever trying to propose this. err := g.Node.proposeAndWait(context.Background(), &pb.Proposal{Delta: delta}) From f080f58f3ed5e1a9f7fc74f02d8df7bc82d418db Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Mon, 22 Oct 2018 16:22:52 -0700 Subject: [PATCH 6/6] Self review --- dgraph/cmd/root.go | 4 ++-- worker/draft.go | 2 +- worker/groups.go | 9 ++++++--- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/dgraph/cmd/root.go b/dgraph/cmd/root.go index 0d3be153bf3..dc8997fa08b 100644 --- a/dgraph/cmd/root.go +++ b/dgraph/cmd/root.go @@ -79,8 +79,8 @@ func init() { flag.CommandLine.AddGoFlagSet(goflag.CommandLine) // Always set stderrthreshold=0. Don't let users set it themselves. x.Check(flag.Set("stderrthreshold", "0")) - flag.CommandLine.MarkDeprecated("stderrthreshold", - "Dgraph always sets this flag to 0. 
It can't be overwritten.") + x.Check(flag.CommandLine.MarkDeprecated("stderrthreshold", + "Dgraph always sets this flag to 0. It can't be overwritten.")) var subcommands = []*x.SubCommand{ &bulk.Bulk, &cert.Cert, &conv.Conv, &live.Live, &alpha.Alpha, &zero.Zero, diff --git a/worker/draft.go b/worker/draft.go index 9a3afe37be6..cb9f3e73330 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -454,7 +454,7 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error { for _, status := range delta.Txns { if status.CommitTs > 0 && status.CommitTs < n.lastCommitTs { - glog.Errorf("lastcommit %d > current %d. This would cause some commits to be lost.", + glog.Errorf("Lastcommit %d > current %d. This would cause some commits to be lost.", n.lastCommitTs, status.CommitTs) } toDisk(status.StartTs, status.CommitTs) diff --git a/worker/groups.go b/worker/groups.go index 09f8b808725..8336501528c 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -802,13 +802,16 @@ func (g *groupi) processOracleDeltaStream() { elog.Errorf("No longer the leader of group %d. Exiting", g.groupId()) return } + // We should always sort the txns before applying. Otherwise, we might lose some of - // these udpates, becuase we never write over a new version. + // these updates, becuase we never write over a new version. sort.Slice(delta.Txns, func(i, j int) bool { return delta.Txns[i].CommitTs < delta.Txns[j].CommitTs }) - - glog.V(2).Infof("Batched %d updates. Proposing Delta: %v.", batch, delta) + elog.Printf("Batched %d updates. Proposing Delta: %v.", batch, delta) + if glog.V(2) { + glog.Infof("Batched %d updates. Proposing Delta: %v.", batch, delta) + } for { // Block forever trying to propose this. err := g.Node.proposeAndWait(context.Background(), &pb.Proposal{Delta: delta})
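
Taken together, the series boils down to two cooperating checks: Zero holds an OracleDelta back until its doneUntil watermark has caught up with MaxAssigned and every CommitTs in the batch, and Alpha sorts the batched txn statuses by CommitTs (tracking lastCommitTs as a sanity check) before writing them, so a lower commit timestamp is never written after a higher one and then lost to the "never write over a newer version" rule. Below is a minimal, self-contained Go sketch of that combined logic. It is illustrative only: TxnStatus, Delta, readyToSend and applyInOrder are simplified stand-ins invented here, not Dgraph's pb.TxnStatus, pb.OracleDelta, y.WaterMark or the functions touched by these patches.

package main

import (
	"fmt"
	"sort"
)

// TxnStatus is a simplified stand-in for a transaction's start/commit pair.
type TxnStatus struct {
	StartTs  uint64
	CommitTs uint64 // 0 means the transaction was aborted.
}

// Delta is a simplified stand-in for the batch of updates Zero streams out.
type Delta struct {
	MaxAssigned uint64
	Txns        []TxnStatus
}

// readyToSend mirrors the Zero-side check: hold the delta back until the
// doneUntil watermark covers MaxAssigned and every commit timestamp in the
// batch, so commits are never streamed out before earlier ones are known.
func readyToSend(d Delta, doneUntil uint64) bool {
	waitFor := d.MaxAssigned
	for _, txn := range d.Txns {
		if txn.CommitTs > waitFor {
			waitFor = txn.CommitTs
		}
	}
	return doneUntil >= waitFor
}

// applyInOrder mirrors the Alpha-side step: after batching several deltas,
// sort by CommitTs before writing, so a lower commit timestamp is never
// written after a higher one (which would let it be silently dropped).
func applyInOrder(d *Delta, write func(startTs, commitTs uint64)) {
	sort.Slice(d.Txns, func(i, j int) bool {
		return d.Txns[i].CommitTs < d.Txns[j].CommitTs
	})
	var lastCommitTs uint64
	for _, txn := range d.Txns {
		if txn.CommitTs > 0 && txn.CommitTs < lastCommitTs {
			fmt.Printf("out of order: last %d > current %d\n", lastCommitTs, txn.CommitTs)
		}
		write(txn.StartTs, txn.CommitTs)
		lastCommitTs = txn.CommitTs
	}
}

func main() {
	d := Delta{
		MaxAssigned: 12,
		Txns: []TxnStatus{
			{StartTs: 7, CommitTs: 11},
			{StartTs: 5, CommitTs: 9},
		},
	}
	fmt.Println(readyToSend(d, 10)) // false: the watermark (10) hasn't caught up to 12.
	fmt.Println(readyToSend(d, 12)) // true: everything up to MaxAssigned is covered.
	applyInOrder(&d, func(start, commit uint64) {
		fmt.Printf("commit %d -> %d\n", start, commit)
	})
}

Run as-is, this prints false, then true, then the two commits in increasing CommitTs order (9 before 11), which is the invariant the final patches enforce on the Alpha side while Zero only guarantees completeness of the batch.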