Skip to content

Commit

Permalink
Fix various deadlocks in Dgraph (dgraph-io#2548)
Browse files Browse the repository at this point in the history
Fixed a bunch of long-standing deadlock issues:

1. Deadlock caused by recursive locking in posting/list.go in an internal function, which was causing `applyCh` to block when applying a mutation on a posting list with a read from a query.
2. Deadlock caused by loss of Raft ConfState during a restart of a node. We were not picking up the previous ConfState, hence it was set by default to nil in the next CreateSnapshot. Now we pick up the state, and ensure that the snapshot has a valid ConfState. This basically caused a node to see an empty group, and never participate in elections.
3. Fix dgraph-io#2541 -- A Tick missed to fire, caused due to the repeated calling of `raft Storage.FirstIndex()`. This was causing Badger to create an iterator every time, which was expensive. Now we cache the first index, to avoid repeatedly looking it up.

Also introduced golang/glog library for better logging.
  • Loading branch information
manishrjain authored and dna2github committed Jul 19, 2019
1 parent 216279b commit ebf0e63
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 22 deletions.
22 changes: 12 additions & 10 deletions conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/dgraph-io/dgraph/protos/intern"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -124,9 +125,9 @@ func (n *Node) Raft() raft.Node {

// SetConfState would store the latest ConfState generated by ApplyConfChange.
func (n *Node) SetConfState(cs *raftpb.ConfState) {
glog.Infof("Setting conf state to %+v\n", cs)
n.Lock()
defer n.Unlock()
x.Printf("Setting conf state to %+v\n", cs)
n._confState = cs
}

Expand Down Expand Up @@ -215,7 +216,7 @@ func (n *Node) PastLife() (idx uint64, restart bool, rerr error) {
return
}
if !raft.IsEmptySnap(sp) {
x.Printf("Found Snapshot, Metadata: %+v\n", sp.Metadata)
glog.Infof("Found Snapshot.Metadata: %+v\n", sp.Metadata)
restart = true
idx = sp.Metadata.Index
}
Expand All @@ -226,7 +227,7 @@ func (n *Node) PastLife() (idx uint64, restart bool, rerr error) {
return
}
if !raft.IsEmptyHardState(hd) {
x.Printf("Found hardstate: %+v\n", hd)
glog.Infof("Found hardstate: %+v\n", hd)
restart = true
}

Expand All @@ -235,7 +236,7 @@ func (n *Node) PastLife() (idx uint64, restart bool, rerr error) {
if rerr != nil {
return
}
x.Printf("Group %d found %d entries\n", n.RaftContext.Group, num)
glog.Infof("Group %d found %d entries\n", n.RaftContext.Group, num)
// We'll always have at least one entry.
if num > 1 {
restart = true
Expand Down Expand Up @@ -292,7 +293,7 @@ func (n *Node) BatchAndSendMessages() {
if exists := failedConn[to]; !exists {
// So that we print error only the first time we are not able to connect.
// Otherwise, the log is polluted with multiple errors.
x.Printf("No healthy connection found to node Id: %d addr: [%s], err: %v\n",
glog.Warningf("No healthy connection to node Id: %d addr: [%s], err: %v\n",
to, addr, err)
failedConn[to] = true
}
Expand Down Expand Up @@ -325,7 +326,8 @@ func (n *Node) doSendMessage(pool *Pool, data []byte) {
go func() {
_, err := c.RaftMessage(ctx, batch)
if err != nil {
x.Printf("Error while sending message to node with addr: %s, err: %v\n", pool.Addr, err)
glog.Warningf("Error while sending message to node with addr: %s, err: %v\n",
pool.Addr, err)
}
ch <- err
}()
Expand Down Expand Up @@ -356,7 +358,7 @@ func (n *Node) Connect(pid uint64, addr string) {
// a nil *pool.
if addr == n.MyAddr {
// TODO: Note this fact in more general peer health info somehow.
x.Printf("Peer %d claims same host as me\n", pid)
glog.Infof("Peer %d claims same host as me\n", pid)
n.SetPeer(pid, addr)
return
}
Expand Down Expand Up @@ -387,7 +389,7 @@ func (n *Node) proposeConfChange(ctx context.Context, pb raftpb.ConfChange) erro
if cctx.Err() != nil {
return errInternalRetry
}
x.Printf("Error while proposing conf change: %v", err)
glog.Warningf("Error while proposing conf change: %v", err)
return err
}
select {
Expand Down Expand Up @@ -419,8 +421,8 @@ func (n *Node) AddToCluster(ctx context.Context, pid uint64) error {
}
err = errInternalRetry
for err == errInternalRetry {
x.Printf("Trying to add %d to cluster. Addr: %v\n", pid, addr)
x.Printf("Current confstate at %d: %+v\n", n.Id, n.ConfState())
glog.Infof("Trying to add %d to cluster. Addr: %v\n", pid, addr)
glog.Infof("Current confstate at %d: %+v\n", n.Id, n.ConfState())
err = n.proposeConfChange(ctx, cc)
}
return err
Expand Down
11 changes: 8 additions & 3 deletions contrib/integration/acctupsert/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"encoding/json"
"flag"
"fmt"
"math/rand"
"strings"
"sync"
"sync/atomic"
Expand All @@ -33,6 +34,7 @@ var (
firsts = []string{"Paul", "Eric", "Jack", "John", "Martin"}
lasts = []string{"Brown", "Smith", "Robinson", "Waters", "Taylor"}
ages = []int{20, 25, 30, 35}
types = []string{"CEO", "COO", "CTO", "CFO"}
)

type account struct {
Expand Down Expand Up @@ -139,6 +141,7 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
{
get(func: eq(first, %q)) @filter(eq(last, %q) AND eq(age, %d)) {
uid
expand(_all_) {uid}
}
}
`, acc.first, acc.last, acc.age)
Expand All @@ -153,6 +156,8 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
x.Check(json.Unmarshal(resp.GetJson(), &decode))

x.AssertTrue(len(decode.Get) <= 1)
t := rand.Intn(len(types))

var uid string
if len(decode.Get) == 1 {
x.AssertTrue(decode.Get[0].Uid != nil)
Expand All @@ -162,8 +167,9 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
_:acct <first> %q .
_:acct <last> %q .
_:acct <age> "%d"^^<xs:int> .
`,
acc.first, acc.last, acc.age,
_:acct <%s> "" .
`,
acc.first, acc.last, acc.age, types[t],
)
mu := &api.Mutation{SetNquads: []byte(nqs)}
assigned, err := txn.Mutate(ctx, mu)
Expand All @@ -172,7 +178,6 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
}
uid = assigned.GetUids()["acct"]
x.AssertTrue(uid != "")

}

nq := fmt.Sprintf(`
Expand Down
4 changes: 4 additions & 0 deletions dgraph/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package cmd

import (
goflag "flag"
"fmt"
"os"

Expand All @@ -19,6 +20,7 @@ import (
"github.com/dgraph-io/dgraph/dgraph/cmd/zero"
"github.com/dgraph-io/dgraph/x"
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
"github.com/spf13/viper"
)

Expand All @@ -40,6 +42,7 @@ cluster.
// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
goflag.Parse()
if err := RootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
Expand All @@ -61,6 +64,7 @@ func init() {
RootCmd.PersistentFlags().Bool("expose_trace", false,
"Allow trace endpoint to be accessible from remote")
rootConf.BindPFlags(RootCmd.PersistentFlags())
flag.CommandLine.AddGoFlagSet(goflag.CommandLine)

var subcommands = []*x.SubCommand{
&bulk.Bulk, &live.Live, &server.Server, &zero.Zero, &version.Version, &debug.Debug,
Expand Down
8 changes: 8 additions & 0 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/dgraph-io/dgraph/types/facets"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -235,6 +236,10 @@ func (s *ServerState) getTimestamp() uint64 {
}

func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, error) {
if glog.V(2) {
glog.Infof("Received ALTER op: %+v", op)
defer glog.Infof("ALTER op: %+v done", op)
}
if op.Schema == "" && op.DropAttr == "" && !op.DropAll {
// Must have at least one field set. This helps users if they attempt
// to set a field but use the wrong name (could be decoded from JSON).
Expand Down Expand Up @@ -397,6 +402,9 @@ func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assign
// This method is used to execute the query and return the response to the
// client as a protocol buffer message.
func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Response, err error) {
if glog.V(3) {
glog.Infof("Got a query: %+v", req)
}
if err := x.HealthCheck(); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Request rejected %v", err)
Expand Down
5 changes: 3 additions & 2 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,8 @@ func (l *List) Value(readTs uint64) (rval types.Val, rerr error) {
// If list consists of one or more languages, first available value is returned; if no language
// from list match the values, processing is the same as for empty list.
func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr error) {
l.RLock() // All public methods should acquire locks, while private ones should assert them.
defer l.RUnlock()
p, err := l.postingFor(readTs, langs)
if err != nil {
return rval, err
Expand All @@ -922,8 +924,7 @@ func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr err
}

func (l *List) postingFor(readTs uint64, langs []string) (p *intern.Posting, rerr error) {
l.RLock()
defer l.RUnlock()
l.AssertRLock() // Avoid recursive locking by asserting a lock here.
return l.postingForLangs(readTs, langs)
}

Expand Down
33 changes: 29 additions & 4 deletions raftwal/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/coreos/etcd/raft"
pb "github.com/coreos/etcd/raft/raftpb"
"github.com/dgraph-io/badger"
"github.com/golang/glog"
"golang.org/x/net/trace"

"github.com/dgraph-io/dgraph/x"
Expand Down Expand Up @@ -65,13 +66,27 @@ func (u *txnUnifier) Cancel() {

type localCache struct {
sync.RWMutex
snap pb.Snapshot
firstIndex uint64
snap pb.Snapshot
}

func (c *localCache) setFirst(first uint64) {
c.Lock()
defer c.Unlock()
c.firstIndex = first
}

func (c *localCache) first() uint64 {
c.RLock()
defer c.RUnlock()
return c.firstIndex
}

func (c *localCache) setSnapshot(s pb.Snapshot) {
c.Lock()
defer c.Unlock()
c.snap = s
c.firstIndex = 0 // Reset firstIndex.
}

func (c *localCache) snapshot() pb.Snapshot {
Expand Down Expand Up @@ -240,7 +255,16 @@ func (w *DiskStorage) FirstIndex() (uint64, error) {
if !raft.IsEmptySnap(snap) {
return snap.Metadata.Index + 1, nil
}
if first := w.cache.first(); first > 0 {
return first, nil
}
index, err := w.seekEntry(nil, 0, false)
if err == nil {
glog.V(2).Infof("Setting first index: %d", index+1)
w.cache.setFirst(index + 1)
} else {
glog.Errorf("While seekEntry. Error: %v", err)
}
return index + 1, err
}

Expand Down Expand Up @@ -549,11 +573,13 @@ func (w *DiskStorage) Entries(lo, hi, maxSize uint64) (es []pb.Entry, rerr error
}

func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) error {
glog.V(2).Infof("CreateSnapshot i=%d, cs=%+v", i, cs)
first, err := w.FirstIndex()
if err != nil {
return err
}
if i < first {
glog.Errorf("i=%d<first=%d, ErrSnapOutOfDate", i, first)
return raft.ErrSnapOutOfDate
}

Expand All @@ -568,9 +594,8 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) er
var snap pb.Snapshot
snap.Metadata.Index = i
snap.Metadata.Term = e.Term
if cs != nil {
snap.Metadata.ConfState = *cs
}
x.AssertTrue(cs != nil)
snap.Metadata.ConfState = *cs
snap.Data = data

u := w.newUnifier()
Expand Down
28 changes: 25 additions & 3 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/golang/glog"
"golang.org/x/net/context"
"golang.org/x/net/trace"

Expand Down Expand Up @@ -123,6 +124,7 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *intern.Proposal) er
if n.Raft() == nil {
return x.Errorf("Raft isn't initialized yet")
}

// TODO: Should be based on number of edges (amount of work)
pendingProposals <- struct{}{}
x.PendingProposals.Add(1)
Expand Down Expand Up @@ -406,12 +408,23 @@ func (n *node) applyCommitted(proposal *intern.Proposal, index uint64) error {
return n.commitOrAbort(proposal.Key, proposal.Delta)

} else if proposal.Snapshot != nil {
existing, err := n.Store.Snapshot()
if err != nil {
return err
}
snap := proposal.Snapshot
if existing.Metadata.Index >= snap.Index {
log := fmt.Sprintf("Skipping snapshot at %d, because found one at %d",
snap.Index, existing.Metadata.Index)
n.elog.Printf(log)
glog.Info(log)
return nil
}
n.elog.Printf("Creating snapshot: %+v", snap)
x.Printf("Creating snapshot at index: %d. ReadTs: %d.\n", snap.Index, snap.ReadTs)
glog.Infof("Creating snapshot at index: %d. ReadTs: %d.\n", snap.Index, snap.ReadTs)
data, err := snap.Marshal()
x.Check(err)
// We can now discard all invalid versions of keys below this ts.
// We can now discard all invalid versions of keys below this ts.
pstore.SetDiscardTs(snap.ReadTs)
return n.Store.CreateSnapshot(snap.Index, n.ConfState(), data)

Expand Down Expand Up @@ -571,6 +584,7 @@ func (n *node) Run() {
done := make(chan struct{})
go func() {
<-n.closer.HasBeenClosed()
glog.Infof("Stopping node.Run")
if peerId, has := groups().MyPeer(); has && n.AmLeader() {
n.Raft().TransferLeadership(n.ctx, Config.RaftId, peerId)
time.Sleep(time.Second) // Let transfer happen.
Expand Down Expand Up @@ -950,6 +964,13 @@ func (n *node) InitAndStartNode() {
sp, err := n.Store.Snapshot()
x.Checkf(err, "Unable to get existing snapshot")
if !raft.IsEmptySnap(sp) {
// It is important that we pick up the conf state here.
// Otherwise, we'll lose the store conf state, and it would get
// overwritten with an empty state when a new snapshot is taken.
// This causes a node to just hang on restart, because it finds a
// zero-member Raft group.
n.SetConfState(&sp.Metadata.ConfState)

members := groups().members(n.gid)
for _, id := range sp.Metadata.ConfState.Nodes {
m, ok := members[id]
Expand All @@ -959,8 +980,9 @@ func (n *node) InitAndStartNode() {
}
}
n.SetRaft(raft.RestartNode(n.Cfg))
glog.V(2).Infoln("Restart node complete")
} else {
x.Printf("New Node for group: %d\n", n.gid)
glog.Infof("New Node for group: %d\n", n.gid)
if _, hasPeer := groups().MyPeer(); hasPeer {
// Get snapshot before joining peers as it can take time to retrieve it and we dont
// want the quorum to be inactive when it happens.
Expand Down
1 change: 1 addition & 0 deletions x/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

// SafeMutex can be used in place of sync.RWMutex
type SafeMutex struct {
// m deadlock.RWMutex // Very useful for detecting locking issues.
m sync.RWMutex
wait *SafeWait
writer int32
Expand Down

0 comments on commit ebf0e63

Please sign in to comment.