Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete tablets which don't belong #3051

Merged
merged 8 commits into from
Feb 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions dgraph/cmd/zero/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ func (s *Server) rebalanceTablets() {
// for the entire duration of predicate move. If this Zero stops being the leader, the final
// proposal of reassigning the tablet to the destination would fail automatically.
func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) error {
s.moveOngoing <- struct{}{}
defer func() {
<-s.moveOngoing
}()

ctx, cancel := context.WithTimeout(context.Background(), predicateMoveTimeout)
defer cancel()

Expand Down Expand Up @@ -146,6 +151,19 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) erro
predicate, srcGroup, dstGroup)
glog.Info(msg)
span.Annotate(nil, msg)

// Now that the move has happened, we can delete the predicate from the source group.
in.DestGid = 0 // Indicates deletion of predicate in the source group.
if _, err := wc.MovePredicate(ctx, in); err != nil {
msg = fmt.Sprintf("While deleting predicate [%v] in group %d. Error: %v",
in.Predicate, in.SourceGid, err)
span.Annotate(nil, msg)
glog.Warningf(msg)
} else {
msg = fmt.Sprintf("Deleted predicate %v in group %d", in.Predicate, in.SourceGid)
span.Annotate(nil, msg)
glog.V(1).Infof(msg)
}
return nil
}

Expand Down
64 changes: 64 additions & 0 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Server struct {
closer *y.Closer // Used to tell stream to close.
connectLock sync.Mutex // Used to serialize connect requests from servers.

moveOngoing chan struct{}
blockCommitsOn *sync.Map
}

Expand All @@ -79,6 +80,7 @@ func (s *Server) Init() {
s.leaderChangeCh = make(chan struct{}, 1)
s.closer = y.NewCloser(2) // grpc and http
s.blockCommitsOn = new(sync.Map)
s.moveOngoing = make(chan struct{}, 1)
go s.rebalanceTablets()
}

Expand Down Expand Up @@ -596,9 +598,71 @@ func (s *Server) UpdateMembership(ctx context.Context, group *pb.Group) (*api.Pa
return nil, err
}
}

if len(group.Members) == 0 {
return &api.Payload{Data: []byte("OK")}, nil
}
select {
case s.moveOngoing <- struct{}{}:
default:
// If a move is going on, don't do the next steps of deleting predicates.
return &api.Payload{Data: []byte("OK")}, nil
}
defer func() {
<-s.moveOngoing
}()

if err := s.deletePredicates(ctx, group); err != nil {
glog.Warningf("While deleting predicates: %v", err)
}
return &api.Payload{Data: []byte("OK")}, nil
}

func (s *Server) deletePredicates(ctx context.Context, group *pb.Group) error {
if group == nil || group.Tablets == nil {
return nil
}
var gid uint32
for _, tablet := range group.Tablets {
gid = tablet.GroupId
break
}
if gid == 0 {
return x.Errorf("Unable to find group")
}
state, err := s.latestMembershipState(ctx)
if err != nil {
return err
}
sg, ok := state.Groups[gid]
if !ok {
return x.Errorf("Unable to find group: %d", gid)
}

pl := s.Leader(gid)
if pl == nil {
return x.Errorf("Unable to reach leader of group: %d", gid)
}
wc := pb.NewWorkerClient(pl.Get())

for pred := range group.Tablets {
if _, found := sg.Tablets[pred]; found {
continue
}
glog.Infof("Tablet: %v does not belong to group: %d. Sending delete instruction.",
pred, gid)
in := &pb.MovePredicatePayload{
Predicate: pred,
SourceGid: gid,
DestGid: 0,
}
if _, err := wc.MovePredicate(ctx, in); err != nil {
return err
}
}
return nil
}

func (s *Server) StreamMembership(_ *api.Payload, stream pb.Zero_StreamMembershipServer) error {
// Send MembershipState right away. So, the connection is correctly established.
ctx := stream.Context()
Expand Down
60 changes: 60 additions & 0 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (
"fmt"
"math"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
humanize "github.com/dustin/go-humanize"

ostats "go.opencensus.io/stats"
"go.opencensus.io/tag"
Expand Down Expand Up @@ -834,11 +836,34 @@ func listWrap(kv *bpb.KV) *bpb.KVList {
func (n *node) rollupLists(readTs uint64) error {
writer := posting.NewTxnWriter(pstore)

// We're doing rollups. We should use this opportunity to calculate the tablet sizes.
amLeader := n.AmLeader()
m := new(sync.Map)

addTo := func(key []byte, delta int64) {
if !amLeader {
// Only leader needs to calculate the tablet sizes.
return
}
pk := x.Parse(key)
if pk == nil {
return
}
val, ok := m.Load(pk.Attr)
if !ok {
sz := new(int64)
val, _ = m.LoadOrStore(pk.Attr, sz)
}
size := val.(*int64)
atomic.AddInt64(size, delta)
}

stream := pstore.NewStreamAt(readTs)
stream.LogPrefix = "Rolling up"
stream.ChooseKey = func(item *badger.Item) bool {
switch item.UserMeta() {
case posting.BitSchemaPosting, posting.BitCompletePosting, posting.BitEmptyPosting:
addTo(item.Key(), item.EstimatedSize())
return false
default:
return true
Expand All @@ -852,6 +877,7 @@ func (n *node) rollupLists(readTs uint64) error {
}
atomic.AddUint64(&numKeys, 1)
kv, err := l.MarshalToKv()
addTo(key, int64(kv.Size()))
return listWrap(kv), err
}
stream.Send = func(list *bpb.KVList) error {
Expand All @@ -868,6 +894,40 @@ func (n *node) rollupLists(readTs uint64) error {

// We can now discard all invalid versions of keys below this ts.
pstore.SetDiscardTs(readTs)

if amLeader {
// Only leader sends the tablet size updates to Zero. No one else does.
// doSendMembership is also being concurrently called from another goroutine.
go func() {
tablets := make(map[string]*pb.Tablet)
var total int64
m.Range(func(key, val interface{}) bool {
pred := key.(string)
size := atomic.LoadInt64(val.(*int64))
tablets[pred] = &pb.Tablet{
GroupId: n.gid,
Predicate: pred,
Space: size,
}
total += size
return true
})
// Update Zero with the tablet sizes. If Zero sees a tablet which does not belong to
// this group, it would send instruction to delete that tablet. There's an edge case
// here if the followers are still running Rollup, and happen to read a key before and
// write after the tablet deletion, causing that tablet key to resurface. Then, only the
// follower would have that key, not the leader.
// However, if the follower then becomes the leader, we'd be able to get rid of that
// key then. Alternatively, we could look into cancelling the Rollup if we see a
// predicate deletion.
if err := groups().doSendMembership(tablets); err != nil {
glog.Warningf("While sending membership to Zero. Error: %v", err)
} else {
glog.V(2).Infof("Sent tablet size update to Zero. Total size: %s",
humanize.Bytes(uint64(total)))
}
}()
}
return nil
}

Expand Down
Loading