Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't read data posting lists in certain cases #3713

Merged
merged 4 commits into from
Jul 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions posting/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ func (txn *Txn) Get(key []byte) (*List, error) {
return txn.cache.Get(key)
}

// GetFromDelta retrieves the posting list from delta cache, not from Badger.
func (txn *Txn) GetFromDelta(key []byte) (*List, error) {
return txn.cache.GetFromDelta(key)
}

// Update calls UpdateDeltasAndDiscardLists on the local cache.
func (txn *Txn) Update() {
txn.cache.UpdateDeltasAndDiscardLists()
Expand Down
35 changes: 28 additions & 7 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,34 @@ func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) e
return err
}

t := time.Now()
key := x.DataKey(edge.Attr, edge.Entity)
plist, err := txn.Get(key)
// The following is a performance optimization which allows us to not read a posting list from
// disk. We calculate this based on how AddMutationWithIndex works. The general idea is that if
// we're not using the read posting list, we don't need to retrieve it. We need the posting list
// if we're doing indexing or count index or enforcing single UID, etc. In other cases, we can
// just create a posting list facade in memory and use it to store the delta in Badger. Later,
// the rollup operation would consolidate all these deltas into a posting list.
var getFn func(key []byte) (*posting.List, error)
switch {
case len(su.GetTokenizer()) > 0 || su.GetCount():
// Any index or count index.
getFn = txn.Get
case su.GetValueType() == pb.Posting_UID && !su.GetList():
// Single UID, not a list.
getFn = txn.Get
case edge.Op == pb.DirectedEdge_DEL:
// Covers various delete cases to keep things simple.
getFn = txn.Get
default:
// Reverse index doesn't need the posting list to be read. We already covered count index,
// single uid and delete all above.
// Values, whether single or list, don't need to be read.
// Uid list doesn't need to be read.
getFn = txn.GetFromDelta
}

t := time.Now()
plist, err := getFn(key)
if dur := time.Since(t); dur > time.Millisecond {
if span := otrace.FromContext(ctx); span != nil {
span.Annotatef([]otrace.Attribute{otrace.BoolAttribute("slow-get", true)},
Expand All @@ -87,11 +112,7 @@ func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) e
if err != nil {
return err
}

if err = plist.AddMutationWithIndex(ctx, edge, txn); err != nil {
return err // abort applying the rest of them.
}
return nil
return plist.AddMutationWithIndex(ctx, edge, txn)
}

// This is serialized with mutations, called after applied watermarks catch up
Expand Down