Skip to content

Commit

Permalink
Refactor rebuild index logic.
Browse files Browse the repository at this point in the history
* Delete the index.go file in the working module. The callers of the
  deleted methods will call the functions in the posting module
  directly.
* The rebuild index functions delete the index first.
* The rebuild index functions figure out themselves if the index needs
  to be rebuilt after the deletion, instead of delegating that task to
  the callers.
* Simplify delete logic.

No fuctional changes.
  • Loading branch information
martinmr committed Dec 29, 2018
1 parent 320fb1c commit 766cc2a
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 132 deletions.
85 changes: 46 additions & 39 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,12 @@ func deleteEntries(prefix []byte, remove func(key []byte) bool) error {
})
}

func deleteAllEntries(prefix []byte) error {
return deleteEntries(prefix, func(key []byte) bool {
return true
})
}

func compareAttrAndType(key []byte, attr string, typ byte) bool {
pk := x.Parse(key)
if pk == nil {
Expand All @@ -412,38 +418,29 @@ func compareAttrAndType(key []byte, attr string, typ byte) bool {
return false
}

func DeleteIndex(attr string) error {
pk := x.ParsedKey{Attr: attr}
prefix := pk.IndexPrefix()
return deleteAllEntries(prefix)
}

func DeleteReverseEdges(attr string) error {
lcache.clear(func(key []byte) bool {
return compareAttrAndType(key, attr, x.ByteReverse)
})
// Delete index entries from data store.
pk := x.ParsedKey{Attr: attr}
prefix := pk.ReversePrefix()
return deleteEntries(prefix, func(key []byte) bool {
return true
})
return deleteAllEntries(prefix)
}

func deleteCountIndex(attr string, reverse bool) error {
pk := x.ParsedKey{Attr: attr}
prefix := pk.CountPrefix(reverse)
return deleteEntries(prefix, func(key []byte) bool {
return true
})
return deleteAllEntries(prefix)
}

func DeleteCountIndex(attr string) error {
lcache.clear(func(key []byte) bool {
return compareAttrAndType(key, attr, x.ByteCount)
})
lcache.clear(func(key []byte) bool {
return compareAttrAndType(key, attr, x.ByteCountRev)
})
// Delete index entries from data store.
if err := deleteCountIndex(attr, false); err != nil {
if err := deleteCountIndex(attr /*reverse*/, false); err != nil {
return err
}
if err := deleteCountIndex(attr, true); err != nil { // delete reverse count indexes.
if err := deleteCountIndex(attr /*reverse*/, true); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -577,8 +574,17 @@ func (r *rebuild) Run(ctx context.Context) error {
// RebuildIndex rebuilds index for a given attribute.
// We commit mutations with startTs and ignore the errors.
func RebuildIndex(ctx context.Context, attr string, startTs uint64) error {
x.AssertTruef(schema.State().IsIndexed(attr), "Attr %s not indexed", attr)
glog.Infof("Deleting index for %s", attr)
if err := DeleteIndex(attr); err != nil {
return err
}

// Exit early if attribute is not indexed in the new schema.
if !schema.State().IsIndexed(attr) {
return nil
}

glog.Infof("Rebuilding index for %s", attr)
pk := x.ParsedKey{Attr: attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: startTs}
builder.fn = func(uid uint64, pl *List, txn *Txn) error {
Expand All @@ -605,8 +611,17 @@ func RebuildIndex(ctx context.Context, attr string, startTs uint64) error {
}

func RebuildCountIndex(ctx context.Context, attr string, startTs uint64) error {
x.AssertTruef(schema.State().HasCount(attr), "Attr %s doesn't have count index", attr)
glog.Infof("Deleting count index for %s", attr)
if err := DeleteCountIndex(attr); err != nil {
return err
}

// Exit early if attribute is not indexed in the new schema.
if !schema.State().HasCount(attr) {
return nil
}

glog.Infof("Rebuilding count index for %s", attr)
var reverse bool
fn := func(uid uint64, pl *List, txn *Txn) error {
t := &pb.DirectedEdge{
Expand Down Expand Up @@ -644,15 +659,19 @@ func RebuildCountIndex(ctx context.Context, attr string, startTs uint64) error {
return builder.Run(ctx)
}

type item struct {
uid uint64
list *List
}

// RebuildReverseEdges rebuilds the reverse edges for a given attribute.
func RebuildReverseEdges(ctx context.Context, attr string, startTs uint64) error {
x.AssertTruef(schema.State().IsReversed(attr), "Attr %s doesn't have reverse", attr)
glog.Infof("Deleting reverse index for %s", attr)
if err := DeleteReverseEdges(attr); err != nil {
return err
}

// Exit early if attribute is not indexed in the new schema.
if !schema.State().IsReversed(attr) {
return nil
}

glog.Infof("Rebuilding reverse index for %s", attr)
pk := x.ParsedKey{Attr: attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: startTs}
builder.fn = func(uid uint64, pl *List, txn *Txn) error {
Expand All @@ -679,18 +698,6 @@ func RebuildReverseEdges(ctx context.Context, attr string, startTs uint64) error
return builder.Run(ctx)
}

func DeleteIndex(attr string) error {
lcache.clear(func(key []byte) bool {
return compareAttrAndType(key, attr, x.ByteIndex)
})
// Delete index entries from data store.
pk := x.ParsedKey{Attr: attr}
prefix := pk.IndexPrefix()
return deleteEntries(prefix, func(key []byte) bool {
return true
})
}

// This function is called when the schema is changed from scalar to list type.
// We need to fingerprint the values to get the new ValueId.
func RebuildListType(ctx context.Context, attr string, startTs uint64) error {
Expand Down
70 changes: 0 additions & 70 deletions worker/index.go

This file was deleted.

38 changes: 15 additions & 23 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ func runSchemaMutation(ctx context.Context, update *pb.SchemaUpdate, startTs uin
}

func runSchemaMutationHelper(ctx context.Context, update *pb.SchemaUpdate, startTs uint64) error {
n := groups().Node
if !groups().ServesTablet(update.Predicate) {
tablet := groups().Tablet(update.Predicate)
return x.Errorf("Tablet isn't being served by this group. Tablet: %+v", tablet)
Expand All @@ -139,26 +138,21 @@ func runSchemaMutationHelper(ctx context.Context, update *pb.SchemaUpdate, start
// We need watermark for index/reverse edge addition for linearizable reads.
// (both applied and synced watermarks).
defer glog.Infof("Done schema update %+v\n", update)
// If the schema was not present already, rebuild all indexes.
if !ok {
if current.Directive == pb.SchemaUpdate_INDEX {
if err := n.rebuildOrDelIndex(ctx, update.Predicate, true, startTs); err != nil {
return err
}
} else if current.Directive == pb.SchemaUpdate_REVERSE {
if err := n.rebuildOrDelRevEdge(ctx, update.Predicate, true, startTs); err != nil {
return err
}
if err := posting.RebuildIndex(ctx, update.Predicate, startTs); err != nil {
return err
}

if current.Count {
if err := n.rebuildOrDelCountIndex(ctx, update.Predicate, true, startTs); err != nil {
return err
}
if err := posting.RebuildReverseEdges(ctx, update.Predicate, startTs); err != nil {
return err
}
if err := posting.RebuildCountIndex(ctx, update.Predicate, startTs); err != nil {
return err
}
return nil
}

// schema was present already
// If the schema was was present already, rebuild indexes that need it.
if current.List && !old.List {
if err := posting.RebuildListType(ctx, update.Predicate, startTs); err != nil {
return err
Expand All @@ -170,24 +164,22 @@ func runSchemaMutationHelper(ctx context.Context, update *pb.SchemaUpdate, start

if needReindexing(old, current) {
// Reindex if update.Index is true or remove index
if err := n.rebuildOrDelIndex(ctx, update.Predicate,
current.Directive == pb.SchemaUpdate_INDEX, startTs); err != nil {
if err := posting.RebuildIndex(ctx, update.Predicate, startTs); err != nil {
return err
}
} else if needsRebuildingReverses(old, current) {
}
if needsRebuildingReverses(old, current) {
// Add or remove reverse edge based on update.Reverse
if err := n.rebuildOrDelRevEdge(ctx, update.Predicate,
current.Directive == pb.SchemaUpdate_REVERSE, startTs); err != nil {
if err := posting.RebuildReverseEdges(ctx, update.Predicate, startTs); err != nil {
return err
}
}

if current.Count != old.Count {
if err := n.rebuildOrDelCountIndex(ctx, update.Predicate, current.Count,
startTs); err != nil {
if err := posting.RebuildCountIndex(ctx, update.Predicate, startTs); err != nil {
return err
}
}

return nil
}

Expand Down

0 comments on commit 766cc2a

Please sign in to comment.