From 38b8ccd63039608e1869eba1ce23015c293f712f Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Fri, 16 Aug 2019 10:32:24 -0700 Subject: [PATCH] Add additional logs to show progress of reindexing operation. (#3746) --- posting/index.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/posting/index.go b/posting/index.go index da44e8ad992..6a2223de447 100644 --- a/posting/index.go +++ b/posting/index.go @@ -492,8 +492,9 @@ type rebuilder struct { } func (r *rebuilder) Run(ctx context.Context) error { - glog.V(1).Infof("Rebuild: Starting process. StartTs=%d. Prefix=\n%s\n", - r.startTs, hex.Dump(r.prefix)) + glog.V(1).Infof( + "Rebuilding index for predicate %s: Starting process. StartTs=%d. Prefix=\n%s\n", + r.attr, r.startTs, hex.Dump(r.prefix)) // We create one txn for all the mutations to be housed in. We also create a // localized posting list cache, to avoid stressing or mixing up with the @@ -501,7 +502,7 @@ func (r *rebuilder) Run(ctx context.Context) error { txn := NewTxn(r.startTs) stream := pstore.NewStreamAt(r.startTs) - stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s", r.attr) + stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s:", r.attr) stream.Prefix = r.prefix stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) { // We should return quickly if the context is no longer valid. @@ -533,16 +534,20 @@ func (r *rebuilder) Run(ctx context.Context) error { // so this function doesn't have any work to do. return nil } + if err := stream.Orchestrate(ctx); err != nil { return err } - glog.V(1).Infof("Rebuild: Iteration done. Now committing at ts=%d\n", r.startTs) + glog.V(1).Infof("Rebuilding index for predicate %s: Iteration done. Now committing at ts=%d\n", + r.attr, r.startTs) // Convert data into deltas. txn.Update() // Now we write all the created posting lists to disk. writer := NewTxnWriter(pstore) + counter := 0 + numDeltas := len(txn.cache.deltas) for key, delta := range txn.cache.deltas { if len(delta) == 0 { continue @@ -553,6 +558,12 @@ func (r *rebuilder) Run(ctx context.Context) error { if err := writer.SetAt([]byte(key), delta, BitDeltaPosting, r.startTs); err != nil { return err } + + counter++ + if counter%1e5 == 0 { + glog.V(1).Infof("Rebuilding index for predicate %s: wrote %d of %d deltas to disk.\n", + r.attr, counter, numDeltas) + } } glog.V(1).Infoln("Rebuild: Flushing all writes.") return writer.Flush()