Skip to content

Commit

Permalink
Add log prefix to stream used to rebuild indices. (#3696)
Browse files Browse the repository at this point in the history
  • Loading branch information
martinmr authored Jul 19, 2019
1 parent c3852a1 commit d36c29f
Showing 1 changed file with 13 additions and 10 deletions.
23 changes: 13 additions & 10 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/hex"
"fmt"
"math"
"time"

Expand Down Expand Up @@ -479,8 +480,9 @@ func deleteCountIndex(attr string) error {
return pstore.DropPrefix(prefix)
}

// Index rebuilding logic here.
type rebuild struct {
// rebuilder handles the process of rebuilding an index.
type rebuilder struct {
attr string
prefix []byte
startTs uint64

Expand All @@ -489,7 +491,7 @@ type rebuild struct {
fn func(uid uint64, pl *List, txn *Txn) error
}

func (r *rebuild) Run(ctx context.Context) error {
func (r *rebuilder) Run(ctx context.Context) error {
glog.V(1).Infof("Rebuild: Starting process. StartTs=%d. Prefix=\n%s\n",
r.startTs, hex.Dump(r.prefix))

Expand All @@ -499,6 +501,7 @@ func (r *rebuild) Run(ctx context.Context) error {
txn := NewTxn(r.startTs)

stream := pstore.NewStreamAt(r.startTs)
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s", r.attr)
stream.Prefix = r.prefix
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
// We should return quickly if the context is no longer valid.
Expand Down Expand Up @@ -677,7 +680,7 @@ func rebuildIndex(ctx context.Context, rb *IndexRebuild) error {
}
}

// Exit early if the index only need to be deleted and not rebuild.
// Exit early if the index only need to be deleted and not rebuilt.
if rebuildInfo.op == indexDelete {
return nil
}
Expand All @@ -702,7 +705,7 @@ func rebuildIndex(ctx context.Context, rb *IndexRebuild) error {
}

pk := x.ParsedKey{Attr: rb.Attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: rb.StartTs}
builder := rebuilder{attr: rb.Attr, prefix: pk.DataPrefix(), startTs: rb.StartTs}
builder.fn = func(uid uint64, pl *List, txn *Txn) error {
edge := pb.DirectedEdge{Attr: rb.Attr, Entity: uid}
return pl.Iterate(txn.StartTs, 0, func(p *pb.Posting) error {
Expand Down Expand Up @@ -798,7 +801,7 @@ func rebuildCountIndex(ctx context.Context, rb *IndexRebuild) error {

// Create the forward index.
pk := x.ParsedKey{Attr: rb.Attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: rb.StartTs}
builder := rebuilder{attr: rb.Attr, prefix: pk.DataPrefix(), startTs: rb.StartTs}
builder.fn = fn
if err := builder.Run(ctx); err != nil {
return err
Expand All @@ -809,7 +812,7 @@ func rebuildCountIndex(ctx context.Context, rb *IndexRebuild) error {
// to call builder.Run even if that's not the case as the reverse prefix
// will be empty.
reverse = true
builder = rebuild{prefix: pk.ReversePrefix(), startTs: rb.StartTs}
builder = rebuilder{attr: rb.Attr, prefix: pk.ReversePrefix(), startTs: rb.StartTs}
builder.fn = fn
return builder.Run(ctx)
}
Expand All @@ -832,7 +835,7 @@ func (rb *IndexRebuild) needsReverseEdgesRebuild() indexOp {
return indexNoop
}

// If the current schema requires an index, index should be rebuild.
// If the current schema requires an index, index should be rebuilt.
if currIndex {
return indexRebuild
}
Expand All @@ -859,7 +862,7 @@ func rebuildReverseEdges(ctx context.Context, rb *IndexRebuild) error {

glog.Infof("Rebuilding reverse index for %s", rb.Attr)
pk := x.ParsedKey{Attr: rb.Attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: rb.StartTs}
builder := rebuilder{attr: rb.Attr, prefix: pk.DataPrefix(), startTs: rb.StartTs}
builder.fn = func(uid uint64, pl *List, txn *Txn) error {
edge := pb.DirectedEdge{Attr: rb.Attr, Entity: uid}
return pl.Iterate(txn.StartTs, 0, func(pp *pb.Posting) error {
Expand Down Expand Up @@ -911,7 +914,7 @@ func rebuildListType(ctx context.Context, rb *IndexRebuild) error {
}

pk := x.ParsedKey{Attr: rb.Attr}
builder := rebuild{prefix: pk.DataPrefix(), startTs: rb.StartTs}
builder := rebuilder{attr: rb.Attr, prefix: pk.DataPrefix(), startTs: rb.StartTs}
builder.fn = func(uid uint64, pl *List, txn *Txn) error {
var mpost *pb.Posting
err := pl.Iterate(txn.StartTs, 0, func(p *pb.Posting) error {
Expand Down

0 comments on commit d36c29f

Please sign in to comment.