diff --git a/posting/index.go b/posting/index.go index f0ca5775839..0ba0261af19 100644 --- a/posting/index.go +++ b/posting/index.go @@ -22,6 +22,7 @@ import ( "encoding/hex" "fmt" "math" + "reflect" "time" "github.com/golang/glog" @@ -401,43 +402,31 @@ func deleteEntries(prefix []byte, remove func(key []byte) bool) error { }) } -func compareAttrAndType(key []byte, attr string, typ byte) bool { - pk := x.Parse(key) - if pk == nil { - return true - } - if pk.Attr == attr && pk.IsType(typ) { +func deleteAllEntries(prefix []byte) error { + return deleteEntries(prefix, func(key []byte) bool { return true - } - return false + }) } -func DeleteReverseEdges(attr string) error { - // Delete index entries from data store. +func deleteIndex(attr string) error { pk := x.ParsedKey{Attr: attr} - prefix := pk.ReversePrefix() - return deleteEntries(prefix, func(key []byte) bool { - return true - }) + prefix := pk.IndexPrefix() + return deleteAllEntries(prefix) } -func deleteCountIndex(attr string, reverse bool) error { +func deleteReverseEdges(attr string) error { pk := x.ParsedKey{Attr: attr} - prefix := pk.CountPrefix(reverse) - return deleteEntries(prefix, func(key []byte) bool { - return true - }) + prefix := pk.ReversePrefix() + return deleteAllEntries(prefix) } -func DeleteCountIndex(attr string) error { - // Delete index entries from data store. - if err := deleteCountIndex(attr, false); err != nil { - return err - } - if err := deleteCountIndex(attr, true); err != nil { // delete reverse count indexes. +func deleteCountIndex(attr string) error { + pk := x.ParsedKey{Attr: attr} + if err := deleteAllEntries(pk.CountPrefix(false)); err != nil { return err } - return nil + + return deleteAllEntries(pk.CountPrefix(true)) } // Index rebuilding logic here. @@ -537,15 +526,110 @@ func (r *rebuild) Run(ctx context.Context) error { return writer.Flush() } +// IndexRebuild holds the info needed to initiate a rebuilt of the indices. +type IndexRebuild struct { + Attr string + StartTs uint64 + OldSchema *pb.SchemaUpdate + CurrentSchema *pb.SchemaUpdate +} + +type indexOp int + +const ( + indexNoop indexOp = iota // Index should be left alone. + indexDelete = iota // Index should be deleted. + indexRebuild = iota // Index should be deleted and rebuilt. +) + +// Run rebuilds all indices that need it. +func (rb *IndexRebuild) Run(ctx context.Context) error { + if err := RebuildListType(ctx, rb); err != nil { + return err + } + if err := RebuildIndex(ctx, rb); err != nil { + return err + } + if err := RebuildCountIndex(ctx, rb); err != nil { + return err + } + if err := RebuildReverseEdges(ctx, rb); err != nil { + return err + } + + return nil +} + +func needsIndexRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) indexOp { + x.AssertTruef(current != nil, "Current schema cannot be nil.") + + if old == nil { + old = &pb.SchemaUpdate{} + } + + currIndex := current.Directive == pb.SchemaUpdate_INDEX + prevIndex := old.Directive == pb.SchemaUpdate_INDEX + + // Index does not need to be rebuilt or deleted if the scheme directive + // did not require an index before and now. + if !currIndex && !prevIndex { + return indexNoop + } + + // Index only needs to be deleted if the schema directive changed and the + // new directive does not require an index. Predicate is not checking + // prevIndex since the previous if statement guarantees both values are + // different. + if !currIndex { + return indexDelete + } + + // Index needs to be rebuilt if the value types have changed. + if currIndex && current.ValueType != old.ValueType { + return indexRebuild + } + + // Index needs to be rebuilt if the tokenizers have changed + prevTokens := make(map[string]bool) + for _, t := range old.Tokenizer { + prevTokens[t] = true + } + currTokens := make(map[string]bool) + for _, t := range current.Tokenizer { + currTokens[t] = true + } + + if equal := reflect.DeepEqual(prevTokens, currTokens); equal { + return indexNoop + } + return indexRebuild +} + // RebuildIndex rebuilds index for a given attribute. // We commit mutations with startTs and ignore the errors. -func RebuildIndex(ctx context.Context, attr string, startTs uint64) error { - x.AssertTruef(schema.State().IsIndexed(attr), "Attr %s not indexed", attr) +func RebuildIndex(ctx context.Context, rb *IndexRebuild) error { + // Exit early if indices do not need to be rebuilt. + op := needsIndexRebuild(rb.OldSchema, rb.CurrentSchema) - pk := x.ParsedKey{Attr: attr} - builder := rebuild{prefix: pk.DataPrefix(), startTs: startTs} + if op == indexNoop { + return nil + } + + glog.Infof("Deleting index for %s", rb.Attr) + if err := deleteIndex(rb.Attr); err != nil { + return err + } + + // Exit early if the index only neeed to be deleted and not rebuild. + if op == indexDelete { + return nil + } + + glog.Infof("Rebuilding index for %s", rb.Attr) + pk := x.ParsedKey{Attr: rb.Attr} + builder := rebuild{prefix: pk.DataPrefix(), startTs: rb.StartTs} builder.fn = func(uid uint64, pl *List, txn *Txn) error { - edge := pb.DirectedEdge{Attr: attr, Entity: uid} + edge := pb.DirectedEdge{Attr: rb.Attr, Entity: uid} return pl.Iterate(txn.StartTs, 0, func(p *pb.Posting) error { // Add index entries based on p. val := types.Val{ @@ -567,17 +651,54 @@ func RebuildIndex(ctx context.Context, attr string, startTs uint64) error { return builder.Run(ctx) } -func RebuildCountIndex(ctx context.Context, attr string, startTs uint64) error { - x.AssertTruef(schema.State().HasCount(attr), "Attr %s doesn't have count index", attr) +func needsCountIndexRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) indexOp { + x.AssertTruef(current != nil, "Current schema cannot be nil.") + + if old == nil { + old = &pb.SchemaUpdate{} + } + + // Do nothing if the schema directive did not change. + if !current.Count == !old.Count { + return indexNoop + + } + + // If the new schema does not require an index, delete the current index. + if !current.Count { + return indexDelete + } + + // Otherwise, the index needs to be rebuilt. + return indexRebuild +} + +// RebuildCountIndex rebuilds the count index for a given attribute. +func RebuildCountIndex(ctx context.Context, rb *IndexRebuild) error { + op := needsCountIndexRebuild(rb.OldSchema, rb.CurrentSchema) + if op == indexNoop { + return nil + } + glog.Infof("Deleting count index for %s", rb.Attr) + if err := deleteCountIndex(rb.Attr); err != nil { + return err + } + + // Exit early if attribute is index only needed to be deleted. + if op == indexDelete { + return nil + } + + glog.Infof("Rebuilding count index for %s", rb.Attr) var reverse bool fn := func(uid uint64, pl *List, txn *Txn) error { t := &pb.DirectedEdge{ ValueId: uid, - Attr: attr, + Attr: rb.Attr, Op: pb.DirectedEdge_SET, } - sz := pl.Length(startTs, 0) + sz := pl.Length(rb.StartTs, 0) if sz == -1 { return nil } @@ -593,8 +714,8 @@ func RebuildCountIndex(ctx context.Context, attr string, startTs uint64) error { } // Create the forward index. - pk := x.ParsedKey{Attr: attr} - builder := rebuild{prefix: pk.DataPrefix(), startTs: startTs} + pk := x.ParsedKey{Attr: rb.Attr} + builder := rebuild{prefix: pk.DataPrefix(), startTs: rb.StartTs} builder.fn = fn if err := builder.Run(ctx); err != nil { return err @@ -602,24 +723,56 @@ func RebuildCountIndex(ctx context.Context, attr string, startTs uint64) error { // Create the reverse index. reverse = true - builder = rebuild{prefix: pk.ReversePrefix(), startTs: startTs} + builder = rebuild{prefix: pk.ReversePrefix(), startTs: rb.StartTs} builder.fn = fn return builder.Run(ctx) } -type item struct { - uid uint64 - list *List +func needsReverseEdgesRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) indexOp { + x.AssertTruef(current != nil, "Current schema cannot be nil.") + + if old == nil { + old = &pb.SchemaUpdate{} + } + + currIndex := current.Directive == pb.SchemaUpdate_REVERSE + prevIndex := old.Directive == pb.SchemaUpdate_REVERSE + + // If the schema directive did not change, return indexNoop. + if currIndex == prevIndex { + return indexNoop + } + + // If the current schema requires an index, index should be rebuild. + if currIndex { + return indexRebuild + } + // Otherwise, index should only be deleted. + return indexDelete } // RebuildReverseEdges rebuilds the reverse edges for a given attribute. -func RebuildReverseEdges(ctx context.Context, attr string, startTs uint64) error { - x.AssertTruef(schema.State().IsReversed(attr), "Attr %s doesn't have reverse", attr) +func RebuildReverseEdges(ctx context.Context, rb *IndexRebuild) error { + op := needsReverseEdgesRebuild(rb.OldSchema, rb.CurrentSchema) + if op == indexNoop { + return nil + } - pk := x.ParsedKey{Attr: attr} - builder := rebuild{prefix: pk.DataPrefix(), startTs: startTs} + glog.Infof("Deleting reverse index for %s", rb.Attr) + if err := deleteReverseEdges(rb.Attr); err != nil { + return err + } + + // Exit early if index only needed to be deleted. + if op == indexDelete { + return nil + } + + glog.Infof("Rebuilding reverse index for %s", rb.Attr) + pk := x.ParsedKey{Attr: rb.Attr} + builder := rebuild{prefix: pk.DataPrefix(), startTs: rb.StartTs} builder.fn = func(uid uint64, pl *List, txn *Txn) error { - edge := pb.DirectedEdge{Attr: attr, Entity: uid} + edge := pb.DirectedEdge{Attr: rb.Attr, Entity: uid} return pl.Iterate(txn.StartTs, 0, func(pp *pb.Posting) error { puid := pp.Uid // Add reverse entries based on p. @@ -642,26 +795,35 @@ func RebuildReverseEdges(ctx context.Context, attr string, startTs uint64) error return builder.Run(ctx) } -func DeleteIndex(attr string) error { - // Delete index entries from data store. - pk := x.ParsedKey{Attr: attr} - prefix := pk.IndexPrefix() - return deleteEntries(prefix, func(key []byte) bool { - return true - }) +// needsListTypeRebuild returns true if the schema changed from a scalar to a +// list. It returns true if the index can be left as is. +func needsListTypeRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) (bool, error) { + x.AssertTruef(current != nil, "Current schema cannot be nil.") + + if old == nil { + return false, nil + } + if current.List && !old.List { + return true, nil + } + if old.List && !current.List { + return false, fmt.Errorf("Type can't be changed from list to scalar for attr: [%s]"+ + " without dropping it first.", current.Predicate) + } + + return false, nil } -// This function is called when the schema is changed from scalar to list type. +// RebuildListType rebuilds the index when the schema is changed from scalar to list type. // We need to fingerprint the values to get the new ValueId. -func RebuildListType(ctx context.Context, attr string, startTs uint64) error { - x.AssertTruef(schema.State().IsList(attr), "Attr %s is not of list type", attr) - - // Let's clear out the cache for anything which belongs to this attribute, - // so once we're done, any reads would see the new list type. Note that we - // don't use lcache during the rebuild process. +func RebuildListType(ctx context.Context, rb *IndexRebuild) error { + if needsRebuild, err := needsListTypeRebuild(rb.OldSchema, rb.CurrentSchema); !needsRebuild || + err != nil { + return err + } - pk := x.ParsedKey{Attr: attr} - builder := rebuild{prefix: pk.DataPrefix(), startTs: startTs} + pk := x.ParsedKey{Attr: rb.Attr} + builder := rebuild{prefix: pk.DataPrefix(), startTs: rb.StartTs} builder.fn = func(uid uint64, pl *List, txn *Txn) error { var mpost *pb.Posting err := pl.Iterate(txn.StartTs, 0, func(p *pb.Posting) error { @@ -681,7 +843,7 @@ func RebuildListType(ctx context.Context, attr string, startTs uint64) error { // Delete the old edge corresponding to ValueId math.MaxUint64 t := &pb.DirectedEdge{ ValueId: mpost.Uid, - Attr: attr, + Attr: rb.Attr, Op: pb.DirectedEdge_DEL, } @@ -693,7 +855,7 @@ func RebuildListType(ctx context.Context, attr string, startTs uint64) error { } // Add the new edge with the fingerprinted value id. newEdge := &pb.DirectedEdge{ - Attr: attr, + Attr: rb.Attr, Value: mpost.Value, ValueType: mpost.ValType, Op: pb.DirectedEdge_SET, @@ -705,6 +867,7 @@ func RebuildListType(ctx context.Context, attr string, startTs uint64) error { return builder.Run(ctx) } +// DeleteAll deletes all entries in the posting list. func DeleteAll() error { return deleteEntries(nil, func(key []byte) bool { pk := x.Parse(key) @@ -719,6 +882,7 @@ func DeleteAll() error { }) } +// DeletePredicate deletes all entries and indices for a given predicate. func DeletePredicate(ctx context.Context, attr string) error { glog.Infof("Dropping predicate: [%s]", attr) pk := x.ParsedKey{ @@ -737,18 +901,18 @@ func DeletePredicate(ctx context.Context, attr string) error { indexed := schema.State().IsIndexed(attr) reversed := schema.State().IsReversed(attr) if indexed { - if err := DeleteIndex(attr); err != nil { + if err := deleteIndex(attr); err != nil { return err } } else if reversed { - if err := DeleteReverseEdges(attr); err != nil { + if err := deleteReverseEdges(attr); err != nil { return err } } hasCountIndex := schema.State().HasCount(attr) if hasCountIndex { - if err := DeleteCountIndex(attr); err != nil { + if err := deleteCountIndex(attr); err != nil { return err } } diff --git a/posting/index_test.go b/posting/index_test.go index 844931e1d58..ea226208c4c 100644 --- a/posting/index_test.go +++ b/posting/index_test.go @@ -32,10 +32,6 @@ import ( "github.com/dgraph-io/dgraph/x" ) -const schemaStr = ` -name:string @index(term) . -` - func uids(l *List, readTs uint64) []uint64 { r, err := l.Uids(ListOptions{ReadTs: readTs}) x.Check(err) @@ -43,42 +39,44 @@ func uids(l *List, readTs uint64) []uint64 { } func TestIndexingInt(t *testing.T) { - schema.ParseBytes([]byte("age:int @index(int) ."), 1) + require.NoError(t, schema.ParseBytes([]byte("age:int @index(int) ."), 1)) a, err := indexTokens("age", "", types.Val{Tid: types.StringID, Value: []byte("10")}) require.NoError(t, err) require.EqualValues(t, []byte{0x6, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xa}, []byte(a[0])) } func TestIndexingIntNegative(t *testing.T) { - schema.ParseBytes([]byte("age:int @index(int) ."), 1) + require.NoError(t, schema.ParseBytes([]byte("age:int @index(int) ."), 1)) a, err := indexTokens("age", "", types.Val{Tid: types.StringID, Value: []byte("-10")}) require.NoError(t, err) - require.EqualValues(t, []byte{0x6, 0x0, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xf6}, []byte(a[0])) + require.EqualValues(t, []byte{0x6, 0x0, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xf6}, + []byte(a[0])) } func TestIndexingFloat(t *testing.T) { - schema.ParseBytes([]byte("age:float @index(float) ."), 1) + require.NoError(t, schema.ParseBytes([]byte("age:float @index(float) ."), 1)) a, err := indexTokens("age", "", types.Val{Tid: types.StringID, Value: []byte("10.43")}) require.NoError(t, err) require.EqualValues(t, []byte{0x7, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xa}, []byte(a[0])) } func TestIndexingTime(t *testing.T) { - schema.ParseBytes([]byte("age:dateTime @index(year) ."), 1) - a, err := indexTokens("age", "", types.Val{Tid: types.StringID, Value: []byte("0010-01-01T01:01:01.000000001")}) + require.NoError(t, schema.ParseBytes([]byte("age:dateTime @index(year) ."), 1)) + a, err := indexTokens("age", "", types.Val{Tid: types.StringID, + Value: []byte("0010-01-01T01:01:01.000000001")}) require.NoError(t, err) require.EqualValues(t, []byte{0x4, 0x0, 0xa}, []byte(a[0])) } func TestIndexing(t *testing.T) { - schema.ParseBytes([]byte("name:string @index(term) ."), 1) + require.NoError(t, schema.ParseBytes([]byte("name:string @index(term) ."), 1)) a, err := indexTokens("name", "", types.Val{Tid: types.StringID, Value: []byte("abc")}) require.NoError(t, err) require.EqualValues(t, "\x01abc", string(a[0])) } func TestIndexingMultiLang(t *testing.T) { - schema.ParseBytes([]byte("name:string @index(fulltext) ."), 1) + require.NoError(t, schema.ParseBytes([]byte("name:string @index(fulltext) ."), 1)) // ensure that default tokenizer is suitable for English a, err := indexTokens("name", "", types.Val{Tid: types.StringID, Value: []byte("stemming")}) @@ -86,23 +84,26 @@ func TestIndexingMultiLang(t *testing.T) { require.EqualValues(t, "\x08stem", string(a[0])) // ensure that Finnish tokenizer is used - a, err = indexTokens("name", "fi", types.Val{Tid: types.StringID, Value: []byte("edeltäneessä")}) + a, err = indexTokens("name", "fi", types.Val{Tid: types.StringID, + Value: []byte("edeltäneessä")}) require.NoError(t, err) require.EqualValues(t, "\x08edeltän", string(a[0])) // ensure that German tokenizer is used - a, err = indexTokens("name", "de", types.Val{Tid: types.StringID, Value: []byte("Auffassungsvermögen")}) + a, err = indexTokens("name", "de", types.Val{Tid: types.StringID, + Value: []byte("Auffassungsvermögen")}) require.NoError(t, err) require.EqualValues(t, "\x08auffassungsvermog", string(a[0])) // ensure that default tokenizer works differently than German - a, err = indexTokens("name", "", types.Val{Tid: types.StringID, Value: []byte("Auffassungsvermögen")}) + a, err = indexTokens("name", "", types.Val{Tid: types.StringID, + Value: []byte("Auffassungsvermögen")}) require.NoError(t, err) require.EqualValues(t, "\x08auffassungsvermögen", string(a[0])) } func TestIndexingInvalidLang(t *testing.T) { - schema.ParseBytes([]byte("name:string @index(fulltext) ."), 1) + require.NoError(t, schema.ParseBytes([]byte("name:string @index(fulltext) ."), 1)) // tokenizer for "xx" language won't return an error. _, err := indexTokens("name", "xx", types.Val{Tid: types.StringID, Value: []byte("error")}) @@ -110,7 +111,7 @@ func TestIndexingInvalidLang(t *testing.T) { } func TestIndexingAliasedLang(t *testing.T) { - schema.ParseBytes([]byte("name:string @index(fulltext) @lang ."), 1) + require.NoError(t, schema.ParseBytes([]byte("name:string @index(fulltext) @lang ."), 1)) _, err := indexTokens("name", "es", types.Val{Tid: types.StringID, Value: []byte("base")}) require.NoError(t, err) // es-es and es-419 are aliased to es @@ -153,8 +154,7 @@ friend:uid @reverse . // TODO(Txn): We can't read index key on disk if it was written in same txn. func TestTokensTable(t *testing.T) { - err := schema.ParseBytes([]byte(schemaVal), 1) - require.NoError(t, err) + require.NoError(t, schema.ParseBytes([]byte(schemaVal), 1)) key := x.DataKey("name", 1) l, err := getNew(key, ps) @@ -232,25 +232,8 @@ func addEdgeToUID(t *testing.T, attr string, src uint64, addMutation(t, l, edge, Set, startTs, commitTs, false) } -// addEdgeToUID adds uid edge with reverse edge -func addReverseEdge(t *testing.T, attr string, src uint64, - dst uint64, startTs, commitTs uint64) { - edge := &pb.DirectedEdge{ - ValueId: dst, - Label: "testing", - Attr: attr, - Entity: src, - Op: pb.DirectedEdge_SET, - } - txn := Txn{ - StartTs: startTs, - } - txn.addReverseMutation(context.Background(), edge) - require.NoError(t, txn.CommitToMemory(commitTs)) -} - func TestRebuildIndex(t *testing.T) { - schema.ParseBytes([]byte(schemaVal), 1) + require.NoError(t, schema.ParseBytes([]byte(schemaVal), 1)) addEdgeToValue(t, "name2", 91, "Michonne", uint64(1), uint64(2)) addEdgeToValue(t, "name2", 92, "David", uint64(3), uint64(4)) @@ -261,8 +244,14 @@ func TestRebuildIndex(t *testing.T) { require.NoError(t, txn.CommitAt(1, nil)) } - require.NoError(t, DeleteIndex("name2")) - require.NoError(t, RebuildIndex(context.Background(), "name2", 5)) + currentSchema, _ := schema.State().Get("name2") + rb := IndexRebuild{ + Attr: "name2", + StartTs: 5, + OldSchema: nil, + CurrentSchema: ¤tSchema, + } + require.NoError(t, RebuildIndex(context.Background(), &rb)) // Check index entries in data store. txn := ps.NewTransactionAt(6, false) @@ -301,13 +290,20 @@ func TestRebuildIndex(t *testing.T) { } func TestRebuildReverseEdges(t *testing.T) { - schema.ParseBytes([]byte(schemaVal), 1) + require.NoError(t, schema.ParseBytes([]byte(schemaVal), 1)) addEdgeToUID(t, "friend", 1, 23, uint64(10), uint64(11)) addEdgeToUID(t, "friend", 1, 24, uint64(12), uint64(13)) addEdgeToUID(t, "friend", 2, 23, uint64(14), uint64(15)) + currentSchema, _ := schema.State().Get("friend") + rb := IndexRebuild{ + Attr: "friend", + StartTs: 16, + OldSchema: nil, + CurrentSchema: ¤tSchema, + } // TODO: Remove after fixing sync marks. - RebuildReverseEdges(context.Background(), "friend", 16) + require.NoError(t, RebuildReverseEdges(context.Background(), &rb)) // Check index entries in data store. txn := ps.NewTransactionAt(17, false) @@ -346,3 +342,79 @@ func TestRebuildReverseEdges(t *testing.T) { require.EqualValues(t, 2, uids0[1]) require.EqualValues(t, 1, uids1[0]) } + +func TestNeedsIndexRebuild(t *testing.T) { + s1 := pb.SchemaUpdate{ValueType: pb.Posting_UID} + s2 := pb.SchemaUpdate{ValueType: pb.Posting_UID} + require.Equal(t, indexOp(indexNoop), needsIndexRebuild(&s1, &s2)) + require.Equal(t, indexOp(indexNoop), needsIndexRebuild(nil, &s2)) + + s1 = pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, + Tokenizer: []string{"exact"}} + s2 = pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, + Tokenizer: []string{"exact"}} + require.Equal(t, indexOp(indexNoop), needsIndexRebuild(&s1, &s2)) + + s1 = pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, + Tokenizer: []string{"term"}} + s2 = pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX} + require.Equal(t, indexOp(indexRebuild), needsIndexRebuild(&s1, &s2)) + + s1 = pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, + Tokenizer: []string{"exact"}} + s2 = pb.SchemaUpdate{ValueType: pb.Posting_FLOAT, Directive: pb.SchemaUpdate_INDEX, + Tokenizer: []string{"exact"}} + require.Equal(t, indexOp(indexRebuild), needsIndexRebuild(&s1, &s2)) + + s1 = pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, + Tokenizer: []string{"exact"}} + s2 = pb.SchemaUpdate{ValueType: pb.Posting_FLOAT, Directive: pb.SchemaUpdate_NONE} + require.Equal(t, indexOp(indexDelete), needsIndexRebuild(&s1, &s2)) +} + +func TestNeedsCountIndexRebuild(t *testing.T) { + s1 := pb.SchemaUpdate{ValueType: pb.Posting_UID} + s2 := pb.SchemaUpdate{ValueType: pb.Posting_UID, Count: true} + require.Equal(t, indexOp(indexRebuild), needsCountIndexRebuild(&s1, &s2)) + require.Equal(t, indexOp(indexRebuild), needsCountIndexRebuild(nil, &s2)) + + s1 = pb.SchemaUpdate{ValueType: pb.Posting_UID, Count: false} + s2 = pb.SchemaUpdate{ValueType: pb.Posting_UID, Count: false} + require.Equal(t, indexOp(indexNoop), needsCountIndexRebuild(&s1, &s2)) + + s1 = pb.SchemaUpdate{ValueType: pb.Posting_UID, Count: true} + s2 = pb.SchemaUpdate{ValueType: pb.Posting_UID, Count: false} + require.Equal(t, indexOp(indexDelete), needsCountIndexRebuild(&s1, &s2)) +} + +func TestNeedsReverseEdgesRebuild(t *testing.T) { + s1 := pb.SchemaUpdate{ValueType: pb.Posting_UID, Directive: pb.SchemaUpdate_INDEX} + s2 := pb.SchemaUpdate{ValueType: pb.Posting_UID, Directive: pb.SchemaUpdate_REVERSE} + require.Equal(t, indexOp(indexRebuild), needsReverseEdgesRebuild(&s1, &s2)) + require.Equal(t, indexOp(indexRebuild), needsReverseEdgesRebuild(nil, &s2)) + + s1 = pb.SchemaUpdate{ValueType: pb.Posting_UID, Directive: pb.SchemaUpdate_REVERSE} + s2 = pb.SchemaUpdate{ValueType: pb.Posting_UID, Directive: pb.SchemaUpdate_REVERSE} + require.Equal(t, indexOp(indexNoop), needsReverseEdgesRebuild(&s1, &s2)) + + s2 = pb.SchemaUpdate{ValueType: pb.Posting_UID, Directive: pb.SchemaUpdate_REVERSE} + s2 = pb.SchemaUpdate{ValueType: pb.Posting_UID, Directive: pb.SchemaUpdate_INDEX} + require.Equal(t, indexOp(indexDelete), needsReverseEdgesRebuild(&s1, &s2)) +} + +func TestNeedsListTypeRebuild(t *testing.T) { + s1 := pb.SchemaUpdate{ValueType: pb.Posting_UID, List: false} + s2 := pb.SchemaUpdate{ValueType: pb.Posting_UID, List: true} + rebuild, err := needsListTypeRebuild(&s1, &s2) + require.True(t, rebuild) + require.NoError(t, err) + rebuild, err = needsListTypeRebuild(nil, &s2) + require.False(t, rebuild) + require.NoError(t, err) + + s1 = pb.SchemaUpdate{ValueType: pb.Posting_UID, List: true} + s2 = pb.SchemaUpdate{ValueType: pb.Posting_UID, List: false} + rebuild, err = needsListTypeRebuild(&s1, &s2) + require.False(t, rebuild) + require.Error(t, err) +} diff --git a/worker/index.go b/worker/index.go deleted file mode 100644 index 03163f0e125..00000000000 --- a/worker/index.go +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2017-2018 Dgraph Labs, Inc. and Contributors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package worker - -import ( - "golang.org/x/net/context" - - "github.com/dgraph-io/dgraph/posting" - "github.com/dgraph-io/dgraph/schema" - "github.com/dgraph-io/dgraph/x" - "github.com/golang/glog" -) - -func (n *node) rebuildOrDelIndex(ctx context.Context, attr string, rebuild bool, startTs uint64) error { - if schema.State().IsIndexed(attr) != rebuild { - return x.Errorf("Predicate %s index mismatch, rebuild %v", attr, rebuild) - } - // Remove index edges - glog.Infof("Deleting index for %s", attr) - if err := posting.DeleteIndex(attr); err != nil { - return err - } - if rebuild { - glog.Infof("Rebuilding index for %s", attr) - return posting.RebuildIndex(ctx, attr, startTs) - } - return nil -} - -func (n *node) rebuildOrDelRevEdge(ctx context.Context, attr string, rebuild bool, startTs uint64) error { - if schema.State().IsReversed(attr) != rebuild { - return x.Errorf("Predicate %s reverse mismatch, rebuild %v", attr, rebuild) - } - glog.Infof("Deleting reverse index for %s", attr) - if err := posting.DeleteReverseEdges(attr); err != nil { - return err - } - if rebuild { - // Remove reverse edges - glog.Infof("Rebuilding reverse index for %s", attr) - return posting.RebuildReverseEdges(ctx, attr, startTs) - } - return nil -} - -func (n *node) rebuildOrDelCountIndex(ctx context.Context, attr string, rebuild bool, startTs uint64) error { - glog.Infof("Deleting count index for %s", attr) - if err := posting.DeleteCountIndex(attr); err != nil { - return err - } - if rebuild { - glog.Infof("Rebuilding count index for %s", attr) - return posting.RebuildCountIndex(ctx, attr, startTs) - } - return nil -} diff --git a/worker/mutation.go b/worker/mutation.go index 4f2b92c7796..9e24ad297c8 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -19,7 +19,6 @@ package worker import ( "bytes" "errors" - "fmt" "math" "time" @@ -113,7 +112,6 @@ func runSchemaMutation(ctx context.Context, update *pb.SchemaUpdate, startTs uin } func runSchemaMutationHelper(ctx context.Context, update *pb.SchemaUpdate, startTs uint64) error { - n := groups().Node if !groups().ServesTablet(update.Predicate) { tablet := groups().Tablet(update.Predicate) return x.Errorf("Tablet isn't being served by this group. Tablet: %+v", tablet) @@ -121,10 +119,10 @@ func runSchemaMutationHelper(ctx context.Context, update *pb.SchemaUpdate, start if err := checkSchema(update); err != nil { return err } - old, ok := schema.State().Get(update.Predicate) + old, _ := schema.State().Get(update.Predicate) current := *update - // Sets only in memory, we will update it on disk only after schema mutations is successful and persisted - // to disk. + // Sets only in memory, we will update it on disk only after schema mutations are successful and + // written to disk. schema.State().Set(update.Predicate, current) // Once we remove index or reverse edges from schema, even though the values @@ -139,83 +137,13 @@ func runSchemaMutationHelper(ctx context.Context, update *pb.SchemaUpdate, start // We need watermark for index/reverse edge addition for linearizable reads. // (both applied and synced watermarks). defer glog.Infof("Done schema update %+v\n", update) - if !ok { - if current.Directive == pb.SchemaUpdate_INDEX { - if err := n.rebuildOrDelIndex(ctx, update.Predicate, true, startTs); err != nil { - return err - } - } else if current.Directive == pb.SchemaUpdate_REVERSE { - if err := n.rebuildOrDelRevEdge(ctx, update.Predicate, true, startTs); err != nil { - return err - } - } - - if current.Count { - if err := n.rebuildOrDelCountIndex(ctx, update.Predicate, true, startTs); err != nil { - return err - } - } - return nil - } - - // schema was present already - if current.List && !old.List { - if err := posting.RebuildListType(ctx, update.Predicate, startTs); err != nil { - return err - } - } else if old.List && !current.List { - return fmt.Errorf("Type can't be changed from list to scalar for attr: [%s]"+ - " without dropping it first.", current.Predicate) - } - - if needReindexing(old, current) { - // Reindex if update.Index is true or remove index - if err := n.rebuildOrDelIndex(ctx, update.Predicate, - current.Directive == pb.SchemaUpdate_INDEX, startTs); err != nil { - return err - } - } else if needsRebuildingReverses(old, current) { - // Add or remove reverse edge based on update.Reverse - if err := n.rebuildOrDelRevEdge(ctx, update.Predicate, - current.Directive == pb.SchemaUpdate_REVERSE, startTs); err != nil { - return err - } - } - - if current.Count != old.Count { - if err := n.rebuildOrDelCountIndex(ctx, update.Predicate, current.Count, - startTs); err != nil { - return err - } - } - return nil -} - -func needsRebuildingReverses(old pb.SchemaUpdate, current pb.SchemaUpdate) bool { - return (current.Directive == pb.SchemaUpdate_REVERSE) != - (old.Directive == pb.SchemaUpdate_REVERSE) -} - -func needReindexing(old pb.SchemaUpdate, current pb.SchemaUpdate) bool { - if (current.Directive == pb.SchemaUpdate_INDEX) != (old.Directive == pb.SchemaUpdate_INDEX) { - return true + rebuild := posting.IndexRebuild{ + Attr: update.Predicate, + StartTs: startTs, + OldSchema: &old, + CurrentSchema: ¤t, } - // if value types has changed - if current.Directive == pb.SchemaUpdate_INDEX && current.ValueType != old.ValueType { - return true - } - // if tokenizer has changed - if same tokenizer works differently - // on different types - if len(current.Tokenizer) != len(old.Tokenizer) { - return true - } - for i, t := range old.Tokenizer { - if current.Tokenizer[i] != t { - return true - } - } - - return false + return rebuild.Run(ctx) } // We commit schema to disk in blocking way, should be ok because this happens @@ -331,8 +259,8 @@ func checkSchema(s *pb.SchemaUpdate) error { return nil } -// If storage type is specified, then check compatibility or convert to schema type -// if no storage type is specified then convert to schema type. +// ValidateAndConvert checks compatibility or converts to the schema type if the storage type is +// specified. If no storage type is specified then it converts to the schema type. func ValidateAndConvert(edge *pb.DirectedEdge, su *pb.SchemaUpdate) error { if isDeletePredicateEdge(edge) { return nil @@ -341,7 +269,8 @@ func ValidateAndConvert(edge *pb.DirectedEdge, su *pb.SchemaUpdate) error { return nil } //

Del on non list scalar type. - if edge.ValueId == 0 && !isStarAll(edge.Value) && edge.Op == pb.DirectedEdge_DEL && !su.GetList() { + if edge.ValueId == 0 && !isStarAll(edge.Value) && edge.Op == pb.DirectedEdge_DEL && + !su.GetList() { return x.Errorf("Please use * with delete operation for non-list type: [%v]", edge.Attr) } diff --git a/worker/mutation_test.go b/worker/mutation_test.go index 957599e5e78..bf452eff75c 100644 --- a/worker/mutation_test.go +++ b/worker/mutation_test.go @@ -127,7 +127,7 @@ func TestPopulateMutationMap(t *testing.T) { } func TestCheckSchema(t *testing.T) { - posting.DeleteAll() + require.NoError(t, posting.DeleteAll()) initTest(t, "name:string @index(term) .") // non uid to uid s1 := &pb.SchemaUpdate{Predicate: "name", ValueType: pb.Posting_UID} @@ -201,25 +201,3 @@ func TestCheckSchema(t *testing.T) { err = checkSchema(su[1]) require.NoError(t, err) } - -func TestNeedReindexing(t *testing.T) { - s1 := pb.SchemaUpdate{ValueType: pb.Posting_UID} - s2 := pb.SchemaUpdate{ValueType: pb.Posting_UID} - require.False(t, needReindexing(s1, s2)) - - s1 = pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, Tokenizer: []string{"exact"}} - s2 = pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, Tokenizer: []string{"exact"}} - require.False(t, needReindexing(s1, s2)) - - s1 = pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, Tokenizer: []string{"term"}} - s2 = pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX} - require.True(t, needReindexing(s1, s2)) - - s1 = pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, Tokenizer: []string{"exact"}} - s2 = pb.SchemaUpdate{ValueType: pb.Posting_FLOAT, Directive: pb.SchemaUpdate_INDEX, Tokenizer: []string{"exact"}} - require.True(t, needReindexing(s1, s2)) - - s1 = pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, Tokenizer: []string{"exact"}} - s2 = pb.SchemaUpdate{ValueType: pb.Posting_FLOAT, Directive: pb.SchemaUpdate_NONE} - require.True(t, needReindexing(s1, s2)) -}