Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor rebuild index logic. #2851

Merged
merged 10 commits into from
Jan 3, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 199 additions & 62 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math"
"time"

"github.com/deckarep/golang-set"
"github.com/golang/glog"
otrace "go.opencensus.io/trace"

Expand Down Expand Up @@ -401,40 +402,35 @@ func deleteEntries(prefix []byte, remove func(key []byte) bool) error {
})
}

func compareAttrAndType(key []byte, attr string, typ byte) bool {
pk := x.Parse(key)
if pk == nil {
return true
}
if pk.Attr == attr && pk.IsType(typ) {
func deleteAllEntries(prefix []byte) error {
return deleteEntries(prefix, func(key []byte) bool {
return true
}
return false
})
}

func DeleteReverseEdges(attr string) error {
// Delete index entries from data store.
func deleteIndex(attr string) error {
pk := x.ParsedKey{Attr: attr}
prefix := pk.IndexPrefix()
return deleteAllEntries(prefix)
}

func deleteReverseEdges(attr string) error {
pk := x.ParsedKey{Attr: attr}
prefix := pk.ReversePrefix()
return deleteEntries(prefix, func(key []byte) bool {
return true
})
return deleteAllEntries(prefix)
}

func deleteCountIndex(attr string, reverse bool) error {
func deleteCountIndexInternal(attr string, reverse bool) error {
pk := x.ParsedKey{Attr: attr}
prefix := pk.CountPrefix(reverse)
return deleteEntries(prefix, func(key []byte) bool {
return true
})
return deleteAllEntries(prefix)
}

func DeleteCountIndex(attr string) error {
// Delete index entries from data store.
if err := deleteCountIndex(attr, false); err != nil {
func deleteCountIndex(attr string) error {
if err := deleteCountIndexInternal(attr, false /*reverse*/); err != nil {
return err
}
if err := deleteCountIndex(attr, true); err != nil { // delete reverse count indexes.
if err := deleteCountIndexInternal(attr, true /*reverse*/); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -537,15 +533,97 @@ func (r *rebuild) Run(ctx context.Context) error {
return writer.Flush()
}

// IndexRebuild holds the info needed to initiate a rebuilt of the indices.
type IndexRebuild struct {
Attr string
StartTs uint64
OldSchema *pb.SchemaUpdate
CurrentSchema *pb.SchemaUpdate
}

// Run rebuilds all indices that need it.
func (rb *IndexRebuild) Run(ctx context.Context) error {
if err := RebuildListType(ctx, rb); err != nil {
return err
}
if err := RebuildIndex(ctx, rb); err != nil {
return err
}
if err := RebuildCountIndex(ctx, rb); err != nil {
return err
}
if err := RebuildReverseEdges(ctx, rb); err != nil {
return err
}

return nil
}

// needsIndexRebuild returns true if the index needs to be rebuilt (or just
// deleted). It returns false if the index can be left as is.
func needsIndexRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) bool {
x.AssertTruef(current != nil, "Current schema cannot be nil.")

// If no previous schema existed, the index needs to be built.
if old == nil {
return true
}

currIndex := current.Directive == pb.SchemaUpdate_INDEX
prevIndex := old.Directive == pb.SchemaUpdate_INDEX

// Index needs to be rebuilt if the scheme directive changed.
if currIndex != prevIndex {
return true
}

// Index does not need to be rebuild if the schema is not currently indexed
// and was previously not indexed. Predicate is not checking prevIndex
// since the previous if statement guarantees both values are the same.
if !currIndex {
return false
}

// Index needs to be rebuilt if the value types have changed.
if currIndex && current.ValueType != old.ValueType {
return true
}

// Index needs to be rebuilt if the tokenizers have changed
prevTokens := mapset.NewSet()
for _, t := range old.Tokenizer {
prevTokens.Add(t)
}
currTokens := mapset.NewSet()
for _, t := range current.Tokenizer {
currTokens.Add(t)
}
return !prevTokens.Equal(currTokens)
}

// RebuildIndex rebuilds index for a given attribute.
// We commit mutations with startTs and ignore the errors.
func RebuildIndex(ctx context.Context, attr string, startTs uint64) error {
x.AssertTruef(schema.State().IsIndexed(attr), "Attr %s not indexed", attr)
func RebuildIndex(ctx context.Context, rb *IndexRebuild) error {
// Exit early if indices do not need to be rebuilt.
if !needsIndexRebuild(rb.OldSchema, rb.CurrentSchema) {
return nil
}

pk := x.ParsedKey{Attr: attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: startTs}
glog.Infof("Deleting index for %s", rb.Attr)
if err := deleteIndex(rb.Attr); err != nil {
return err
}

// Exit early if attribute is not indexed in the new schema.
if !schema.State().IsIndexed(rb.Attr) {
return nil
}

glog.Infof("Rebuilding index for %s", rb.Attr)
pk := x.ParsedKey{Attr: rb.Attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: rb.StartTs}
builder.fn = func(uid uint64, pl *List, txn *Txn) error {
edge := pb.DirectedEdge{Attr: attr, Entity: uid}
edge := pb.DirectedEdge{Attr: rb.Attr, Entity: uid}
return pl.Iterate(txn.StartTs, 0, func(p *pb.Posting) error {
// Add index entries based on p.
val := types.Val{
Expand All @@ -567,17 +645,45 @@ func RebuildIndex(ctx context.Context, attr string, startTs uint64) error {
return builder.Run(ctx)
}

func RebuildCountIndex(ctx context.Context, attr string, startTs uint64) error {
x.AssertTruef(schema.State().HasCount(attr), "Attr %s doesn't have count index", attr)
// needsCountIndexRebuild returns true if the count index needs to be rebuilt
// (or just deleted). It returns false if the index can be left as is.
func needsCountIndexRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) bool {
x.AssertTruef(current != nil, "Current schema cannot be nil.")

if old == nil {
return true
}
if current.Count != old.Count {
return true
}
return false
}

// RebuildCountIndex rebuilds the count index for a given attribute.
func RebuildCountIndex(ctx context.Context, rb *IndexRebuild) error {
if !needsCountIndexRebuild(rb.OldSchema, rb.CurrentSchema) {
return nil
}

glog.Infof("Deleting count index for %s", rb.Attr)
if err := deleteCountIndex(rb.Attr); err != nil {
return err
}

// Exit early if attribute is not indexed in the new schema.
if !schema.State().HasCount(rb.Attr) {
return nil
}

glog.Infof("Rebuilding count index for %s", rb.Attr)
var reverse bool
fn := func(uid uint64, pl *List, txn *Txn) error {
t := &pb.DirectedEdge{
ValueId: uid,
Attr: attr,
Attr: rb.Attr,
Op: pb.DirectedEdge_SET,
}
sz := pl.Length(startTs, 0)
sz := pl.Length(rb.StartTs, 0)
if sz == -1 {
return nil
}
Expand All @@ -593,33 +699,53 @@ func RebuildCountIndex(ctx context.Context, attr string, startTs uint64) error {
}

// Create the forward index.
pk := x.ParsedKey{Attr: attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: startTs}
pk := x.ParsedKey{Attr: rb.Attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: rb.StartTs}
builder.fn = fn
if err := builder.Run(ctx); err != nil {
return err
}

// Create the reverse index.
reverse = true
builder = rebuild{prefix: pk.ReversePrefix(), startTs: startTs}
builder = rebuild{prefix: pk.ReversePrefix(), startTs: rb.StartTs}
builder.fn = fn
return builder.Run(ctx)
}

type item struct {
uid uint64
list *List
// needsReverseEdgesRebuildl returns true if the reverse edges need to be rebuilt
// (or just deleted). It returns false if they can be left as is.
func needsReverseEdgesRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) bool {
x.AssertTruef(current != nil, "Current schema cannot be nil.")

if old == nil {
return true
}
return (current.Directive == pb.SchemaUpdate_REVERSE) !=
(old.Directive == pb.SchemaUpdate_REVERSE)
}

// RebuildReverseEdges rebuilds the reverse edges for a given attribute.
func RebuildReverseEdges(ctx context.Context, attr string, startTs uint64) error {
x.AssertTruef(schema.State().IsReversed(attr), "Attr %s doesn't have reverse", attr)
func RebuildReverseEdges(ctx context.Context, rb *IndexRebuild) error {
if !needsReverseEdgesRebuild(rb.OldSchema, rb.CurrentSchema) {
return nil
}

pk := x.ParsedKey{Attr: attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: startTs}
glog.Infof("Deleting reverse index for %s", rb.Attr)
if err := deleteReverseEdges(rb.Attr); err != nil {
return err
}

// Exit early if attribute is not indexed in the new schema.
if !schema.State().IsReversed(rb.Attr) {
return nil
}

glog.Infof("Rebuilding reverse index for %s", rb.Attr)
pk := x.ParsedKey{Attr: rb.Attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: rb.StartTs}
builder.fn = func(uid uint64, pl *List, txn *Txn) error {
edge := pb.DirectedEdge{Attr: attr, Entity: uid}
edge := pb.DirectedEdge{Attr: rb.Attr, Entity: uid}
return pl.Iterate(txn.StartTs, 0, func(pp *pb.Posting) error {
puid := pp.Uid
// Add reverse entries based on p.
Expand All @@ -642,26 +768,35 @@ func RebuildReverseEdges(ctx context.Context, attr string, startTs uint64) error
return builder.Run(ctx)
}

func DeleteIndex(attr string) error {
// Delete index entries from data store.
pk := x.ParsedKey{Attr: attr}
prefix := pk.IndexPrefix()
return deleteEntries(prefix, func(key []byte) bool {
return true
})
// needsListTypeRebuild returns true if the schema changed from a scalar to a
// list. It returns true if the index can be left as is.
func needsListTypeRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) (bool, error) {
x.AssertTruef(current != nil, "Current schema cannot be nil.")

if old == nil {
return false, nil
}
if current.List && !old.List {
return true, nil
}
if old.List && !current.List {
return false, fmt.Errorf("Type can't be changed from list to scalar for attr: [%s]"+
" without dropping it first.", current.Predicate)
}

return false, nil
}

// This function is called when the schema is changed from scalar to list type.
// RebuildListType rebuilds the index when the schema is changed from scalar to list type.
// We need to fingerprint the values to get the new ValueId.
func RebuildListType(ctx context.Context, attr string, startTs uint64) error {
x.AssertTruef(schema.State().IsList(attr), "Attr %s is not of list type", attr)

// Let's clear out the cache for anything which belongs to this attribute,
// so once we're done, any reads would see the new list type. Note that we
// don't use lcache during the rebuild process.
func RebuildListType(ctx context.Context, rb *IndexRebuild) error {
if needsRebuild, err := needsListTypeRebuild(rb.OldSchema, rb.CurrentSchema); !needsRebuild ||
err != nil {
return err
}

pk := x.ParsedKey{Attr: attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: startTs}
pk := x.ParsedKey{Attr: rb.Attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: rb.StartTs}
builder.fn = func(uid uint64, pl *List, txn *Txn) error {
var mpost *pb.Posting
err := pl.Iterate(txn.StartTs, 0, func(p *pb.Posting) error {
Expand All @@ -681,7 +816,7 @@ func RebuildListType(ctx context.Context, attr string, startTs uint64) error {
// Delete the old edge corresponding to ValueId math.MaxUint64
t := &pb.DirectedEdge{
ValueId: mpost.Uid,
Attr: attr,
Attr: rb.Attr,
Op: pb.DirectedEdge_DEL,
}

Expand All @@ -693,7 +828,7 @@ func RebuildListType(ctx context.Context, attr string, startTs uint64) error {
}
// Add the new edge with the fingerprinted value id.
newEdge := &pb.DirectedEdge{
Attr: attr,
Attr: rb.Attr,
Value: mpost.Value,
ValueType: mpost.ValType,
Op: pb.DirectedEdge_SET,
Expand All @@ -705,6 +840,7 @@ func RebuildListType(ctx context.Context, attr string, startTs uint64) error {
return builder.Run(ctx)
}

// DeleteAll deletes all entries in the posting list.
func DeleteAll() error {
return deleteEntries(nil, func(key []byte) bool {
pk := x.Parse(key)
Expand All @@ -719,6 +855,7 @@ func DeleteAll() error {
})
}

// DeletePredicate deletes all entries and indices for a given predicate.
func DeletePredicate(ctx context.Context, attr string) error {
glog.Infof("Dropping predicate: [%s]", attr)
pk := x.ParsedKey{
Expand All @@ -737,18 +874,18 @@ func DeletePredicate(ctx context.Context, attr string) error {
indexed := schema.State().IsIndexed(attr)
reversed := schema.State().IsReversed(attr)
if indexed {
if err := DeleteIndex(attr); err != nil {
if err := deleteIndex(attr); err != nil {
return err
}
} else if reversed {
if err := DeleteReverseEdges(attr); err != nil {
if err := deleteReverseEdges(attr); err != nil {
return err
}
}

hasCountIndex := schema.State().HasCount(attr)
if hasCountIndex {
if err := DeleteCountIndex(attr); err != nil {
if err := deleteCountIndex(attr); err != nil {
return err
}
}
Expand Down
Loading