diff --git a/dgraph/cmd/root.go b/dgraph/cmd/root.go index 234d6a45678..dc8997fa08b 100644 --- a/dgraph/cmd/root.go +++ b/dgraph/cmd/root.go @@ -75,7 +75,12 @@ func init() { RootCmd.PersistentFlags().Bool("expose_trace", false, "Allow trace endpoint to be accessible from remote") rootConf.BindPFlags(RootCmd.PersistentFlags()) + flag.CommandLine.AddGoFlagSet(goflag.CommandLine) + // Always set stderrthreshold=0. Don't let users set it themselves. + x.Check(flag.Set("stderrthreshold", "0")) + x.Check(flag.CommandLine.MarkDeprecated("stderrthreshold", + "Dgraph always sets this flag to 0. It can't be overwritten.")) var subcommands = []*x.SubCommand{ &bulk.Bulk, &cert.Cert, &conv.Conv, &live.Live, &alpha.Alpha, &zero.Zero, diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index 7a382b98874..bfd65e16594 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -19,7 +19,6 @@ package zero import ( "errors" "math/rand" - "sort" "time" "github.com/dgraph-io/dgo/protos/api" @@ -116,12 +115,6 @@ func (o *Oracle) commit(src *api.TxnContext) error { return nil } -func sortTxns(delta *pb.OracleDelta) { - sort.Slice(delta.Txns, func(i, j int) bool { - return delta.Txns[i].CommitTs < delta.Txns[j].CommitTs - }) -} - func (o *Oracle) currentState() *pb.OracleDelta { o.AssertRLock() resp := &pb.OracleDelta{} @@ -175,7 +168,22 @@ func (o *Oracle) sendDeltasToSubscribers() { break slurp_loop } } - sortTxns(delta) // Sort them in increasing order of CommitTs. + // No need to sort the txn updates here. Alpha would sort them before + // applying. + + // Let's ensure that we have all the commits up until the max here. + // Otherwise, we'll be sending commit timestamps out of order, which + // would cause Alphas to drop some of them, during writes to Badger. 
+ waitFor := delta.MaxAssigned + for _, txn := range delta.Txns { + waitFor = x.Max(waitFor, txn.CommitTs) + } + if o.doneUntil.DoneUntil() < waitFor { + continue // The for loop doing blocking reads from o.updates. + // We need at least one entry from the updates channel to pick up a missing update. + // Don't goto slurp_loop, because it would break from select immediately. + } + o.Lock() for id, ch := range o.subscribers { select { diff --git a/worker/draft.go b/worker/draft.go index 34c60187ca1..cb9f3e73330 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -59,6 +59,8 @@ type node struct { gid uint32 closer *y.Closer + lastCommitTs uint64 // Only used to ensure that our commit Ts is monotonically increasing. + streaming int32 // Used to avoid calculating snapshot canCampaign bool @@ -449,8 +451,14 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error { time.Sleep(10 * time.Millisecond) } } + for _, status := range delta.Txns { + if status.CommitTs > 0 && status.CommitTs < n.lastCommitTs { + glog.Errorf("Lastcommit %d > current %d. This would cause some commits to be lost.", + n.lastCommitTs, status.CommitTs) + } toDisk(status.StartTs, status.CommitTs) + n.lastCommitTs = status.CommitTs } if err := writer.Flush(); err != nil { x.Errorf("Error while flushing to disk: %v", err) diff --git a/worker/groups.go b/worker/groups.go index 60f1e8a419e..8336501528c 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -19,6 +19,7 @@ package worker import ( "fmt" "math" + "sort" "sync/atomic" "time" @@ -801,7 +802,16 @@ func (g *groupi) processOracleDeltaStream() { elog.Errorf("No longer the leader of group %d. Exiting", g.groupId()) return } + + // We should always sort the txns before applying. Otherwise, we might lose some of + // these updates, because we never write over a new version. + sort.Slice(delta.Txns, func(i, j int) bool { + return delta.Txns[i].CommitTs < delta.Txns[j].CommitTs + }) elog.Printf("Batched %d updates. 
Proposing Delta: %v.", batch, delta) + if glog.V(2) { + glog.Infof("Batched %d updates. Proposing Delta: %v.", batch, delta) + } for { // Block forever trying to propose this. err := g.Node.proposeAndWait(context.Background(), &pb.Proposal{Delta: delta})