From 70cbb06f9c69171e0cfbb660a2dc73fc3f726f26 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Tue, 15 Jan 2019 14:27:18 -0800 Subject: [PATCH] Refactor reindexing code to only reindex specific tokenizers. (#2883) Currently during reindexing, the whole index is being deleted and rebuilt, even if there are tokenizers that did not change between schemas. This change allows reindexing to happen only for tokenizers that need it. --- posting/index.go | 277 +++++++++++++++++++++++++++++------------- posting/index_test.go | 247 ++++++++++++++++++++++++++----------- posting/mvcc.go | 3 +- tok/tok.go | 14 +++ x/x.go | 20 +-- 5 files changed, 394 insertions(+), 167 deletions(-) diff --git a/posting/index.go b/posting/index.go index 40f5712579c..d4f2f7c1149 100644 --- a/posting/index.go +++ b/posting/index.go @@ -22,7 +22,6 @@ import ( "encoding/hex" "fmt" "math" - "reflect" "time" "github.com/golang/glog" @@ -38,8 +37,19 @@ import ( var emptyCountParams countParams -// IndexTokens return tokens, without the predicate prefix and index rune. -func indexTokens(attr, lang string, src types.Val) ([]string, error) { +type indexMutationInfo struct { + tokenizers []tok.Tokenizer + edge *pb.DirectedEdge // Represents the original uid -> value edge. + val types.Val + op pb.DirectedEdge_Op +} + +// indexTokensforTokenizers return tokens, without the predicate prefix and +// index rune, for specific tokenizers. 
+func indexTokens(info *indexMutationInfo) ([]string, error) { + attr := info.edge.Attr + lang := info.edge.GetLang() + schemaType, err := schema.State().TypeOf(attr) if err != nil || !schemaType.IsScalar() { return nil, x.Errorf("Cannot index attribute %s of type object.", attr) @@ -48,13 +58,13 @@ func indexTokens(attr, lang string, src types.Val) ([]string, error) { if !schema.State().IsIndexed(attr) { return nil, x.Errorf("Attribute %s is not indexed.", attr) } - sv, err := types.Convert(src, schemaType) + sv, err := types.Convert(info.val, schemaType) if err != nil { return nil, err } - // Schema will know the mapping from attr to tokenizer. + var tokens []string - for _, it := range schema.State().Tokenizer(attr) { + for _, it := range info.tokenizers { if it.Name() == "exact" && schemaType == types.StringID && len(sv.Value.(string)) > 100 { // Exact index can only be applied for strings so we can safely try to convert Value to // string. @@ -70,15 +80,18 @@ func indexTokens(attr, lang string, src types.Val) ([]string, error) { return tokens, nil } -// addIndexMutations adds mutation(s) for a single term, to maintain index. -// t represents the original uid -> value edge. +// addIndexMutations adds mutation(s) for a single term, to maintain the index, +// but only for the given tokenizers. // TODO - See if we need to pass op as argument as t should already have Op. 
-func (txn *Txn) addIndexMutations(ctx context.Context, t *pb.DirectedEdge, p types.Val, - op pb.DirectedEdge_Op) error { - attr := t.Attr - uid := t.Entity +func (txn *Txn) addIndexMutations(ctx context.Context, info *indexMutationInfo) error { + if info.tokenizers == nil { + info.tokenizers = schema.State().Tokenizer(info.edge.Attr) + } + + attr := info.edge.Attr + uid := info.edge.Entity x.AssertTrue(uid != 0) - tokens, err := indexTokens(attr, t.GetLang(), p) + tokens, err := indexTokens(info) if err != nil { // This data is not indexable @@ -89,7 +102,7 @@ func (txn *Txn) addIndexMutations(ctx context.Context, t *pb.DirectedEdge, p typ edge := &pb.DirectedEdge{ ValueId: uid, Attr: attr, - Op: op, + Op: info.op, } for _, token := range tokens { @@ -190,15 +203,15 @@ func (txn *Txn) addReverseMutation(ctx context.Context, t *pb.DirectedEdge) erro return nil } -func (l *List) handleDeleteAll(ctx context.Context, t *pb.DirectedEdge, +func (l *List) handleDeleteAll(ctx context.Context, edge *pb.DirectedEdge, txn *Txn) error { - isReversed := schema.State().IsReversed(t.Attr) - isIndexed := schema.State().IsIndexed(t.Attr) - hasCount := schema.State().HasCount(t.Attr) + isReversed := schema.State().IsReversed(edge.Attr) + isIndexed := schema.State().IsIndexed(edge.Attr) + hasCount := schema.State().HasCount(edge.Attr) delEdge := &pb.DirectedEdge{ - Attr: t.Attr, - Op: t.Op, - Entity: t.Entity, + Attr: edge.Attr, + Op: edge.Op, + Entity: edge.Entity, } // To calculate length of posting list. Used for deletion of count index. var plen int @@ -211,11 +224,16 @@ func (l *List) handleDeleteAll(ctx context.Context, t *pb.DirectedEdge, return txn.addReverseMutation(ctx, delEdge) case isIndexed: // Delete index edge of each posting. 
- p := types.Val{ + val := types.Val{ Tid: types.TypeID(p.ValType), Value: p.Value, } - return txn.addIndexMutations(ctx, t, p, pb.DirectedEdge_DEL) + return txn.addIndexMutations(ctx, &indexMutationInfo{ + tokenizers: schema.State().Tokenizer(edge.Attr), + edge: edge, + val: val, + op: pb.DirectedEdge_DEL, + }) default: return nil } @@ -227,10 +245,10 @@ func (l *List) handleDeleteAll(ctx context.Context, t *pb.DirectedEdge, // Delete uid from count index. Deletion of reverses is taken care by addReverseMutation // above. if err := txn.updateCount(ctx, countParams{ - attr: t.Attr, + attr: edge.Attr, countBefore: plen, countAfter: 0, - entity: t.Entity, + entity: edge.Entity, }); err != nil { return err } @@ -238,7 +256,7 @@ func (l *List) handleDeleteAll(ctx context.Context, t *pb.DirectedEdge, l.Lock() defer l.Unlock() - return l.addMutation(ctx, txn, t) + return l.addMutation(ctx, txn, edge) } func (txn *Txn) addCountMutation(ctx context.Context, t *pb.DirectedEdge, count uint32, @@ -329,24 +347,24 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo // AddMutationWithIndex is AddMutation with support for indexing. It also // supports reverse edges. 
-func (l *List) AddMutationWithIndex(ctx context.Context, t *pb.DirectedEdge, +func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, txn *Txn) error { - if len(t.Attr) == 0 { + if len(edge.Attr) == 0 { return x.Errorf("Predicate cannot be empty for edge with subject: [%v], object: [%v]"+ - " and value: [%v]", t.Entity, t.ValueId, t.Value) + " and value: [%v]", edge.Entity, edge.ValueId, edge.Value) } - if t.Op == pb.DirectedEdge_DEL && string(t.Value) == x.Star { - return l.handleDeleteAll(ctx, t, txn) + if edge.Op == pb.DirectedEdge_DEL && string(edge.Value) == x.Star { + return l.handleDeleteAll(ctx, edge, txn) } - doUpdateIndex := pstore != nil && schema.State().IsIndexed(t.Attr) - hasCountIndex := schema.State().HasCount(t.Attr) - val, found, cp, err := txn.addMutationHelper(ctx, l, doUpdateIndex, hasCountIndex, t) + doUpdateIndex := pstore != nil && schema.State().IsIndexed(edge.Attr) + hasCountIndex := schema.State().HasCount(edge.Attr) + val, found, cp, err := txn.addMutationHelper(ctx, l, doUpdateIndex, hasCountIndex, edge) if err != nil { return err } - x.PredicateStats.Add(t.Attr, 1) + x.PredicateStats.Add(edge.Attr, 1) if hasCountIndex && cp.countAfter != cp.countBefore { if err := txn.updateCount(ctx, cp); err != nil { return err @@ -355,24 +373,34 @@ func (l *List) AddMutationWithIndex(ctx context.Context, t *pb.DirectedEdge, if doUpdateIndex { // Exact matches. 
if found && val.Value != nil { - if err := txn.addIndexMutations(ctx, t, val, pb.DirectedEdge_DEL); err != nil { + if err := txn.addIndexMutations(ctx, &indexMutationInfo{ + tokenizers: schema.State().Tokenizer(edge.Attr), + edge: edge, + val: val, + op: pb.DirectedEdge_DEL, + }); err != nil { return err } } - if t.Op == pb.DirectedEdge_SET { - p := types.Val{ - Tid: types.TypeID(t.ValueType), - Value: t.Value, + if edge.Op == pb.DirectedEdge_SET { + val = types.Val{ + Tid: types.TypeID(edge.ValueType), + Value: edge.Value, } - if err := txn.addIndexMutations(ctx, t, p, pb.DirectedEdge_SET); err != nil { + if err := txn.addIndexMutations(ctx, &indexMutationInfo{ + tokenizers: schema.State().Tokenizer(edge.Attr), + edge: edge, + val: val, + op: pb.DirectedEdge_SET, + }); err != nil { return err } } } // Add reverse mutation irrespective of hasMutated, server crash can happen after // mutation is synced and before reverse edge is synced - if (pstore != nil) && (t.ValueId != 0) && schema.State().IsReversed(t.Attr) { - if err := txn.addReverseMutation(ctx, t); err != nil { + if (pstore != nil) && (edge.ValueId != 0) && schema.State().IsReversed(edge.Attr) { + if err := txn.addReverseMutation(ctx, edge); err != nil { return err } } @@ -408,12 +436,27 @@ func deleteAllEntries(prefix []byte) error { }) } -func deleteIndex(attr string) error { +// deleteAllTokens deletes the index for the given attribute. All tokenizers are +// used by this function. +func deleteAllTokens(attr string) error { pk := x.ParsedKey{Attr: attr} prefix := pk.IndexPrefix() return deleteAllEntries(prefix) } +// deleteTokensFor deletes the index for the given attribute and token. 
+func deleteTokensFor(attr, tokenizerName string) error { + pk := x.ParsedKey{Attr: attr} + prefix := pk.IndexPrefix() + tokenizer, ok := tok.GetTokenizer(tokenizerName) + if !ok { + return fmt.Errorf("Could not find valid tokenizer for %s", tokenizerName) + } + prefix = append(prefix, tokenizer.Identifier()) + + return deleteAllEntries(prefix) +} + func deleteReverseEdges(attr string) error { pk := x.ParsedKey{Attr: attr} prefix := pk.ReversePrefix() @@ -556,20 +599,31 @@ func (rb *IndexRebuild) Run(ctx context.Context) error { return RebuildReverseEdges(ctx, rb) } -func needsIndexRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) indexOp { - x.AssertTruef(current != nil, "Current schema cannot be nil.") +type indexRebuildInfo struct { + op indexOp + tokenizersToDelete []string + tokenizersToRebuild []string +} + +func (rb *IndexRebuild) needsIndexRebuild() indexRebuildInfo { + x.AssertTruef(rb.CurrentSchema != nil, "Current schema cannot be nil.") + // If the old schema is nil, we can treat it as an empty schema. Copy it + // first to avoid overwriting it in rb. + old := rb.OldSchema if old == nil { old = &pb.SchemaUpdate{} } - currIndex := current.Directive == pb.SchemaUpdate_INDEX + currIndex := rb.CurrentSchema.Directive == pb.SchemaUpdate_INDEX prevIndex := old.Directive == pb.SchemaUpdate_INDEX // Index does not need to be rebuilt or deleted if the scheme directive // did not require an index before and now. if !currIndex && !prevIndex { - return indexNoop + return indexRebuildInfo{ + op: indexNoop, + } } // Index only needs to be deleted if the schema directive changed and the @@ -577,51 +631,90 @@ func needsIndexRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) indexOp { // prevIndex since the previous if statement guarantees both values are // different. if !currIndex { - return indexDelete + return indexRebuildInfo{ + op: indexDelete, + tokenizersToDelete: old.Tokenizer, + } } - // Index needs to be rebuilt if the value types have changed. 
- if currIndex && current.ValueType != old.ValueType { - return indexRebuild + // All tokenizers in the index need to be deleted and rebuilt if the value + // types have changed. + if currIndex && rb.CurrentSchema.ValueType != old.ValueType { + return indexRebuildInfo{ + op: indexRebuild, + tokenizersToDelete: old.Tokenizer, + tokenizersToRebuild: rb.CurrentSchema.Tokenizer, + } } // Index needs to be rebuilt if the tokenizers have changed - prevTokens := make(map[string]bool) + prevTokens := make(map[string]struct{}) for _, t := range old.Tokenizer { - prevTokens[t] = true + prevTokens[t] = struct{}{} } - currTokens := make(map[string]bool) - for _, t := range current.Tokenizer { - currTokens[t] = true + currTokens := make(map[string]struct{}) + for _, t := range rb.CurrentSchema.Tokenizer { + currTokens[t] = struct{}{} } - if equal := reflect.DeepEqual(prevTokens, currTokens); equal { - return indexNoop + newTokenizers, deletedTokenizers := x.Diff(currTokens, prevTokens) + + // If the tokenizers are the same, nothing needs to be done. + if len(newTokenizers) == 0 && len(deletedTokenizers) == 0 { + return indexRebuildInfo{ + op: indexNoop, + } + } + + return indexRebuildInfo{ + op: indexRebuild, + tokenizersToDelete: deletedTokenizers, + tokenizersToRebuild: newTokenizers, } - return indexRebuild } // RebuildIndex rebuilds index for a given attribute. // We commit mutations with startTs and ignore the errors. func RebuildIndex(ctx context.Context, rb *IndexRebuild) error { // Exit early if indices do not need to be rebuilt. 
- op := needsIndexRebuild(rb.OldSchema, rb.CurrentSchema) + rebuildInfo := rb.needsIndexRebuild() - if op == indexNoop { + if rebuildInfo.op == indexNoop { return nil } - glog.Infof("Deleting index for %s", rb.Attr) - if err := deleteIndex(rb.Attr); err != nil { - return err + glog.Infof("Deleting index for attr %s and tokenizers %s", rb.Attr, + rebuildInfo.tokenizersToDelete) + for _, tokenizer := range rebuildInfo.tokenizersToDelete { + if err := deleteTokensFor(rb.Attr, tokenizer); err != nil { + return err + } } - // Exit early if the index only neeed to be deleted and not rebuild. - if op == indexDelete { + // Exit early if the index only needs to be deleted and not rebuilt. + if rebuildInfo.op == indexDelete { return nil } - glog.Infof("Rebuilding index for %s", rb.Attr) + // Exit early if there are no tokenizers to rebuild. + if len(rebuildInfo.tokenizersToRebuild) == 0 { + return nil + } + + glog.Infof("Rebuilding index for attr %s and tokenizers %s", rb.Attr, + rebuildInfo.tokenizersToRebuild) + // Before rebuilding, the existing index needs to be deleted. 
+ for _, tokenizer := range rebuildInfo.tokenizersToRebuild { + if err := deleteTokensFor(rb.Attr, tokenizer); err != nil { + return err + } + } + + tokenizers, err := tok.GetTokenizers(rebuildInfo.tokenizersToRebuild) + if err != nil { + return err + } + pk := x.ParsedKey{Attr: rb.Attr} builder := rebuild{prefix: pk.DataPrefix(), startTs: rb.StartTs} builder.fn = func(uid uint64, pl *List, txn *Txn) error { @@ -634,7 +727,12 @@ func RebuildIndex(ctx context.Context, rb *IndexRebuild) error { } for { - err := txn.addIndexMutations(ctx, &edge, val, pb.DirectedEdge_SET) + err := txn.addIndexMutations(ctx, &indexMutationInfo{ + tokenizers: tokenizers, + edge: &edge, + val: val, + op: pb.DirectedEdge_SET, + }) switch err { case ErrRetry: time.Sleep(10 * time.Millisecond) @@ -647,21 +745,24 @@ func RebuildIndex(ctx context.Context, rb *IndexRebuild) error { return builder.Run(ctx) } -func needsCountIndexRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) indexOp { - x.AssertTruef(current != nil, "Current schema cannot be nil.") +func (rb *IndexRebuild) needsCountIndexRebuild() indexOp { + x.AssertTruef(rb.CurrentSchema != nil, "Current schema cannot be nil.") + // If the old schema is nil, treat it as an empty schema. Copy it to avoid + // overwriting it in rb. + old := rb.OldSchema if old == nil { old = &pb.SchemaUpdate{} } // Do nothing if the schema directive did not change. - if current.Count == old.Count { + if rb.CurrentSchema.Count == old.Count { return indexNoop } // If the new schema does not require an index, delete the current index. - if !current.Count { + if !rb.CurrentSchema.Count { return indexDelete } @@ -671,7 +772,7 @@ func needsCountIndexRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) inde // RebuildCountIndex rebuilds the count index for a given attribute. 
func RebuildCountIndex(ctx context.Context, rb *IndexRebuild) error { - op := needsCountIndexRebuild(rb.OldSchema, rb.CurrentSchema) + op := rb.needsCountIndexRebuild() if op == indexNoop { return nil } @@ -724,14 +825,17 @@ func RebuildCountIndex(ctx context.Context, rb *IndexRebuild) error { return builder.Run(ctx) } -func needsReverseEdgesRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) indexOp { - x.AssertTruef(current != nil, "Current schema cannot be nil.") +func (rb *IndexRebuild) needsReverseEdgesRebuild() indexOp { + x.AssertTruef(rb.CurrentSchema != nil, "Current schema cannot be nil.") + // If old schema is nil, treat it as an empty schema. Copy it to avoid + // overwriting it in rb. + old := rb.OldSchema if old == nil { old = &pb.SchemaUpdate{} } - currIndex := current.Directive == pb.SchemaUpdate_REVERSE + currIndex := rb.CurrentSchema.Directive == pb.SchemaUpdate_REVERSE prevIndex := old.Directive == pb.SchemaUpdate_REVERSE // If the schema directive did not change, return indexNoop. @@ -749,7 +853,7 @@ func needsReverseEdgesRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) in // RebuildReverseEdges rebuilds the reverse edges for a given attribute. func RebuildReverseEdges(ctx context.Context, rb *IndexRebuild) error { - op := needsReverseEdgesRebuild(rb.OldSchema, rb.CurrentSchema) + op := rb.needsReverseEdgesRebuild() if op == indexNoop { return nil } @@ -793,18 +897,18 @@ func RebuildReverseEdges(ctx context.Context, rb *IndexRebuild) error { // needsListTypeRebuild returns true if the schema changed from a scalar to a // list. It returns true if the index can be left as is. 
-func needsListTypeRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) (bool, error) { - x.AssertTruef(current != nil, "Current schema cannot be nil.") +func (rb *IndexRebuild) needsListTypeRebuild() (bool, error) { + x.AssertTruef(rb.CurrentSchema != nil, "Current schema cannot be nil.") - if old == nil { + if rb.OldSchema == nil { return false, nil } - if current.List && !old.List { + if rb.CurrentSchema.List && !rb.OldSchema.List { return true, nil } - if old.List && !current.List { + if rb.OldSchema.List && !rb.CurrentSchema.List { return false, fmt.Errorf("Type can't be changed from list to scalar for attr: [%s]"+ - " without dropping it first.", current.Predicate) + " without dropping it first.", rb.CurrentSchema.Predicate) } return false, nil @@ -813,8 +917,7 @@ func needsListTypeRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) (bool, // RebuildListType rebuilds the index when the schema is changed from scalar to list type. // We need to fingerprint the values to get the new ValueId. func RebuildListType(ctx context.Context, rb *IndexRebuild) error { - if needsRebuild, err := needsListTypeRebuild(rb.OldSchema, rb.CurrentSchema); !needsRebuild || - err != nil { + if needsRebuild, err := rb.needsListTypeRebuild(); !needsRebuild || err != nil { return err } @@ -897,7 +1000,7 @@ func DeletePredicate(ctx context.Context, attr string) error { indexed := schema.State().IsIndexed(attr) reversed := schema.State().IsReversed(attr) if indexed { - if err := deleteIndex(attr); err != nil { + if err := deleteAllTokens(attr); err != nil { return err } } else if reversed { diff --git a/posting/index_test.go b/posting/index_test.go index ea226208c4c..d78baec0c9b 100644 --- a/posting/index_test.go +++ b/posting/index_test.go @@ -38,16 +38,28 @@ func uids(l *List, readTs uint64) []uint64 { return r.Uids } +// indexTokensForTest is just a wrapper around indexTokens used for convenience. 
+func indexTokensForTest(attr, lang string, val types.Val) ([]string, error) { + return indexTokens(&indexMutationInfo{ + tokenizers: schema.State().Tokenizer(attr), + edge: &pb.DirectedEdge{ + Attr: attr, + Lang: lang, + }, + val: val, + }) +} + func TestIndexingInt(t *testing.T) { require.NoError(t, schema.ParseBytes([]byte("age:int @index(int) ."), 1)) - a, err := indexTokens("age", "", types.Val{Tid: types.StringID, Value: []byte("10")}) + a, err := indexTokensForTest("age", "", types.Val{Tid: types.StringID, Value: []byte("10")}) require.NoError(t, err) require.EqualValues(t, []byte{0x6, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xa}, []byte(a[0])) } func TestIndexingIntNegative(t *testing.T) { require.NoError(t, schema.ParseBytes([]byte("age:int @index(int) ."), 1)) - a, err := indexTokens("age", "", types.Val{Tid: types.StringID, Value: []byte("-10")}) + a, err := indexTokensForTest("age", "", types.Val{Tid: types.StringID, Value: []byte("-10")}) require.NoError(t, err) require.EqualValues(t, []byte{0x6, 0x0, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xf6}, []byte(a[0])) @@ -55,14 +67,14 @@ func TestIndexingIntNegative(t *testing.T) { func TestIndexingFloat(t *testing.T) { require.NoError(t, schema.ParseBytes([]byte("age:float @index(float) ."), 1)) - a, err := indexTokens("age", "", types.Val{Tid: types.StringID, Value: []byte("10.43")}) + a, err := indexTokensForTest("age", "", types.Val{Tid: types.StringID, Value: []byte("10.43")}) require.NoError(t, err) require.EqualValues(t, []byte{0x7, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xa}, []byte(a[0])) } func TestIndexingTime(t *testing.T) { require.NoError(t, schema.ParseBytes([]byte("age:dateTime @index(year) ."), 1)) - a, err := indexTokens("age", "", types.Val{Tid: types.StringID, + a, err := indexTokensForTest("age", "", types.Val{Tid: types.StringID, Value: []byte("0010-01-01T01:01:01.000000001")}) require.NoError(t, err) require.EqualValues(t, []byte{0x4, 0x0, 0xa}, []byte(a[0])) @@ -70,7 +82,7 @@ func 
TestIndexingTime(t *testing.T) { func TestIndexing(t *testing.T) { require.NoError(t, schema.ParseBytes([]byte("name:string @index(term) ."), 1)) - a, err := indexTokens("name", "", types.Val{Tid: types.StringID, Value: []byte("abc")}) + a, err := indexTokensForTest("name", "", types.Val{Tid: types.StringID, Value: []byte("abc")}) require.NoError(t, err) require.EqualValues(t, "\x01abc", string(a[0])) } @@ -79,24 +91,25 @@ func TestIndexingMultiLang(t *testing.T) { require.NoError(t, schema.ParseBytes([]byte("name:string @index(fulltext) ."), 1)) // ensure that default tokenizer is suitable for English - a, err := indexTokens("name", "", types.Val{Tid: types.StringID, Value: []byte("stemming")}) + a, err := indexTokensForTest("name", "", types.Val{Tid: types.StringID, + Value: []byte("stemming")}) require.NoError(t, err) require.EqualValues(t, "\x08stem", string(a[0])) // ensure that Finnish tokenizer is used - a, err = indexTokens("name", "fi", types.Val{Tid: types.StringID, + a, err = indexTokensForTest("name", "fi", types.Val{Tid: types.StringID, Value: []byte("edeltäneessä")}) require.NoError(t, err) require.EqualValues(t, "\x08edeltän", string(a[0])) // ensure that German tokenizer is used - a, err = indexTokens("name", "de", types.Val{Tid: types.StringID, + a, err = indexTokensForTest("name", "de", types.Val{Tid: types.StringID, Value: []byte("Auffassungsvermögen")}) require.NoError(t, err) require.EqualValues(t, "\x08auffassungsvermog", string(a[0])) // ensure that default tokenizer works differently than German - a, err = indexTokens("name", "", types.Val{Tid: types.StringID, + a, err = indexTokensForTest("name", "", types.Val{Tid: types.StringID, Value: []byte("Auffassungsvermögen")}) require.NoError(t, err) require.EqualValues(t, "\x08auffassungsvermögen", string(a[0])) @@ -106,18 +119,22 @@ func TestIndexingInvalidLang(t *testing.T) { require.NoError(t, schema.ParseBytes([]byte("name:string @index(fulltext) ."), 1)) // tokenizer for "xx" language won't 
return an error. - _, err := indexTokens("name", "xx", types.Val{Tid: types.StringID, Value: []byte("error")}) + _, err := indexTokensForTest("name", "xx", types.Val{Tid: types.StringID, + Value: []byte("error")}) require.NoError(t, err) } func TestIndexingAliasedLang(t *testing.T) { require.NoError(t, schema.ParseBytes([]byte("name:string @index(fulltext) @lang ."), 1)) - _, err := indexTokens("name", "es", types.Val{Tid: types.StringID, Value: []byte("base")}) + _, err := indexTokensForTest("name", "es", types.Val{Tid: types.StringID, + Value: []byte("base")}) require.NoError(t, err) // es-es and es-419 are aliased to es - _, err = indexTokens("name", "es-es", types.Val{Tid: types.StringID, Value: []byte("alias")}) + _, err = indexTokensForTest("name", "es-es", types.Val{Tid: types.StringID, + Value: []byte("alias")}) require.NoError(t, err) - _, err = indexTokens("name", "es-419", types.Val{Tid: types.StringID, Value: []byte("alias")}) + _, err = indexTokensForTest("name", "es-419", types.Val{Tid: types.StringID, + Value: []byte("alias")}) require.NoError(t, err) } @@ -152,6 +169,13 @@ dob:dateTime @index(year) . friend:uid @reverse . ` +const mutatedSchemaVal = ` +name:string @index(term) . +name2:string . +dob:dateTime @index(year) . +friend:uid @reverse . + ` + // TODO(Txn): We can't read index key on disk if it was written in same txn. 
func TestTokensTable(t *testing.T) { require.NoError(t, schema.ParseBytes([]byte(schemaVal), 1)) @@ -237,13 +261,6 @@ func TestRebuildIndex(t *testing.T) { addEdgeToValue(t, "name2", 91, "Michonne", uint64(1), uint64(2)) addEdgeToValue(t, "name2", 92, "David", uint64(3), uint64(4)) - { - txn := ps.NewTransactionAt(1, true) - require.NoError(t, txn.Set(x.IndexKey("name2", "wrongname21"), []byte("nothing"))) - require.NoError(t, txn.Set(x.IndexKey("name2", "wrongname22"), []byte("nothing"))) - require.NoError(t, txn.CommitAt(1, nil)) - } - currentSchema, _ := schema.State().Get("name2") rb := IndexRebuild{ Attr: "name2", @@ -289,6 +306,60 @@ func TestRebuildIndex(t *testing.T) { require.EqualValues(t, 91, uids2[0]) } +func TestRebuildIndexWithDeletion(t *testing.T) { + require.NoError(t, schema.ParseBytes([]byte(schemaVal), 1)) + addEdgeToValue(t, "name2", 91, "Michonne", uint64(1), uint64(2)) + addEdgeToValue(t, "name2", 92, "David", uint64(3), uint64(4)) + + currentSchema, _ := schema.State().Get("name2") + rb := IndexRebuild{ + Attr: "name2", + StartTs: 5, + OldSchema: nil, + CurrentSchema: ¤tSchema, + } + require.NoError(t, RebuildIndex(context.Background(), &rb)) + + // Mutate the schema (the index in name2 is deleted) and rebuild the index. + require.NoError(t, schema.ParseBytes([]byte(mutatedSchemaVal), 1)) + newSchema, _ := schema.State().Get("name2") + rb = IndexRebuild{ + Attr: "name2", + StartTs: 6, + OldSchema: ¤tSchema, + CurrentSchema: &newSchema, + } + require.NoError(t, RebuildIndex(context.Background(), &rb)) + + // Check index entries in data store. 
+ txn := ps.NewTransactionAt(7, false) + defer txn.Discard() + it := txn.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() + pk := x.ParsedKey{Attr: "name2"} + prefix := pk.IndexPrefix() + var idxKeys []string + var idxVals []*List + for it.Seek(prefix); it.Valid(); it.Next() { + item := it.Item() + key := item.Key() + if !bytes.HasPrefix(key, prefix) { + break + } + if item.UserMeta()&BitEmptyPosting == BitEmptyPosting { + continue + } + idxKeys = append(idxKeys, string(key)) + l, err := GetNoStore(key) + require.NoError(t, err) + idxVals = append(idxVals, l) + } + + // The index keys should not be available anymore. + require.Len(t, idxKeys, 0) + require.Len(t, idxVals, 0) +} + func TestRebuildReverseEdges(t *testing.T) { require.NoError(t, schema.ParseBytes([]byte(schemaVal), 1)) addEdgeToUID(t, "friend", 1, 23, uint64(10), uint64(11)) @@ -344,77 +415,115 @@ func TestRebuildReverseEdges(t *testing.T) { } func TestNeedsIndexRebuild(t *testing.T) { - s1 := pb.SchemaUpdate{ValueType: pb.Posting_UID} - s2 := pb.SchemaUpdate{ValueType: pb.Posting_UID} - require.Equal(t, indexOp(indexNoop), needsIndexRebuild(&s1, &s2)) - require.Equal(t, indexOp(indexNoop), needsIndexRebuild(nil, &s2)) - - s1 = pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, + rb := IndexRebuild{} + rb.OldSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID} + rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID} + rebuildInfo := rb.needsIndexRebuild() + require.Equal(t, indexOp(indexNoop), rebuildInfo.op) + require.Equal(t, []string(nil), rebuildInfo.tokenizersToDelete) + require.Equal(t, []string(nil), rebuildInfo.tokenizersToRebuild) + + rb.OldSchema = nil + rebuildInfo = rb.needsIndexRebuild() + require.Equal(t, indexOp(indexNoop), rebuildInfo.op) + require.Equal(t, []string(nil), rebuildInfo.tokenizersToDelete) + require.Equal(t, []string(nil), rebuildInfo.tokenizersToRebuild) + + rb.OldSchema = &pb.SchemaUpdate{ValueType: pb.Posting_STRING, 
Directive: pb.SchemaUpdate_INDEX, Tokenizer: []string{"exact"}} - s2 = pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, + rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_STRING, + Directive: pb.SchemaUpdate_INDEX, Tokenizer: []string{"exact"}} - require.Equal(t, indexOp(indexNoop), needsIndexRebuild(&s1, &s2)) + rebuildInfo = rb.needsIndexRebuild() + require.Equal(t, indexOp(indexNoop), rebuildInfo.op) + require.Equal(t, []string(nil), rebuildInfo.tokenizersToDelete) + require.Equal(t, []string(nil), rebuildInfo.tokenizersToRebuild) - s1 = pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, + rb.OldSchema = &pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, Tokenizer: []string{"term"}} - s2 = pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX} - require.Equal(t, indexOp(indexRebuild), needsIndexRebuild(&s1, &s2)) - - s1 = pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, + rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_STRING, + Directive: pb.SchemaUpdate_INDEX} + rebuildInfo = rb.needsIndexRebuild() + require.Equal(t, indexOp(indexRebuild), rebuildInfo.op) + require.Equal(t, []string{"term"}, rebuildInfo.tokenizersToDelete) + require.Equal(t, []string(nil), rebuildInfo.tokenizersToRebuild) + + rb.OldSchema = &pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, Tokenizer: []string{"exact"}} - s2 = pb.SchemaUpdate{ValueType: pb.Posting_FLOAT, Directive: pb.SchemaUpdate_INDEX, + rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_FLOAT, + Directive: pb.SchemaUpdate_INDEX, Tokenizer: []string{"exact"}} - require.Equal(t, indexOp(indexRebuild), needsIndexRebuild(&s1, &s2)) + rebuildInfo = rb.needsIndexRebuild() + require.Equal(t, indexOp(indexRebuild), rebuildInfo.op) + require.Equal(t, []string{"exact"}, rebuildInfo.tokenizersToDelete) + require.Equal(t, 
[]string{"exact"}, rebuildInfo.tokenizersToRebuild) - s1 = pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, + rb.OldSchema = &pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, Tokenizer: []string{"exact"}} - s2 = pb.SchemaUpdate{ValueType: pb.Posting_FLOAT, Directive: pb.SchemaUpdate_NONE} - require.Equal(t, indexOp(indexDelete), needsIndexRebuild(&s1, &s2)) + rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_FLOAT, + Directive: pb.SchemaUpdate_NONE} + rebuildInfo = rb.needsIndexRebuild() + require.Equal(t, indexOp(indexDelete), rebuildInfo.op) + require.Equal(t, []string{"exact"}, rebuildInfo.tokenizersToDelete) + require.Equal(t, []string(nil), rebuildInfo.tokenizersToRebuild) } func TestNeedsCountIndexRebuild(t *testing.T) { - s1 := pb.SchemaUpdate{ValueType: pb.Posting_UID} - s2 := pb.SchemaUpdate{ValueType: pb.Posting_UID, Count: true} - require.Equal(t, indexOp(indexRebuild), needsCountIndexRebuild(&s1, &s2)) - require.Equal(t, indexOp(indexRebuild), needsCountIndexRebuild(nil, &s2)) - - s1 = pb.SchemaUpdate{ValueType: pb.Posting_UID, Count: false} - s2 = pb.SchemaUpdate{ValueType: pb.Posting_UID, Count: false} - require.Equal(t, indexOp(indexNoop), needsCountIndexRebuild(&s1, &s2)) - - s1 = pb.SchemaUpdate{ValueType: pb.Posting_UID, Count: true} - s2 = pb.SchemaUpdate{ValueType: pb.Posting_UID, Count: false} - require.Equal(t, indexOp(indexDelete), needsCountIndexRebuild(&s1, &s2)) + rb := IndexRebuild{} + rb.OldSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID} + rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID, Count: true} + require.Equal(t, indexOp(indexRebuild), rb.needsCountIndexRebuild()) + + rb.OldSchema = nil + require.Equal(t, indexOp(indexRebuild), rb.needsCountIndexRebuild()) + + rb.OldSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID, Count: false} + rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID, Count: false} + require.Equal(t, 
indexOp(indexNoop), rb.needsCountIndexRebuild()) + + rb.OldSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID, Count: true} + rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID, Count: false} + require.Equal(t, indexOp(indexDelete), rb.needsCountIndexRebuild()) } func TestNeedsReverseEdgesRebuild(t *testing.T) { - s1 := pb.SchemaUpdate{ValueType: pb.Posting_UID, Directive: pb.SchemaUpdate_INDEX} - s2 := pb.SchemaUpdate{ValueType: pb.Posting_UID, Directive: pb.SchemaUpdate_REVERSE} - require.Equal(t, indexOp(indexRebuild), needsReverseEdgesRebuild(&s1, &s2)) - require.Equal(t, indexOp(indexRebuild), needsReverseEdgesRebuild(nil, &s2)) - - s1 = pb.SchemaUpdate{ValueType: pb.Posting_UID, Directive: pb.SchemaUpdate_REVERSE} - s2 = pb.SchemaUpdate{ValueType: pb.Posting_UID, Directive: pb.SchemaUpdate_REVERSE} - require.Equal(t, indexOp(indexNoop), needsReverseEdgesRebuild(&s1, &s2)) - - s2 = pb.SchemaUpdate{ValueType: pb.Posting_UID, Directive: pb.SchemaUpdate_REVERSE} - s2 = pb.SchemaUpdate{ValueType: pb.Posting_UID, Directive: pb.SchemaUpdate_INDEX} - require.Equal(t, indexOp(indexDelete), needsReverseEdgesRebuild(&s1, &s2)) + rb := IndexRebuild{} + rb.OldSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID, Directive: pb.SchemaUpdate_INDEX} + rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID, + Directive: pb.SchemaUpdate_REVERSE} + require.Equal(t, indexOp(indexRebuild), rb.needsReverseEdgesRebuild()) + + rb.OldSchema = nil + require.Equal(t, indexOp(indexRebuild), rb.needsReverseEdgesRebuild()) + + rb.OldSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID, Directive: pb.SchemaUpdate_REVERSE} + rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID, + Directive: pb.SchemaUpdate_REVERSE} + require.Equal(t, indexOp(indexNoop), rb.needsReverseEdgesRebuild()) + + rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID, + Directive: pb.SchemaUpdate_REVERSE} + rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID, + Directive: 
pb.SchemaUpdate_INDEX} + require.Equal(t, indexOp(indexDelete), rb.needsReverseEdgesRebuild()) } func TestNeedsListTypeRebuild(t *testing.T) { - s1 := pb.SchemaUpdate{ValueType: pb.Posting_UID, List: false} - s2 := pb.SchemaUpdate{ValueType: pb.Posting_UID, List: true} - rebuild, err := needsListTypeRebuild(&s1, &s2) + rb := IndexRebuild{} + rb.OldSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID, List: false} + rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID, List: true} + rebuild, err := rb.needsListTypeRebuild() require.True(t, rebuild) require.NoError(t, err) - rebuild, err = needsListTypeRebuild(nil, &s2) + + rb.OldSchema = nil + rebuild, err = rb.needsListTypeRebuild() require.False(t, rebuild) require.NoError(t, err) - s1 = pb.SchemaUpdate{ValueType: pb.Posting_UID, List: true} - s2 = pb.SchemaUpdate{ValueType: pb.Posting_UID, List: false} - rebuild, err = needsListTypeRebuild(&s1, &s2) + rb.OldSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID, List: true} + rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID, List: false} + rebuild, err = rb.needsListTypeRebuild() require.False(t, rebuild) require.Error(t, err) } diff --git a/posting/mvcc.go b/posting/mvcc.go index e8f112ca9ad..909ce263c50 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -18,6 +18,7 @@ package posting import ( "bytes" + "encoding/hex" "fmt" "math" "strconv" @@ -218,7 +219,7 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { return nil, err } } else { - x.Fatalf("unexpected meta: %d", item.UserMeta()) + x.Fatalf("unexpected meta: %d %s", item.UserMeta(), hex.Dump(key)) } if item.DiscardEarlierVersions() { break diff --git a/tok/tok.go b/tok/tok.go index 5b1c9e32cdf..d2e55e5cccc 100644 --- a/tok/tok.go +++ b/tok/tok.go @@ -18,6 +18,7 @@ package tok import ( "encoding/binary" + "fmt" "plugin" "time" @@ -113,6 +114,19 @@ func GetTokenizer(name string) (Tokenizer, bool) { return t, found } +// GetTokenizers returns a list of tokenizer given 
a list of unique names. +func GetTokenizers(names []string) ([]Tokenizer, error) { + var tokenizers []Tokenizer + for _, name := range names { + t, found := GetTokenizer(name) + if !found { + return nil, fmt.Errorf("Invalid tokenizer %s", name) + } + tokenizers = append(tokenizers, t) + } + return tokenizers, nil +} + func registerTokenizer(t Tokenizer) { _, ok := tokenizers[t.Name()] x.AssertTruef(!ok, "Duplicate tokenizer: %s", t.Name()) diff --git a/x/x.go b/x/x.go index 3b5d25220ed..f95938d80fd 100644 --- a/x/x.go +++ b/x/x.go @@ -454,22 +454,22 @@ func SetupConnection(host string, tlsConf *TLSHelperConfig, useGz bool) (*grpc.C return grpc.Dial(host, dialOpts...) } -func Diff(targetMap map[string]struct{}, existingMap map[string]struct{}) ([]string, []string) { - var newGroups []string - var groupsToBeDeleted []string +func Diff(dst map[string]struct{}, src map[string]struct{}) ([]string, []string) { + var add []string + var del []string - for g := range targetMap { - if _, ok := existingMap[g]; !ok { - newGroups = append(newGroups, g) + for g := range dst { + if _, ok := src[g]; !ok { + add = append(add, g) } } - for g := range existingMap { - if _, ok := targetMap[g]; !ok { - groupsToBeDeleted = append(groupsToBeDeleted, g) + for g := range src { + if _, ok := dst[g]; !ok { + del = append(del, g) } } - return newGroups, groupsToBeDeleted + return add, del } func SpanTimer(span *trace.Span, name string) func() {