From 766cc2a5460db7a367e9720ade757736391fb6aa Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Fri, 28 Dec 2018 15:02:01 -0800 Subject: [PATCH] Refactor rebuild index logic. * Delete the index.go file in the working module. The callers of the deleted methods will call the functions in the posting module directly. * The rebuild index functions delete the index first. * The rebuild index functions figure out themselves if the index needs to be rebuilt after the deletion, instead of delegating that task to the callers. * Simplify delete logic. No fuctional changes. --- posting/index.go | 85 +++++++++++++++++++++++++--------------------- worker/index.go | 70 -------------------------------------- worker/mutation.go | 38 ++++++++------------- 3 files changed, 61 insertions(+), 132 deletions(-) delete mode 100644 worker/index.go diff --git a/posting/index.go b/posting/index.go index 48f1ed3b24a..317ef4e224f 100644 --- a/posting/index.go +++ b/posting/index.go @@ -401,6 +401,12 @@ func deleteEntries(prefix []byte, remove func(key []byte) bool) error { }) } +func deleteAllEntries(prefix []byte) error { + return deleteEntries(prefix, func(key []byte) bool { + return true + }) +} + func compareAttrAndType(key []byte, attr string, typ byte) bool { pk := x.Parse(key) if pk == nil { @@ -412,38 +418,29 @@ func compareAttrAndType(key []byte, attr string, typ byte) bool { return false } +func DeleteIndex(attr string) error { + pk := x.ParsedKey{Attr: attr} + prefix := pk.IndexPrefix() + return deleteAllEntries(prefix) +} + func DeleteReverseEdges(attr string) error { - lcache.clear(func(key []byte) bool { - return compareAttrAndType(key, attr, x.ByteReverse) - }) - // Delete index entries from data store. pk := x.ParsedKey{Attr: attr} prefix := pk.ReversePrefix() - return deleteEntries(prefix, func(key []byte) bool { - return true - }) + return deleteAllEntries(prefix) } func deleteCountIndex(attr string, reverse bool) error { pk := x.ParsedKey{Attr: attr} prefix := pk.CountPrefix(reverse) - return deleteEntries(prefix, func(key []byte) bool { - return true - }) + return deleteAllEntries(prefix) } func DeleteCountIndex(attr string) error { - lcache.clear(func(key []byte) bool { - return compareAttrAndType(key, attr, x.ByteCount) - }) - lcache.clear(func(key []byte) bool { - return compareAttrAndType(key, attr, x.ByteCountRev) - }) - // Delete index entries from data store. - if err := deleteCountIndex(attr, false); err != nil { + if err := deleteCountIndex(attr /*reverse*/, false); err != nil { return err } - if err := deleteCountIndex(attr, true); err != nil { // delete reverse count indexes. + if err := deleteCountIndex(attr /*reverse*/, true); err != nil { return err } return nil @@ -577,8 +574,17 @@ func (r *rebuild) Run(ctx context.Context) error { // RebuildIndex rebuilds index for a given attribute. // We commit mutations with startTs and ignore the errors. func RebuildIndex(ctx context.Context, attr string, startTs uint64) error { - x.AssertTruef(schema.State().IsIndexed(attr), "Attr %s not indexed", attr) + glog.Infof("Deleting index for %s", attr) + if err := DeleteIndex(attr); err != nil { + return err + } + // Exit early if attribute is not indexed in the new schema. + if !schema.State().IsIndexed(attr) { + return nil + } + + glog.Infof("Rebuilding index for %s", attr) pk := x.ParsedKey{Attr: attr} builder := rebuild{prefix: pk.DataPrefix(), startTs: startTs} builder.fn = func(uid uint64, pl *List, txn *Txn) error { @@ -605,8 +611,17 @@ func RebuildIndex(ctx context.Context, attr string, startTs uint64) error { } func RebuildCountIndex(ctx context.Context, attr string, startTs uint64) error { - x.AssertTruef(schema.State().HasCount(attr), "Attr %s doesn't have count index", attr) + glog.Infof("Deleting count index for %s", attr) + if err := DeleteCountIndex(attr); err != nil { + return err + } + + // Exit early if attribute is not indexed in the new schema. + if !schema.State().HasCount(attr) { + return nil + } + glog.Infof("Rebuilding count index for %s", attr) var reverse bool fn := func(uid uint64, pl *List, txn *Txn) error { t := &pb.DirectedEdge{ @@ -644,15 +659,19 @@ func RebuildCountIndex(ctx context.Context, attr string, startTs uint64) error { return builder.Run(ctx) } -type item struct { - uid uint64 - list *List -} - // RebuildReverseEdges rebuilds the reverse edges for a given attribute. func RebuildReverseEdges(ctx context.Context, attr string, startTs uint64) error { - x.AssertTruef(schema.State().IsReversed(attr), "Attr %s doesn't have reverse", attr) + glog.Infof("Deleting reverse index for %s", attr) + if err := DeleteReverseEdges(attr); err != nil { + return err + } + // Exit early if attribute is not indexed in the new schema. + if !schema.State().IsReversed(attr) { + return nil + } + + glog.Infof("Rebuilding reverse index for %s", attr) pk := x.ParsedKey{Attr: attr} builder := rebuild{prefix: pk.DataPrefix(), startTs: startTs} builder.fn = func(uid uint64, pl *List, txn *Txn) error { @@ -679,18 +698,6 @@ func RebuildReverseEdges(ctx context.Context, attr string, startTs uint64) error return builder.Run(ctx) } -func DeleteIndex(attr string) error { - lcache.clear(func(key []byte) bool { - return compareAttrAndType(key, attr, x.ByteIndex) - }) - // Delete index entries from data store. - pk := x.ParsedKey{Attr: attr} - prefix := pk.IndexPrefix() - return deleteEntries(prefix, func(key []byte) bool { - return true - }) -} - // This function is called when the schema is changed from scalar to list type. // We need to fingerprint the values to get the new ValueId. func RebuildListType(ctx context.Context, attr string, startTs uint64) error { diff --git a/worker/index.go b/worker/index.go deleted file mode 100644 index 03163f0e125..00000000000 --- a/worker/index.go +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2017-2018 Dgraph Labs, Inc. and Contributors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package worker - -import ( - "golang.org/x/net/context" - - "github.com/dgraph-io/dgraph/posting" - "github.com/dgraph-io/dgraph/schema" - "github.com/dgraph-io/dgraph/x" - "github.com/golang/glog" -) - -func (n *node) rebuildOrDelIndex(ctx context.Context, attr string, rebuild bool, startTs uint64) error { - if schema.State().IsIndexed(attr) != rebuild { - return x.Errorf("Predicate %s index mismatch, rebuild %v", attr, rebuild) - } - // Remove index edges - glog.Infof("Deleting index for %s", attr) - if err := posting.DeleteIndex(attr); err != nil { - return err - } - if rebuild { - glog.Infof("Rebuilding index for %s", attr) - return posting.RebuildIndex(ctx, attr, startTs) - } - return nil -} - -func (n *node) rebuildOrDelRevEdge(ctx context.Context, attr string, rebuild bool, startTs uint64) error { - if schema.State().IsReversed(attr) != rebuild { - return x.Errorf("Predicate %s reverse mismatch, rebuild %v", attr, rebuild) - } - glog.Infof("Deleting reverse index for %s", attr) - if err := posting.DeleteReverseEdges(attr); err != nil { - return err - } - if rebuild { - // Remove reverse edges - glog.Infof("Rebuilding reverse index for %s", attr) - return posting.RebuildReverseEdges(ctx, attr, startTs) - } - return nil -} - -func (n *node) rebuildOrDelCountIndex(ctx context.Context, attr string, rebuild bool, startTs uint64) error { - glog.Infof("Deleting count index for %s", attr) - if err := posting.DeleteCountIndex(attr); err != nil { - return err - } - if rebuild { - glog.Infof("Rebuilding count index for %s", attr) - return posting.RebuildCountIndex(ctx, attr, startTs) - } - return nil -} diff --git a/worker/mutation.go b/worker/mutation.go index e30ad491e12..f8b9752c50c 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -113,7 +113,6 @@ func runSchemaMutation(ctx context.Context, update *pb.SchemaUpdate, startTs uin } func runSchemaMutationHelper(ctx context.Context, update *pb.SchemaUpdate, startTs uint64) error { - n := groups().Node if !groups().ServesTablet(update.Predicate) { tablet := groups().Tablet(update.Predicate) return x.Errorf("Tablet isn't being served by this group. Tablet: %+v", tablet) @@ -139,26 +138,21 @@ func runSchemaMutationHelper(ctx context.Context, update *pb.SchemaUpdate, start // We need watermark for index/reverse edge addition for linearizable reads. // (both applied and synced watermarks). defer glog.Infof("Done schema update %+v\n", update) + // If the schema was not present already, rebuild all indexes. if !ok { - if current.Directive == pb.SchemaUpdate_INDEX { - if err := n.rebuildOrDelIndex(ctx, update.Predicate, true, startTs); err != nil { - return err - } - } else if current.Directive == pb.SchemaUpdate_REVERSE { - if err := n.rebuildOrDelRevEdge(ctx, update.Predicate, true, startTs); err != nil { - return err - } + if err := posting.RebuildIndex(ctx, update.Predicate, startTs); err != nil { + return err } - - if current.Count { - if err := n.rebuildOrDelCountIndex(ctx, update.Predicate, true, startTs); err != nil { - return err - } + if err := posting.RebuildReverseEdges(ctx, update.Predicate, startTs); err != nil { + return err + } + if err := posting.RebuildCountIndex(ctx, update.Predicate, startTs); err != nil { + return err } return nil } - // schema was present already + // If the schema was was present already, rebuild indexes that need it. if current.List && !old.List { if err := posting.RebuildListType(ctx, update.Predicate, startTs); err != nil { return err @@ -170,24 +164,22 @@ func runSchemaMutationHelper(ctx context.Context, update *pb.SchemaUpdate, start if needReindexing(old, current) { // Reindex if update.Index is true or remove index - if err := n.rebuildOrDelIndex(ctx, update.Predicate, - current.Directive == pb.SchemaUpdate_INDEX, startTs); err != nil { + if err := posting.RebuildIndex(ctx, update.Predicate, startTs); err != nil { return err } - } else if needsRebuildingReverses(old, current) { + } + if needsRebuildingReverses(old, current) { // Add or remove reverse edge based on update.Reverse - if err := n.rebuildOrDelRevEdge(ctx, update.Predicate, - current.Directive == pb.SchemaUpdate_REVERSE, startTs); err != nil { + if err := posting.RebuildReverseEdges(ctx, update.Predicate, startTs); err != nil { return err } } - if current.Count != old.Count { - if err := n.rebuildOrDelCountIndex(ctx, update.Predicate, current.Count, - startTs); err != nil { + if err := posting.RebuildCountIndex(ctx, update.Predicate, startTs); err != nil { return err } } + return nil }