Skip to content

Commit

Permalink
feat(core): add cache to dgraph.type predicate. (#9068)
Browse files Browse the repository at this point in the history
Add a cache for the dgraph.type predicate. This should significantly
improve the performance of @filter(type) queries.
Live loader before: 16 min 39 s on the 21 million dataset.
Live loader now: 10 min 52 s.
Improvement: 38%
  • Loading branch information
harshil-goel authored May 23, 2024
1 parent fcc05d5 commit d5d9373
Show file tree
Hide file tree
Showing 11 changed files with 321 additions and 123 deletions.
5 changes: 5 additions & 0 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1511,11 +1511,13 @@ func rebuildListType(ctx context.Context, rb *IndexRebuild) error {

// DeleteAll removes every entry from the posting store. It calls ResetCache
// first and then returns whatever error pstore.DropAll reports.
func DeleteAll() error {
	// The cache is reset before the store is dropped.
	ResetCache()

	err := pstore.DropAll()
	return err
}

// DeleteData deletes all data for the namespace but leaves types and schema intact.
func DeleteData(ns uint64) error {
ResetCache()
prefix := make([]byte, 9)
prefix[0] = x.DefaultPrefix
binary.BigEndian.PutUint64(prefix[1:], ns)
Expand All @@ -1525,6 +1527,7 @@ func DeleteData(ns uint64) error {
// DeletePredicate deletes all entries and indices for a given predicate.
func DeletePredicate(ctx context.Context, attr string, ts uint64) error {
glog.Infof("Dropping predicate: [%s]", attr)
ResetCache()
preds := schema.State().PredicatesToDelete(attr)
for _, pred := range preds {
prefix := x.PredicatePrefix(pred)
Expand All @@ -1541,6 +1544,8 @@ func DeletePredicate(ctx context.Context, attr string, ts uint64) error {

// DeleteNamespace removes the predicates/types of namespace ns from the schema
// state and then bans the namespace in the store, returning the error (if any)
// from the ban. The whole cache is reset before either step runs.
func DeleteNamespace(ns uint64) error {
	// TODO: Invalidate only the cache keys that are affected, instead of
	// resetting every key.
	ResetCache()

	state := schema.State()
	state.DeletePredsForNs(ns)

	err := pstore.BanNamespace(ns)
	return err
}
90 changes: 58 additions & 32 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,27 @@ func (l *List) getMutation(startTs uint64) []byte {
return nil
}

// setMutationAfterCommit decodes data into a posting list, stamps the list and
// every posting in it with commitTs, and stores the result in the mutation map
// under startTs. When commitTs is non-zero, l.maxTs is raised to at least
// commitTs.
//
// The decoded list is asserted to have no Pack; the unmarshal error is handed
// to x.Check.
func (l *List) setMutationAfterCommit(startTs, commitTs uint64, data []byte) {
	committed := &pb.PostingList{}
	x.Check(committed.Unmarshal(data))

	// Propagate the commit timestamp to the list and to each posting.
	committed.CommitTs = commitTs
	for i := range committed.Postings {
		committed.Postings[i].CommitTs = commitTs
	}

	x.AssertTrue(committed.Pack == nil)

	l.Lock()
	defer l.Unlock()

	// The mutation map is created lazily on first write.
	if l.mutationMap == nil {
		l.mutationMap = make(map[uint64]*pb.PostingList)
	}
	l.mutationMap[startTs] = committed

	if commitTs != 0 {
		l.maxTs = x.Max(l.maxTs, commitTs)
	}
}

func (l *List) setMutation(startTs uint64, data []byte) {
pl := new(pb.PostingList)
x.Check(pl.Unmarshal(data))
Expand All @@ -566,6 +587,9 @@ func (l *List) setMutation(startTs uint64, data []byte) {
l.mutationMap = make(map[uint64]*pb.PostingList)
}
l.mutationMap[startTs] = pl
if pl.CommitTs != 0 {
l.maxTs = x.Max(l.maxTs, pl.CommitTs)
}
l.Unlock()
}

Expand Down Expand Up @@ -783,6 +807,38 @@ func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error) {
return count == 0, nil
}

// getPostingAndLengthNoSort estimates the length of the list at readTs and
// reports whether uid is present, without merging or sorting the mutation
// layer. The caller must hold the read lock.
//
// It starts from the packed list (exact length, plus a seek for uid) and then
// applies every posting in the mutation map that is visible at readTs: a
// posting is visible when it was committed at or before readTs, or when its
// StartTs equals readTs. A delete-all posting resets both the count and the
// presence flag; any other posting adjusts the count by +1 (Set) or -1
// (anything else) and, if it targets uid, overrides the presence flag.
//
// afterUid is not used, and the returned posting is always nil.
// NOTE(review): mutationMap is ranged over directly, so the order in which
// overlapping mutations are applied is whatever Go's map iteration yields.
func (l *List) getPostingAndLengthNoSort(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) {
	l.AssertRLock()

	decoder := codec.Decoder{Pack: l.plist.Pack}
	seeked := decoder.Seek(uid, codec.SeekStart)
	count := codec.ExactLen(l.plist.Pack)
	present := len(seeked) > 0 && seeked[0] == uid

	for _, layer := range l.mutationMap {
		for _, p := range layer.Postings {
			committed := p.CommitTs > 0 && p.CommitTs <= readTs
			if !committed && p.StartTs != readTs {
				// Not visible at this read timestamp.
				continue
			}

			if hasDeleteAll(p) {
				present, count = false, 0
				continue
			}

			isSet := p.Op == Set
			if p.Uid == uid {
				present = isSet
			}
			if isSet {
				count++
			} else {
				count--
			}
		}
	}

	return count, present, nil
}

func (l *List) getPostingAndLength(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) {
l.AssertRLock()
var count int
Expand Down Expand Up @@ -816,38 +872,6 @@ func (l *List) length(readTs, afterUid uint64) int {
return count
}

// getPostingAndLengthNoSort returns an unsorted estimate of the list length at
// readTs together with a flag saying whether uid is in the list. The read lock
// must already be held.
//
// The packed list supplies the initial length and presence; postings from the
// mutation map are then applied when they are visible at readTs (committed at
// or before readTs, or started at readTs). Delete-all clears both values; a
// Set adds one, any other operation subtracts one, and a posting for uid
// overwrites the presence flag.
//
// afterUid is ignored and the posting result is always nil.
func (l *List) getPostingAndLengthNoSort(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) {
	l.AssertRLock()

	packDec := codec.Decoder{Pack: l.plist.Pack}
	fromPack := packDec.Seek(uid, codec.SeekStart)
	total := codec.ExactLen(l.plist.Pack)
	exists := len(fromPack) > 0 && fromPack[0] == uid

	for _, pending := range l.mutationMap {
		for _, post := range pending.Postings {
			visible := post.StartTs == readTs || (post.CommitTs > 0 && post.CommitTs <= readTs)
			if !visible {
				continue
			}

			switch {
			case hasDeleteAll(post):
				// Everything seen so far is wiped out.
				exists = false
				total = 0
			case post.Op == Set:
				if post.Uid == uid {
					exists = true
				}
				total++
			default:
				if post.Uid == uid {
					exists = false
				}
				total--
			}
		}
	}

	return total, exists, nil
}

// Length iterates over the mutation layer and counts number of elements.
func (l *List) Length(readTs, afterUid uint64) int {
l.RLock()
Expand Down Expand Up @@ -1183,6 +1207,8 @@ func (l *List) rollup(readTs uint64, split bool) (*rollupOutput, error) {
}

if len(out.plist.Splits) > 0 || len(l.mutationMap) > 0 {
// In case there were splits, this would read all the splits from
// Badger.
if err := l.encode(out, readTs, split); err != nil {
return nil, errors.Wrapf(err, "while encoding")
}
Expand Down
Loading

0 comments on commit d5d9373

Please sign in to comment.