Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor rebuild index logic. #2851

Merged
merged 10 commits into from
Jan 3, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 178 additions & 62 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,40 +401,35 @@ func deleteEntries(prefix []byte, remove func(key []byte) bool) error {
})
}

func compareAttrAndType(key []byte, attr string, typ byte) bool {
pk := x.Parse(key)
if pk == nil {
return true
}
if pk.Attr == attr && pk.IsType(typ) {
func deleteAllEntries(prefix []byte) error {
return deleteEntries(prefix, func(key []byte) bool {
return true
}
return false
})
}

func DeleteReverseEdges(attr string) error {
// Delete index entries from data store.
func deleteIndex(attr string) error {
pk := x.ParsedKey{Attr: attr}
prefix := pk.IndexPrefix()
return deleteAllEntries(prefix)
}

func deleteReverseEdges(attr string) error {
pk := x.ParsedKey{Attr: attr}
prefix := pk.ReversePrefix()
return deleteEntries(prefix, func(key []byte) bool {
return true
})
return deleteAllEntries(prefix)
}

func deleteCountIndex(attr string, reverse bool) error {
func deleteCountIndexInternal(attr string, reverse bool) error {
pk := x.ParsedKey{Attr: attr}
prefix := pk.CountPrefix(reverse)
return deleteEntries(prefix, func(key []byte) bool {
return true
})
return deleteAllEntries(prefix)
}

func DeleteCountIndex(attr string) error {
// Delete index entries from data store.
if err := deleteCountIndex(attr, false); err != nil {
func deleteCountIndex(attr string) error {
if err := deleteCountIndexInternal(attr, /*reverse*/ false); err != nil {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File is not goimports-ed (from goimports)

return err
}
if err := deleteCountIndex(attr, true); err != nil { // delete reverse count indexes.
if err := deleteCountIndexInternal(attr, /*reverse*/ true); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -537,15 +532,83 @@ func (r *rebuild) Run(ctx context.Context) error {
return writer.Flush()
}

// RebuildIndicesRequest holds the info needed to initiate a rebuilt of the indices.
type RebuildIndicesRequest struct {
Attr string
StartTs uint64
OldSchema *pb.SchemaUpdate
CurrentSchema *pb.SchemaUpdate
}

// RebuildAllIndices rebuilds all indices that need it.
func RebuildAllIndices(ctx context.Context, req *RebuildIndicesRequest) error {
if err := RebuildListType(ctx, req); err != nil {
return err
}
if err := RebuildIndex(ctx, req); err != nil {
return err
}
if err := RebuildCountIndex(ctx, req); err != nil {
return err
}
if err := RebuildReverseEdges(ctx, req); err != nil {
return err
}

return nil
}

func needsIndexRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) bool {
x.AssertTruef(current != nil, "Current schema cannot be nil.")

// If no previous schema existed, the index needs to be built.
if old == nil {
return true
}
// Index needs to be rebuilt if the scheme directive changed.
if (current.Directive == pb.SchemaUpdate_INDEX) != (old.Directive == pb.SchemaUpdate_INDEX) {
return true
}
// Index needs to be rebuilt if the value types have changed.
if current.Directive == pb.SchemaUpdate_INDEX && current.ValueType != old.ValueType {
return true
}
// Index needs to be rebuilt if the tokenizers have changed
if len(current.Tokenizer) != len(old.Tokenizer) {
return true
}
for i, t := range old.Tokenizer {
if current.Tokenizer[i] != t {
return true
}
}

return false
}

// RebuildIndex rebuilds index for a given attribute.
// We commit mutations with startTs and ignore the errors.
func RebuildIndex(ctx context.Context, attr string, startTs uint64) error {
x.AssertTruef(schema.State().IsIndexed(attr), "Attr %s not indexed", attr)
func RebuildIndex(ctx context.Context, req *RebuildIndicesRequest) error {
// Exit early if indices do not need to be rebuilt.
if !needsIndexRebuild(req.OldSchema, req.CurrentSchema) {
return nil
}

pk := x.ParsedKey{Attr: attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: startTs}
glog.Infof("Deleting index for %s", req.Attr)
if err := deleteIndex(req.Attr); err != nil {
return err
}

// Exit early if attribute is not indexed in the new schema.
if !schema.State().IsIndexed(req.Attr) {
return nil
}

glog.Infof("Rebuilding index for %s", req.Attr)
pk := x.ParsedKey{Attr: req.Attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: req.StartTs}
builder.fn = func(uid uint64, pl *List, txn *Txn) error {
edge := pb.DirectedEdge{Attr: attr, Entity: uid}
edge := pb.DirectedEdge{Attr: req.Attr, Entity: uid}
return pl.Iterate(txn.StartTs, 0, func(p *pb.Posting) error {
// Add index entries based on p.
val := types.Val{
Expand All @@ -567,17 +630,43 @@ func RebuildIndex(ctx context.Context, attr string, startTs uint64) error {
return builder.Run(ctx)
}

func RebuildCountIndex(ctx context.Context, attr string, startTs uint64) error {
x.AssertTruef(schema.State().HasCount(attr), "Attr %s doesn't have count index", attr)
func needsCountIndexRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) bool {
x.AssertTruef(current != nil, "Current schema cannot be nil.")

if old == nil {
return true
}
if current.Count != old.Count {
return true
}
return false
}

// RebuildCountIndex rebuilds the count index for a given attribute.
func RebuildCountIndex(ctx context.Context, req *RebuildIndicesRequest) error {
if !needsCountIndexRebuild(req.OldSchema, req.CurrentSchema) {
return nil
}

glog.Infof("Deleting count index for %s", req.Attr)
if err := deleteCountIndex(req.Attr); err != nil {
return err
}

// Exit early if attribute is not indexed in the new schema.
if !schema.State().HasCount(req.Attr) {
return nil
}

glog.Infof("Rebuilding count index for %s", req.Attr)
var reverse bool
fn := func(uid uint64, pl *List, txn *Txn) error {
t := &pb.DirectedEdge{
ValueId: uid,
Attr: attr,
Attr: req.Attr,
Op: pb.DirectedEdge_SET,
}
sz := pl.Length(startTs, 0)
sz := pl.Length(req.StartTs, 0)
if sz == -1 {
return nil
}
Expand All @@ -593,33 +682,51 @@ func RebuildCountIndex(ctx context.Context, attr string, startTs uint64) error {
}

// Create the forward index.
pk := x.ParsedKey{Attr: attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: startTs}
pk := x.ParsedKey{Attr: req.Attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: req.StartTs}
builder.fn = fn
if err := builder.Run(ctx); err != nil {
return err
}

// Create the reverse index.
reverse = true
builder = rebuild{prefix: pk.ReversePrefix(), startTs: startTs}
builder = rebuild{prefix: pk.ReversePrefix(), startTs: req.StartTs}
builder.fn = fn
return builder.Run(ctx)
}

type item struct {
uid uint64
list *List
func needsReverseEdgesRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) bool {
x.AssertTruef(current != nil, "Current schema cannot be nil.")

if old == nil {
return true
}
return (current.Directive == pb.SchemaUpdate_REVERSE) !=
(old.Directive == pb.SchemaUpdate_REVERSE)
}

// RebuildReverseEdges rebuilds the reverse edges for a given attribute.
func RebuildReverseEdges(ctx context.Context, attr string, startTs uint64) error {
x.AssertTruef(schema.State().IsReversed(attr), "Attr %s doesn't have reverse", attr)
func RebuildReverseEdges(ctx context.Context, req *RebuildIndicesRequest) error {
if !needsReverseEdgesRebuild(req.OldSchema, req.CurrentSchema) {
return nil
}

pk := x.ParsedKey{Attr: attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: startTs}
glog.Infof("Deleting reverse index for %s", req.Attr)
if err := deleteReverseEdges(req.Attr); err != nil {
return err
}

// Exit early if attribute is not indexed in the new schema.
if !schema.State().IsReversed(req.Attr) {
return nil
}

glog.Infof("Rebuilding reverse index for %s", req.Attr)
pk := x.ParsedKey{Attr: req.Attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: req.StartTs}
builder.fn = func(uid uint64, pl *List, txn *Txn) error {
edge := pb.DirectedEdge{Attr: attr, Entity: uid}
edge := pb.DirectedEdge{Attr: req.Attr, Entity: uid}
return pl.Iterate(txn.StartTs, 0, func(pp *pb.Posting) error {
puid := pp.Uid
// Add reverse entries based on p.
Expand All @@ -642,26 +749,33 @@ func RebuildReverseEdges(ctx context.Context, attr string, startTs uint64) error
return builder.Run(ctx)
}

func DeleteIndex(attr string) error {
// Delete index entries from data store.
pk := x.ParsedKey{Attr: attr}
prefix := pk.IndexPrefix()
return deleteEntries(prefix, func(key []byte) bool {
return true
})
func needsListTypeRebuild(old *pb.SchemaUpdate, current *pb.SchemaUpdate) (bool, error) {
x.AssertTruef(current != nil, "Current schema cannot be nil.")

if old == nil {
return false, nil
}
if current.List && !old.List {
return true, nil
}
if old.List && !current.List {
return false, fmt.Errorf("Type can't be changed from list to scalar for attr: [%s]"+
" without dropping it first.", current.Predicate)
}

return false, nil
}

// This function is called when the schema is changed from scalar to list type.
// RebuildListType rebuilds the index when the schema is changed from scalar to list type.
// We need to fingerprint the values to get the new ValueId.
func RebuildListType(ctx context.Context, attr string, startTs uint64) error {
x.AssertTruef(schema.State().IsList(attr), "Attr %s is not of list type", attr)

// Let's clear out the cache for anything which belongs to this attribute,
// so once we're done, any reads would see the new list type. Note that we
// don't use lcache during the rebuild process.
func RebuildListType(ctx context.Context, req *RebuildIndicesRequest) error {
if needsRebuild, err := needsListTypeRebuild(req.OldSchema, req.CurrentSchema); !needsRebuild ||
err != nil {
return err
}

pk := x.ParsedKey{Attr: attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: startTs}
pk := x.ParsedKey{Attr: req.Attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: req.StartTs}
builder.fn = func(uid uint64, pl *List, txn *Txn) error {
var mpost *pb.Posting
err := pl.Iterate(txn.StartTs, 0, func(p *pb.Posting) error {
Expand All @@ -681,7 +795,7 @@ func RebuildListType(ctx context.Context, attr string, startTs uint64) error {
// Delete the old edge corresponding to ValueId math.MaxUint64
t := &pb.DirectedEdge{
ValueId: mpost.Uid,
Attr: attr,
Attr: req.Attr,
Op: pb.DirectedEdge_DEL,
}

Expand All @@ -693,7 +807,7 @@ func RebuildListType(ctx context.Context, attr string, startTs uint64) error {
}
// Add the new edge with the fingerprinted value id.
newEdge := &pb.DirectedEdge{
Attr: attr,
Attr: req.Attr,
Value: mpost.Value,
ValueType: mpost.ValType,
Op: pb.DirectedEdge_SET,
Expand All @@ -705,6 +819,7 @@ func RebuildListType(ctx context.Context, attr string, startTs uint64) error {
return builder.Run(ctx)
}

// DeleteAll deletes all entries in the posting list.
func DeleteAll() error {
return deleteEntries(nil, func(key []byte) bool {
pk := x.Parse(key)
Expand All @@ -719,6 +834,7 @@ func DeleteAll() error {
})
}

// DeletePredicate deletes all entries and indices for a given predicate.
func DeletePredicate(ctx context.Context, attr string) error {
glog.Infof("Dropping predicate: [%s]", attr)
pk := x.ParsedKey{
Expand All @@ -737,18 +853,18 @@ func DeletePredicate(ctx context.Context, attr string) error {
indexed := schema.State().IsIndexed(attr)
reversed := schema.State().IsReversed(attr)
if indexed {
if err := DeleteIndex(attr); err != nil {
if err := deleteIndex(attr); err != nil {
return err
}
} else if reversed {
if err := DeleteReverseEdges(attr); err != nil {
if err := deleteReverseEdges(attr); err != nil {
return err
}
}

hasCountIndex := schema.State().HasCount(attr)
if hasCountIndex {
if err := DeleteCountIndex(attr); err != nil {
if err := deleteCountIndex(attr); err != nil {
return err
}
}
Expand Down
Loading