Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fix: Send commit timestamps in order #2687

Merged
merged 7 commits into from
Oct 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dgraph/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ func init() {
RootCmd.PersistentFlags().Bool("expose_trace", false,
"Allow trace endpoint to be accessible from remote")
rootConf.BindPFlags(RootCmd.PersistentFlags())

flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
// Always set stderrthreshold=0. Don't let users set it themselves.
x.Check(flag.Set("stderrthreshold", "0"))
x.Check(flag.CommandLine.MarkDeprecated("stderrthreshold",
"Dgraph always sets this flag to 0. It can't be overwritten."))

var subcommands = []*x.SubCommand{
&bulk.Bulk, &cert.Cert, &conv.Conv, &live.Live, &alpha.Alpha, &zero.Zero,
Expand Down
24 changes: 16 additions & 8 deletions dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package zero
import (
"errors"
"math/rand"
"sort"
"time"

"github.com/dgraph-io/dgo/protos/api"
Expand Down Expand Up @@ -116,12 +115,6 @@ func (o *Oracle) commit(src *api.TxnContext) error {
return nil
}

// sortTxns reorders delta.Txns in place so that the entries appear in
// increasing order of their CommitTs field.
func sortTxns(delta *pb.OracleDelta) {
	txns := delta.Txns
	// byCommitTs reports whether the entry at index a commits before the one at b.
	byCommitTs := func(a, b int) bool {
		return txns[a].CommitTs < txns[b].CommitTs
	}
	sort.Slice(txns, byCommitTs)
}

func (o *Oracle) currentState() *pb.OracleDelta {
o.AssertRLock()
resp := &pb.OracleDelta{}
Expand Down Expand Up @@ -175,7 +168,22 @@ func (o *Oracle) sendDeltasToSubscribers() {
break slurp_loop
}
}
sortTxns(delta) // Sort them in increasing order of CommitTs.
// No need to sort the txn updates here. Alpha would sort them before
// applying.

// Let's ensure that we have all the commits up until the max here.
// Otherwise, we'll be sending commit timestamps out of order, which
// would cause Alphas to drop some of them, during writes to Badger.
waitFor := delta.MaxAssigned
for _, txn := range delta.Txns {
waitFor = x.Max(waitFor, txn.CommitTs)
}
if o.doneUntil.DoneUntil() < waitFor {
continue // The for loop doing blocking reads from o.updates.
// We need at least one entry from the updates channel to pick up a missing update.
// Don't goto slurp_loop, because it would break from select immediately.
}

o.Lock()
for id, ch := range o.subscribers {
select {
Expand Down
8 changes: 8 additions & 0 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type node struct {
gid uint32
closer *y.Closer

lastCommitTs uint64 // Only used to ensure that our commit Ts is monotonically increasing.

streaming int32 // Used to avoid calculating snapshot

canCampaign bool
Expand Down Expand Up @@ -449,8 +451,14 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error {
time.Sleep(10 * time.Millisecond)
}
}

for _, status := range delta.Txns {
if status.CommitTs > 0 && status.CommitTs < n.lastCommitTs {
glog.Errorf("Lastcommit %d > current %d. This would cause some commits to be lost.",
n.lastCommitTs, status.CommitTs)
}
toDisk(status.StartTs, status.CommitTs)
n.lastCommitTs = status.CommitTs
}
if err := writer.Flush(); err != nil {
x.Errorf("Error while flushing to disk: %v", err)
Expand Down
10 changes: 10 additions & 0 deletions worker/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package worker
import (
"fmt"
"math"
"sort"
"sync/atomic"
"time"

Expand Down Expand Up @@ -801,7 +802,16 @@ func (g *groupi) processOracleDeltaStream() {
elog.Errorf("No longer the leader of group %d. Exiting", g.groupId())
return
}

// We should always sort the txns before applying. Otherwise, we might lose some of
// these updates, because we never write over a new version.
sort.Slice(delta.Txns, func(i, j int) bool {
return delta.Txns[i].CommitTs < delta.Txns[j].CommitTs
})
elog.Printf("Batched %d updates. Proposing Delta: %v.", batch, delta)
if glog.V(2) {
glog.Infof("Batched %d updates. Proposing Delta: %v.", batch, delta)
}
for {
// Block forever trying to propose this.
err := g.Node.proposeAndWait(context.Background(), &pb.Proposal{Delta: delta})
Expand Down