Skip to content

Commit

Permalink
Add additional logs to show progress of reindexing operation. (#3746)
Browse files Browse the repository at this point in the history
  • Loading branch information
martinmr authored Aug 16, 2019
1 parent 8dcdee2 commit 38b8ccd
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,16 +492,17 @@ type rebuilder struct {
}

func (r *rebuilder) Run(ctx context.Context) error {
glog.V(1).Infof("Rebuild: Starting process. StartTs=%d. Prefix=\n%s\n",
r.startTs, hex.Dump(r.prefix))
glog.V(1).Infof(
"Rebuilding index for predicate %s: Starting process. StartTs=%d. Prefix=\n%s\n",
r.attr, r.startTs, hex.Dump(r.prefix))

// We create one txn for all the mutations to be housed in. We also create a
// localized posting list cache, to avoid stressing or mixing up with the
// global lcache (the LRU cache).
txn := NewTxn(r.startTs)

stream := pstore.NewStreamAt(r.startTs)
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s", r.attr)
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s:", r.attr)
stream.Prefix = r.prefix
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
// We should return quickly if the context is no longer valid.
Expand Down Expand Up @@ -533,16 +534,20 @@ func (r *rebuilder) Run(ctx context.Context) error {
// so this function doesn't have any work to do.
return nil
}

if err := stream.Orchestrate(ctx); err != nil {
return err
}
glog.V(1).Infof("Rebuild: Iteration done. Now committing at ts=%d\n", r.startTs)
glog.V(1).Infof("Rebuilding index for predicate %s: Iteration done. Now committing at ts=%d\n",
r.attr, r.startTs)

// Convert data into deltas.
txn.Update()

// Now we write all the created posting lists to disk.
writer := NewTxnWriter(pstore)
counter := 0
numDeltas := len(txn.cache.deltas)
for key, delta := range txn.cache.deltas {
if len(delta) == 0 {
continue
Expand All @@ -553,6 +558,12 @@ func (r *rebuilder) Run(ctx context.Context) error {
if err := writer.SetAt([]byte(key), delta, BitDeltaPosting, r.startTs); err != nil {
return err
}

counter++
if counter%1e5 == 0 {
glog.V(1).Infof("Rebuilding index for predicate %s: wrote %d of %d deltas to disk.\n",
r.attr, counter, numDeltas)
}
}
glog.V(1).Infoln("Rebuild: Flushing all writes.")
return writer.Flush()
Expand Down

0 comments on commit 38b8ccd

Please sign in to comment.