Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix various deadlocks in Dgraph #2548

Merged
merged 7 commits into from
Aug 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/dgraph-io/dgraph/protos/intern"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -124,9 +125,9 @@ func (n *Node) Raft() raft.Node {

// SetConfState stores the latest ConfState generated by ApplyConfChange.
//
// The incoming state is logged exactly once, before the node lock is taken,
// so no logging happens while the lock is held; the lock only guards the
// write to n._confState.
func (n *Node) SetConfState(cs *raftpb.ConfState) {
	glog.Infof("Setting conf state to %+v\n", cs)
	n.Lock()
	defer n.Unlock()
	n._confState = cs
}

Expand Down Expand Up @@ -215,7 +216,7 @@ func (n *Node) PastLife() (idx uint64, restart bool, rerr error) {
return
}
if !raft.IsEmptySnap(sp) {
x.Printf("Found Snapshot, Metadata: %+v\n", sp.Metadata)
glog.Infof("Found Snapshot.Metadata: %+v\n", sp.Metadata)
restart = true
idx = sp.Metadata.Index
}
Expand All @@ -226,7 +227,7 @@ func (n *Node) PastLife() (idx uint64, restart bool, rerr error) {
return
}
if !raft.IsEmptyHardState(hd) {
x.Printf("Found hardstate: %+v\n", hd)
glog.Infof("Found hardstate: %+v\n", hd)
restart = true
}

Expand All @@ -235,7 +236,7 @@ func (n *Node) PastLife() (idx uint64, restart bool, rerr error) {
if rerr != nil {
return
}
x.Printf("Group %d found %d entries\n", n.RaftContext.Group, num)
glog.Infof("Group %d found %d entries\n", n.RaftContext.Group, num)
// We'll always have at least one entry.
if num > 1 {
restart = true
Expand Down Expand Up @@ -292,7 +293,7 @@ func (n *Node) BatchAndSendMessages() {
if exists := failedConn[to]; !exists {
// So that we print error only the first time we are not able to connect.
// Otherwise, the log is polluted with multiple errors.
x.Printf("No healthy connection found to node Id: %d addr: [%s], err: %v\n",
glog.Warningf("No healthy connection to node Id: %d addr: [%s], err: %v\n",
to, addr, err)
failedConn[to] = true
}
Expand Down Expand Up @@ -325,7 +326,8 @@ func (n *Node) doSendMessage(pool *Pool, data []byte) {
go func() {
_, err := c.RaftMessage(ctx, batch)
if err != nil {
x.Printf("Error while sending message to node with addr: %s, err: %v\n", pool.Addr, err)
glog.Warningf("Error while sending message to node with addr: %s, err: %v\n",
pool.Addr, err)
}
ch <- err
}()
Expand Down Expand Up @@ -356,7 +358,7 @@ func (n *Node) Connect(pid uint64, addr string) {
// a nil *pool.
if addr == n.MyAddr {
// TODO: Note this fact in more general peer health info somehow.
x.Printf("Peer %d claims same host as me\n", pid)
glog.Infof("Peer %d claims same host as me\n", pid)
n.SetPeer(pid, addr)
return
}
Expand Down Expand Up @@ -387,7 +389,7 @@ func (n *Node) proposeConfChange(ctx context.Context, pb raftpb.ConfChange) erro
if cctx.Err() != nil {
return errInternalRetry
}
x.Printf("Error while proposing conf change: %v", err)
glog.Warningf("Error while proposing conf change: %v", err)
return err
}
select {
Expand Down Expand Up @@ -419,8 +421,8 @@ func (n *Node) AddToCluster(ctx context.Context, pid uint64) error {
}
err = errInternalRetry
for err == errInternalRetry {
x.Printf("Trying to add %d to cluster. Addr: %v\n", pid, addr)
x.Printf("Current confstate at %d: %+v\n", n.Id, n.ConfState())
glog.Infof("Trying to add %d to cluster. Addr: %v\n", pid, addr)
glog.Infof("Current confstate at %d: %+v\n", n.Id, n.ConfState())
err = n.proposeConfChange(ctx, cc)
}
return err
Expand Down
11 changes: 8 additions & 3 deletions contrib/integration/acctupsert/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"encoding/json"
"flag"
"fmt"
"math/rand"
"strings"
"sync"
"sync/atomic"
Expand All @@ -33,6 +34,7 @@ var (
firsts = []string{"Paul", "Eric", "Jack", "John", "Martin"}
lasts = []string{"Brown", "Smith", "Robinson", "Waters", "Taylor"}
ages = []int{20, 25, 30, 35}
types = []string{"CEO", "COO", "CTO", "CFO"}
)

type account struct {
Expand Down Expand Up @@ -139,6 +141,7 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
{
get(func: eq(first, %q)) @filter(eq(last, %q) AND eq(age, %d)) {
uid
expand(_all_) {uid}
}
}
`, acc.first, acc.last, acc.age)
Expand All @@ -153,6 +156,8 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
x.Check(json.Unmarshal(resp.GetJson(), &decode))

x.AssertTrue(len(decode.Get) <= 1)
t := rand.Intn(len(types))

var uid string
if len(decode.Get) == 1 {
x.AssertTrue(decode.Get[0].Uid != nil)
Expand All @@ -162,8 +167,9 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
_:acct <first> %q .
_:acct <last> %q .
_:acct <age> "%d"^^<xs:int> .
`,
acc.first, acc.last, acc.age,
_:acct <%s> "" .
`,
acc.first, acc.last, acc.age, types[t],
)
mu := &api.Mutation{SetNquads: []byte(nqs)}
assigned, err := txn.Mutate(ctx, mu)
Expand All @@ -172,7 +178,6 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
}
uid = assigned.GetUids()["acct"]
x.AssertTrue(uid != "")

}

nq := fmt.Sprintf(`
Expand Down
4 changes: 4 additions & 0 deletions dgraph/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package cmd

import (
goflag "flag"
"fmt"
"os"

Expand All @@ -19,6 +20,7 @@ import (
"github.com/dgraph-io/dgraph/dgraph/cmd/zero"
"github.com/dgraph-io/dgraph/x"
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
"github.com/spf13/viper"
)

Expand All @@ -40,6 +42,7 @@ cluster.
// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
goflag.Parse()
if err := RootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
Expand All @@ -61,6 +64,7 @@ func init() {
RootCmd.PersistentFlags().Bool("expose_trace", false,
"Allow trace endpoint to be accessible from remote")
rootConf.BindPFlags(RootCmd.PersistentFlags())
flag.CommandLine.AddGoFlagSet(goflag.CommandLine)

var subcommands = []*x.SubCommand{
&bulk.Bulk, &live.Live, &server.Server, &zero.Zero, &version.Version, &debug.Debug,
Expand Down
8 changes: 8 additions & 0 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/dgraph-io/dgraph/types/facets"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -235,6 +236,10 @@ func (s *ServerState) getTimestamp() uint64 {
}

func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, error) {
if glog.V(2) {
glog.Infof("Received ALTER op: %+v", op)
defer glog.Infof("ALTER op: %+v done", op)
}
if op.Schema == "" && op.DropAttr == "" && !op.DropAll {
// Must have at least one field set. This helps users if they attempt
// to set a field but use the wrong name (could be decoded from JSON).
Expand Down Expand Up @@ -397,6 +402,9 @@ func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assign
// This method is used to execute the query and return the response to the
// client as a protocol buffer message.
func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Response, err error) {
if glog.V(3) {
glog.Infof("Got a query: %+v", req)
}
if err := x.HealthCheck(); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Request rejected %v", err)
Expand Down
5 changes: 3 additions & 2 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,8 @@ func (l *List) Value(readTs uint64) (rval types.Val, rerr error) {
// If list consists of one or more languages, first available value is returned; if no language
// from list match the values, processing is the same as for empty list.
func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr error) {
l.RLock() // All public methods should acquire locks, while private ones should assert them.
defer l.RUnlock()
p, err := l.postingFor(readTs, langs)
if err != nil {
return rval, err
Expand All @@ -922,8 +924,7 @@ func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr err
}

// postingFor returns whatever postingForLangs selects for the given read
// timestamp and language list.
//
// It deliberately does not lock: the caller must already hold the list's read
// lock (ValueFor acquires it before calling in), and AssertRLock verifies
// that. Taking the read lock again here would be a recursive read lock, which
// can deadlock on a Go RWMutex once a writer queues up between the two
// acquisitions.
// NOTE(review): any other caller of postingFor must hold the read lock too —
// confirm against call sites outside this view.
func (l *List) postingFor(readTs uint64, langs []string) (p *intern.Posting, rerr error) {
	l.AssertRLock() // Avoid recursive locking by asserting a lock here.
	return l.postingForLangs(readTs, langs)
}

Expand Down
33 changes: 29 additions & 4 deletions raftwal/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/coreos/etcd/raft"
pb "github.com/coreos/etcd/raft/raftpb"
"github.com/dgraph-io/badger"
"github.com/golang/glog"
"golang.org/x/net/trace"

"github.com/dgraph-io/dgraph/x"
Expand Down Expand Up @@ -65,13 +66,27 @@ func (u *txnUnifier) Cancel() {

// localCache keeps the first index and the snapshot in memory. The embedded
// RWMutex guards both fields; use the accessor methods rather than touching
// the fields directly.
type localCache struct {
	sync.RWMutex
	firstIndex uint64      // cached first index; 0 means nothing is cached
	snap       pb.Snapshot // snapshot last stored via setSnapshot
}

// setFirst records first as the cached first index while holding the write
// lock.
func (c *localCache) setFirst(first uint64) {
	c.Lock()
	c.firstIndex = first
	c.Unlock()
}

// first returns the cached first index, read under the read lock. A zero
// result means no index is currently cached.
func (c *localCache) first() uint64 {
	c.RLock()
	cached := c.firstIndex
	c.RUnlock()
	return cached
}

// setSnapshot replaces the cached snapshot with s and, under the same write
// lock, clears the cached first index so it is no longer treated as valid.
func (c *localCache) setSnapshot(s pb.Snapshot) {
	c.Lock()
	c.snap, c.firstIndex = s, 0 // Reset firstIndex.
	c.Unlock()
}

func (c *localCache) snapshot() pb.Snapshot {
Expand Down Expand Up @@ -240,7 +255,16 @@ func (w *DiskStorage) FirstIndex() (uint64, error) {
if !raft.IsEmptySnap(snap) {
return snap.Metadata.Index + 1, nil
}
if first := w.cache.first(); first > 0 {
return first, nil
}
index, err := w.seekEntry(nil, 0, false)
if err == nil {
glog.V(2).Infof("Setting first index: %d", index+1)
w.cache.setFirst(index + 1)
} else {
glog.Errorf("While seekEntry. Error: %v", err)
}
return index + 1, err
}

Expand Down Expand Up @@ -549,11 +573,13 @@ func (w *DiskStorage) Entries(lo, hi, maxSize uint64) (es []pb.Entry, rerr error
}

func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) error {
glog.V(2).Infof("CreateSnapshot i=%d, cs=%+v", i, cs)
first, err := w.FirstIndex()
if err != nil {
return err
}
if i < first {
glog.Errorf("i=%d<first=%d, ErrSnapOutOfDate", i, first)
return raft.ErrSnapOutOfDate
}

Expand All @@ -568,9 +594,8 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) er
var snap pb.Snapshot
snap.Metadata.Index = i
snap.Metadata.Term = e.Term
if cs != nil {
snap.Metadata.ConfState = *cs
}
x.AssertTrue(cs != nil)
snap.Metadata.ConfState = *cs
snap.Data = data

u := w.newUnifier()
Expand Down
28 changes: 25 additions & 3 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/golang/glog"
"golang.org/x/net/context"
"golang.org/x/net/trace"

Expand Down Expand Up @@ -123,6 +124,7 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *intern.Proposal) er
if n.Raft() == nil {
return x.Errorf("Raft isn't initialized yet")
}

// TODO: Should be based on number of edges (amount of work)
pendingProposals <- struct{}{}
x.PendingProposals.Add(1)
Expand Down Expand Up @@ -406,12 +408,23 @@ func (n *node) applyCommitted(proposal *intern.Proposal, index uint64) error {
return n.commitOrAbort(proposal.Key, proposal.Delta)

} else if proposal.Snapshot != nil {
existing, err := n.Store.Snapshot()
if err != nil {
return err
}
snap := proposal.Snapshot
if existing.Metadata.Index >= snap.Index {
log := fmt.Sprintf("Skipping snapshot at %d, because found one at %d",
snap.Index, existing.Metadata.Index)
n.elog.Printf(log)
glog.Info(log)
return nil
}
n.elog.Printf("Creating snapshot: %+v", snap)
x.Printf("Creating snapshot at index: %d. ReadTs: %d.\n", snap.Index, snap.ReadTs)
glog.Infof("Creating snapshot at index: %d. ReadTs: %d.\n", snap.Index, snap.ReadTs)
data, err := snap.Marshal()
x.Check(err)
// We can now discard all invalid versions of keys below this ts.
// We can now discard all invalid versions of keys below this ts.
pstore.SetDiscardTs(snap.ReadTs)
return n.Store.CreateSnapshot(snap.Index, n.ConfState(), data)

Expand Down Expand Up @@ -571,6 +584,7 @@ func (n *node) Run() {
done := make(chan struct{})
go func() {
<-n.closer.HasBeenClosed()
glog.Infof("Stopping node.Run")
if peerId, has := groups().MyPeer(); has && n.AmLeader() {
n.Raft().TransferLeadership(n.ctx, Config.RaftId, peerId)
time.Sleep(time.Second) // Let transfer happen.
Expand Down Expand Up @@ -950,6 +964,13 @@ func (n *node) InitAndStartNode() {
sp, err := n.Store.Snapshot()
x.Checkf(err, "Unable to get existing snapshot")
if !raft.IsEmptySnap(sp) {
// It is important that we pick up the conf state here.
// Otherwise, we'll lose the store conf state, and it would get
// overwritten with an empty state when a new snapshot is taken.
// This causes a node to just hang on restart, because it finds a
// zero-member Raft group.
n.SetConfState(&sp.Metadata.ConfState)

members := groups().members(n.gid)
for _, id := range sp.Metadata.ConfState.Nodes {
m, ok := members[id]
Expand All @@ -959,8 +980,9 @@ func (n *node) InitAndStartNode() {
}
}
n.SetRaft(raft.RestartNode(n.Cfg))
glog.V(2).Infoln("Restart node complete")
} else {
x.Printf("New Node for group: %d\n", n.gid)
glog.Infof("New Node for group: %d\n", n.gid)
if _, hasPeer := groups().MyPeer(); hasPeer {
// Get snapshot before joining peers as it can take time to retrieve it and we dont
// want the quorum to be inactive when it happens.
Expand Down
1 change: 1 addition & 0 deletions x/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

// SafeMutex can be used in place of sync.RWMutex
type SafeMutex struct {
// m deadlock.RWMutex // Very useful for detecting locking issues.
m sync.RWMutex
wait *SafeWait
writer int32
Expand Down