Introduce group checksums (#2964)
This PR introduces group checksums, which get propagated along the OracleDeltas streaming from Zero -> Alpha Leader -> Alpha Followers. This allows an Alpha to block a read if the group checksum does not match the checksum in the membership state, which is received independently and directly from Zero.

This fixes the Jepsen issue where, during a tablet move, an Alpha with an older membership state (still thinking its group is serving the tablet) can reply to a query with stale data. Now the Alpha sees that the checksum differs between the Oracle deltas and the membership state, and blocks until it gets an updated state. It then realizes that it is no longer serving the tablet, and either returns an error or shoots the query off to the right Alpha.
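
For illustration, here is a minimal, self-contained Go sketch of that waiting loop — not the actual Alpha worker code; the `checksums` type, field names, and polling interval are assumptions — showing how a read can be held back until both views of a group's checksum agree:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// checksums is a hypothetical holder for the two independently received
// views of each group's checksum: one from Zero's membership stream, one
// from the OracleDeltas relayed by the Alpha leader.
type checksums struct {
	sync.RWMutex
	membership map[uint32]uint64
	delta      map[uint32]uint64
}

func (c *checksums) match(gid uint32) bool {
	c.RLock()
	defer c.RUnlock()
	return c.membership[gid] == c.delta[gid]
}

// waitForChecksum blocks a read until both views agree, so a query is
// never served while the Alpha's idea of the group's tablet set is stale.
func (c *checksums) waitForChecksum(ctx context.Context, gid uint32) error {
	for !c.match(gid) {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(100 * time.Millisecond):
			// Poll until an updated membership state or delta arrives.
		}
	}
	return nil
}

func main() {
	c := &checksums{
		membership: map[uint32]uint64{1: 42},
		delta:      map[uint32]uint64{1: 41}, // stale: tablet set changed
	}
	go func() {
		time.Sleep(200 * time.Millisecond)
		c.Lock()
		c.delta[1] = 42 // an updated OracleDelta arrives
		c.Unlock()
	}()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println("read unblocked:", c.waitForChecksum(ctx, 1) == nil)
}
```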

Proposal retry issue:

This PR also fixes another issue, where a commit proposal to the Zero leader gets blocked when the leader is partitioned away but has not yet stepped down. When the proposal times out and is internally retried, that Zero is no longer the leader, so `n.AmLeader()` fails and the commit gets rejected by Zero. This moves MaxAssigned forward, allowing a read to happen at an Alpha. But the proposal that had timed out does get applied afterwards, causing a commit to happen at a timestamp lower than MaxAssigned.

This PR moves the `n.AmLeader` check to the top of the function, so we no longer do this check inside the proposal retry loop. Instead, we keep trying to propose until we have a resolution from Raft, as in the sketch below.
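
A minimal runnable sketch of the fixed control flow, with the Raft plumbing stubbed out — `errInternalRetry` and the error messages come from the raft.go diff below, while the doubling backoff is an assumption:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errInternalRetry = errors.New("Retry Raft proposal internally")

// proposeAndWait mirrors the fixed shape: leadership is checked once, up
// front. No check happens inside the retry loop, because a timed-out
// attempt may still be committed by Raft later; rejecting its retry with
// "not leader" would report failure for a proposal that can still apply.
func proposeAndWait(amLeader func() bool, propose func(time.Duration) error) error {
	if !amLeader() {
		return errors.New("Not Zero leader. Aborting proposal")
	}
	timeout := 4 * time.Second
	for {
		err := propose(timeout)
		if err != errInternalRetry {
			return err // resolved by Raft: success or a real error
		}
		timeout *= 2 // back off, then keep trying until Raft resolves it
	}
}

func main() {
	attempts := 0
	err := proposeAndWait(
		func() bool { return true },
		func(time.Duration) error {
			attempts++
			if attempts < 3 {
				return errInternalRetry // simulate two internal timeouts
			}
			return nil
		})
	fmt.Printf("attempts: %d, err: %v\n", attempts, err)
}
```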

Debugging:

This PR also updates the `dgraph debug` tool to sum up the amounts in the Jepsen bank test without looking at the keys. This is useful when keys and amounts are located in two different groups. This change is what allowed me to find the first txn violation and the causes of the tablet move nemesis.
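
As a hedged illustration of the invariant being checked — the account values and the `violated` helper below are made up; only the expected total of 100 comes from the test, as seen in `findFirstValidTxn` in the diff below:

```go
package main

import "fmt"

// In the Jepsen bank test, transfers only move money between accounts,
// so the sum of all amount_ values must equal the fixed initial total at
// every commit timestamp. Summing amounts alone -- without resolving the
// key_ predicate, which may live in another group mid-move -- is enough
// to detect a violation.
func violated(amounts map[uint64]int, expectedTotal int) bool {
	var total int
	for _, a := range amounts {
		total += a
	}
	return total != expectedTotal
}

func main() {
	// Hypothetical snapshots of amount_ values at two timestamps.
	fmt.Println(violated(map[uint64]int{1: 40, 2: 35, 3: 25}, 100)) // false: total is 100
	fmt.Println(violated(map[uint64]int{1: 40, 2: 35, 3: 30}, 100)) // true: total is 105
}
```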

Fixes #2321.

Commits:
* Introduce group checksums, so an Alpha can know if the composition of the tablets in a group has changed.
* Add a simple waiting loop for group checksums.
* Remove any background cleaning jobs.
* Always iterate to create a posting list.
* Add a way to find a violation linearly.
* Read amounts and compute the total without considering keys.
* Avoid a race condition between proposing a commit in a loop and rejecting that commit when Zero stops being the leader.
* Self review. Do not do any tablet removals or deletions for now.
manishrjain authored Feb 1, 2019
1 parent 1e13e78 commit 63ecb11
Showing 11 changed files with 549 additions and 328 deletions.
123 changes: 72 additions & 51 deletions dgraph/cmd/debug/run.go
@@ -49,9 +49,10 @@ type flagOptions struct {
readOnly bool
pdir string
itemMeta bool
jepsen bool
jepsen string
readTs uint64
sizeHistogram bool
noKeys bool
}

func init() {
@@ -66,7 +67,9 @@ func init() {
flag := Debug.Cmd.Flags()
flag.BoolVar(&opt.itemMeta, "item", true, "Output item meta as well. Set to false for diffs.")
flag.BoolVar(&opt.vals, "vals", false, "Output values along with keys.")
flag.BoolVar(&opt.jepsen, "jepsen", false, "Disect Jepsen output.")
flag.BoolVar(&opt.noKeys, "nokeys", false,
"Ignore key_. Only consider amount when calculating total.")
flag.StringVar(&opt.jepsen, "jepsen", "", "Disect Jepsen output. Can be linear/binary.")
flag.Uint64Var(&opt.readTs, "at", math.MaxUint64, "Set read timestamp for all txns.")
flag.BoolVarP(&opt.readOnly, "readonly", "o", true, "Open in read only mode.")
flag.StringVarP(&opt.predicate, "pred", "r", "", "Only output specified predicate.")
@@ -92,51 +95,7 @@ func toInt(o *pb.Posting) int {
return a
}

func readAmount(txn *badger.Txn, uid uint64) int {
iopt := badger.DefaultIteratorOptions
iopt.AllVersions = true
itr := txn.NewIterator(iopt)
defer itr.Close()

for itr.Rewind(); itr.Valid(); {
item := itr.Item()
pk := x.Parse(item.Key())
if !pk.IsData() || pk.IsSchema() || pk.Uid != uid || !strings.HasPrefix(pk.Attr, "amount_") {
itr.Next()
continue
}
pl, err := posting.ReadPostingList(item.KeyCopy(nil), itr)
if err != nil {
log.Fatalf("Unable to read posting list: %v", err)
}
var times int
var amount int
err = pl.Iterate(math.MaxUint64, 0, func(o *pb.Posting) error {
amount = toInt(o)
times++
return nil
})
x.Check(err)
if times == 0 {
itr.Next()
continue
}
x.AssertTrue(times <= 1)
return amount
}
return 0
}

func seekTotal(db *badger.DB, readTs uint64) int {
txn := db.NewTransactionAt(readTs, false)
defer txn.Discard()

iopt := badger.DefaultIteratorOptions
iopt.AllVersions = true
iopt.PrefetchValues = false
itr := txn.NewIterator(iopt)
defer itr.Close()

func uidToVal(itr *badger.Iterator, prefix string) map[uint64]int {
keys := make(map[uint64]int)
var lastKey []byte
for itr.Rewind(); itr.Valid(); {
@@ -147,7 +106,7 @@ func seekTotal(db *badger.DB, readTs uint64) int {
}
lastKey = append(lastKey[:0], item.Key()...)
pk := x.Parse(item.Key())
if !pk.IsData() || !strings.HasPrefix(pk.Attr, "key_") {
if !pk.IsData() || !strings.HasPrefix(pk.Attr, prefix) {
continue
}
if pk.IsSchema() {
@@ -175,17 +134,65 @@
})
x.Checkf(err, "during iterate")
}
return keys
}

func seekTotal(db *badger.DB, readTs uint64) int {
txn := db.NewTransactionAt(readTs, false)
defer txn.Discard()

iopt := badger.DefaultIteratorOptions
iopt.AllVersions = true
iopt.PrefetchValues = false
itr := txn.NewIterator(iopt)
defer itr.Close()

keys := uidToVal(itr, "key_")
fmt.Printf("Got keys: %+v\n", keys)
vals := uidToVal(itr, "amount_")
var total int
for _, val := range vals {
total += val
}
fmt.Printf("Got vals: %+v. Total: %d\n", vals, total)
if opt.noKeys {
// Ignore the key_ predicate. Only consider the amount_ predicate. Useful when tablets are
// being moved around.
keys = vals
}

total = 0
for uid, key := range keys {
a := readAmount(txn, uid)
a := vals[uid]
fmt.Printf("uid: %-5d %x key: %d amount: %d\n", uid, uid, key, a)
total += a
}
fmt.Printf("Total @ %d = %d\n", readTs, total)
return total
}

func findFirstValidTxn(db *badger.DB) uint64 {
readTs := opt.readTs
var wrong uint64
for {
min, max := getMinMax(db, readTs-1)
if max <= min {
fmt.Printf("Can't find it. Max: %d\n", max)
return 0
}
readTs = max
if total := seekTotal(db, readTs); total != 100 {
fmt.Printf("===> VIOLATION at ts: %d\n", readTs)
showAllPostingsAt(db, readTs)
wrong = readTs
} else {
fmt.Printf("===> Found first correct version at %d\n", readTs)
showAllPostingsAt(db, readTs)
return wrong
}
}
}

func findFirstInvalidTxn(db *badger.DB, lowTs, highTs uint64) uint64 {
fmt.Println()
if highTs-lowTs < 1 {
@@ -296,7 +303,13 @@ func jepsen(db *badger.DB) {
min, max := getMinMax(db, opt.readTs)
fmt.Printf("min=%d. max=%d\n", min, max)

ts := findFirstInvalidTxn(db, min, max)
var ts uint64
switch opt.jepsen {
case "binary":
ts = findFirstInvalidTxn(db, min, max)
case "linear":
ts = findFirstValidTxn(db)
}
fmt.Println()
if ts == 0 {
fmt.Println("Nothing found. Exiting.")
Expand All @@ -315,6 +328,8 @@ func jepsen(db *badger.DB) {

func history(lookup []byte, itr *badger.Iterator) {
var buf bytes.Buffer
pk := x.Parse(lookup)
fmt.Fprintf(&buf, "==> key: %x. PK: %+v\n", lookup, pk)
for ; itr.Valid(); itr.Next() {
item := itr.Item()
if !bytes.Equal(item.Key(), lookup) {
@@ -640,11 +655,17 @@ func run() {
x.Check(err)
defer db.Close()

min, max := getMinMax(db, opt.readTs)
fmt.Printf("Min commit: %d. Max commit: %d, w.r.t %d\n", min, max, opt.readTs)

switch {
case len(opt.keyLookup) > 0:
lookup(db)
case opt.jepsen:
case len(opt.jepsen) > 0:
jepsen(db)
case opt.vals:
total := seekTotal(db, opt.readTs)
fmt.Printf("Total: %d\n", total)
case opt.sizeHistogram:
sizeHistogram(db)
default:
8 changes: 3 additions & 5 deletions dgraph/cmd/zero/oracle.go
@@ -289,11 +289,6 @@ func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error {
// violations in Jepsen, because we'll send out a MaxAssigned higher than a commit, which would
// cause newer txns to see older data.

// We could consider adding a wrapper around the user proposal, so we can access any key-values.
// Something like this:
// https://github.com/golang/go/commit/5d39260079b5170e6b4263adb4022cc4b54153c4
ctx = context.Background() // Use a new context with no timeout.

// If this node stops being the leader, we want this proposal to not be forwarded to the leader,
// and get aborted.
if err := s.Node.proposeAndWait(ctx, &zp); err != nil {
@@ -443,6 +438,9 @@ func (s *Server) Oracle(unused *api.Payload, server pb.Zero_OracleServer) error
if !open {
return errClosed
}
// Pass in the latest group checksum as well, so the Alpha can use that to determine
// when not to service a read.
delta.GroupChecksums = s.groupChecksums()
if err := server.Send(delta); err != nil {
return err
}
47 changes: 35 additions & 12 deletions dgraph/cmd/zero/raft.go
@@ -17,6 +17,7 @@
package zero

import (
"bytes"
"errors"
"fmt"
"log"
@@ -32,6 +33,7 @@ import (
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
farm "github.com/dgryski/go-farm"
"github.com/golang/glog"
"github.com/google/uuid"
"golang.org/x/net/context"
@@ -75,18 +77,29 @@ func (n *node) uniqueKey() string {
var errInternalRetry = errors.New("Retry Raft proposal internally")

func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) error {
if n.Raft() == nil {
switch {
case n.Raft() == nil:
return x.Errorf("Raft isn't initialized yet.")
}
if ctx.Err() != nil {
case ctx.Err() != nil:
return ctx.Err()
case !n.AmLeader():
// Do this check upfront. Don't do this inside propose for reasons explained below.
return x.Errorf("Not Zero leader. Aborting proposal: %+v", proposal)
}
span := otrace.FromContext(ctx)

// We could consider adding a wrapper around the user proposal, so we can access any key-values.
// Something like this:
// https://github.com/golang/go/commit/5d39260079b5170e6b4263adb4022cc4b54153c4
span := otrace.FromContext(ctx)
// Overwrite ctx, so we no longer enforce the timeouts or cancels from ctx.
ctx = otrace.NewContext(context.Background(), span)

// propose runs in a loop. So, we should not do any checks inside, including n.AmLeader. This is
// to avoid the scenario where the first proposal times out and the second one gets returned
// due to node no longer being the leader. In this scenario, the first proposal can still get
// accepted by Raft, causing a txn violation later for us, because we assumed that the proposal
// did not go through.
propose := func(timeout time.Duration) error {
if !n.AmLeader() {
return x.Errorf("Not Zero leader. Aborting proposal: %+v", proposal)
}
cctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

@@ -100,16 +113,15 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) error {
x.AssertTruef(n.Proposals.Store(key, pctx), "Found existing proposal with key: [%v]", key)
defer n.Proposals.Delete(key)
proposal.Key = key
if span != nil {
span.Annotatef(nil, "Proposing with key: %s. Timeout: %v", key, timeout)
}
span.Annotatef(nil, "Proposing with key: %s. Timeout: %v", key, timeout)

data, err := proposal.Marshal()
if err != nil {
return err
}
// Propose the change.
if err := n.Raft().Propose(cctx, data); err != nil {
span.Annotatef(nil, "Error while proposing via Raft: %v", err)
return x.Wrapf(err, "While proposing")
}

Expand All @@ -118,9 +130,8 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er
case err := <-che:
// We arrived here by a call to n.props.Done().
return err
case <-ctx.Done():
return ctx.Err()
case <-cctx.Done():
span.Annotatef(nil, "Internal context timeout %s. Will retry...", timeout)
return errInternalRetry
}
}
@@ -226,6 +237,18 @@ func (n *node) handleMemberProposal(member *pb.Member) error {
func (n *node) handleTabletProposal(tablet *pb.Tablet) error {
n.server.AssertLock()
state := n.server.state
defer func() {
// Regenerate group checksums. These checksums are solely based on which tablets are being
// served by the group. If the tablets that a group is serving changes, and the Alpha does
// not know about these changes, then the read request must fail.
for _, g := range state.GetGroups() {
var buf bytes.Buffer
for name := range g.GetTablets() {
x.Check2(buf.WriteString(name))
}
g.Checksum = farm.Fingerprint64(buf.Bytes())
}
}()

if tablet.GroupId == 0 {
return x.Errorf("Tablet group id is zero: %+v", tablet)
10 changes: 10 additions & 0 deletions dgraph/cmd/zero/zero.go
@@ -232,6 +232,16 @@ func (s *Server) membershipState() *pb.MembershipState {
return proto.Clone(s.state).(*pb.MembershipState)
}

func (s *Server) groupChecksums() map[uint32]uint64 {
s.RLock()
defer s.RUnlock()
m := make(map[uint32]uint64)
for gid, g := range s.state.GetGroups() {
m[gid] = g.Checksum
}
return m
}

func (s *Server) storeZero(m *pb.Member) {
s.Lock()
defer s.Unlock()
46 changes: 9 additions & 37 deletions posting/mvcc.go
@@ -232,44 +232,16 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {

// TODO: We should only create a posting list with a specific readTs.
func getNew(key []byte, pstore *badger.DB) (*List, error) {
l := new(List)
l.key = key
l.mutationMap = make(map[uint64]*pb.PostingList)
l.plist = new(pb.PostingList)
txn := pstore.NewTransactionAt(math.MaxUint64, false)
defer txn.Discard()

item, err := txn.Get(key)
if err == badger.ErrKeyNotFound {
return l, nil
}
if err != nil {
return l, err
}
if item.UserMeta()&BitEmptyPosting > 0 {
l.minTs = item.Version()

} else if item.UserMeta()&BitCompletePosting > 0 {
err = unmarshalOrCopy(l.plist, item)
l.minTs = item.Version()

} else {
iterOpts := badger.DefaultIteratorOptions
iterOpts.AllVersions = true
iterOpts.PrefetchValues = false
it := txn.NewKeyIterator(key, iterOpts)
defer it.Close()
it.Seek(key)
l, err = ReadPostingList(key, it)
}
if err != nil {
return l, err
}

l.Lock()
size := l.calculateSize()
l.Unlock()
x.BytesRead.Add(int64(size))
atomic.StoreInt32(&l.estimatedSize, size)
return l, nil
// When we do rollups, an older version would go to the top of the LSM tree, which can cause
// issues during txn.Get. Therefore, always iterate.
iterOpts := badger.DefaultIteratorOptions
iterOpts.AllVersions = true
iterOpts.PrefetchValues = false
itr := txn.NewKeyIterator(key, iterOpts)
defer itr.Close()
itr.Seek(key)
return ReadPostingList(key, itr)
}
(The remaining changed files are not shown.)
