Spike: Pause compaction when out of sync #10392

Closed
wants to merge 5 commits
23 changes: 20 additions & 3 deletions blockstore/badger/blockstore.go
@@ -396,6 +396,9 @@ func (b *Blockstore) doCopy(from, to *badger.DB) error {
if workers < 2 {
workers = 2
}
if workers > 8 {
workers = 8
Contributor

Do we have confidence in this number? Did we check that it's "good enough" on mainnet?

Contributor Author

Need to check on this. There's some evidence that it might be too high right now, resulting in the chain getting out of sync. An alternate strategy is to leave it as is and see whether chain sync issues persist and correlate with moving GC.

@vyzo, for reference, do you have an estimate of a reasonable time for moving GC to complete today?

cc @TippyFlitsUK, you might have an even better idea.

Contributor

maybe 5-10 min.

Contributor

Let's take a fresh measurement?

Contributor Author

For reference, I got 50m on my fairly well-resourced mainnet node.

Contributor Author

^ with the current default of half the CPUs.
I'll measure with 8 and compare when I get to similar garbage levels.

Contributor Author

Working without major issues on mainnet (cc @TippyFlitsUK), and this seems to be preventing out-of-sync during moving GC.

Contributor Author

One last thing to note: I suspect that reducing goroutines here is the main thing driving the improvement, since moving GC is anecdotally what causes chain sync to get left behind, and we are not doing any yielding during the move (yet) in this change. If we are concerned about the risk of compaction deadlock, we could just include the simple goroutine limiting for moving GC.
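A minimal sketch of that narrower "goroutine limiting only" alternative, assuming the current default of half the CPUs mentioned above; `clampMoveWorkers` is a hypothetical helper name, and the cap of 8 is the value this thread is still trying to validate, not a benchmarked one:

```go
package main

import (
	"fmt"
	"runtime"
)

// clampMoveWorkers sketches the "goroutine limiting only" variant discussed above:
// cap the badger stream workers used by the moving-GC copy, without any yield checks.
// The floor of 2 comes from the existing doCopy code; half the CPUs is the default
// mentioned in this thread; 8 is the assumed cap under discussion.
func clampMoveWorkers() int {
	workers := runtime.NumCPU() / 2
	if workers < 2 {
		workers = 2
	}
	if workers > 8 {
		workers = 8
	}
	return workers
}

func main() {
	// doCopy would then set stream.NumGo to this value on the badger stream.
	fmt.Println("move workers:", clampMoveWorkers())
}
```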

}

stream := from.NewStream()
stream.NumGo = workers
@@ -441,7 +444,7 @@ func (b *Blockstore) deleteDB(path string) {
}
}

func (b *Blockstore) onlineGC(ctx context.Context, threshold float64) error {
func (b *Blockstore) onlineGC(ctx context.Context, threshold float64, checkFreq time.Duration, check func() error) error {
b.lockDB()
defer b.unlockDB()

@@ -458,11 +461,15 @@ func (b *Blockstore) onlineGC(ctx context.Context, threshold float64) error {
if err != nil {
return err
}

checkTick := time.NewTimer(checkFreq)
defer checkTick.Stop()
for err == nil {
select {
case <-ctx.Done():
err = ctx.Err()
case <-checkTick.C:
err = check()
checkTick.Reset(checkFreq)
default:
err = b.db.RunValueLogGC(threshold)
}
@@ -499,7 +506,17 @@ func (b *Blockstore) CollectGarbage(ctx context.Context, opts ...blockstore.Bloc
if threshold == 0 {
threshold = defaultGCThreshold
}
return b.onlineGC(ctx, threshold)
checkFreq := options.CheckFreq
if checkFreq < 30*time.Second { // disallow checking more frequently than block time
checkFreq = 30 * time.Second
}
check := options.Check
if check == nil {
check = func() error {
return nil
}
}
return b.onlineGC(ctx, threshold, checkFreq, check)
}

// GCOnce runs garbage collection on the value log;
19 changes: 19 additions & 0 deletions blockstore/blockstore.go
@@ -2,6 +2,7 @@ package blockstore

import (
"context"
"time"

"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
@@ -57,6 +58,10 @@ type BlockstoreGCOptions struct {
FullGC bool
// fraction of garbage in badger vlog before its worth processing in online GC
Threshold float64
// how often to call the check function
CheckFreq time.Duration
// function to call periodically to pause or early terminate GC
Check func() error
}

func WithFullGC(fullgc bool) BlockstoreGCOption {
@@ -73,6 +78,20 @@ func WithThreshold(threshold float64) BlockstoreGCOption {
}
}

func WithCheckFreq(f time.Duration) BlockstoreGCOption {
return func(opts *BlockstoreGCOptions) error {
opts.CheckFreq = f
return nil
}
}

func WithCheck(check func() error) BlockstoreGCOption {
return func(opts *BlockstoreGCOptions) error {
opts.Check = check
return nil
}
}

// BlockstoreSize is a trait for on-disk blockstores that can report their size
type BlockstoreSize interface {
Size() (int64, error)
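For illustration, a hedged sketch of the caller side of these new options; the package, function, and variable names here are hypothetical, and the 90-second interval simply mirrors the splitstore wiring further down:

```go
// Hypothetical example package; only the lotus blockstore import is real.
package gcpause

import (
	"context"
	"time"

	bstore "github.com/filecoin-project/lotus/blockstore"
)

// runOnlineGC shows how a caller could thread a pause/abort hook through the new
// options: badger's onlineGC calls check every CheckFreq (floored at 30s) and stops
// the value-log GC loop as soon as check returns an error; a check that blocks
// effectively pauses GC.
func runOnlineGC(ctx context.Context, b bstore.Blockstore, check func() error) error {
	gc, ok := b.(bstore.BlockstoreGC)
	if !ok {
		return nil // blockstore has no online GC support; nothing to do
	}
	return gc.CollectGarbage(ctx,
		bstore.WithCheckFreq(90*time.Second), // same interval the splitstore passes below
		bstore.WithCheck(check),              // e.g. a sync/closing gate like checkYield
	)
}
```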
11 changes: 11 additions & 0 deletions blockstore/splitstore/splitstore.go
@@ -187,6 +187,11 @@ type SplitStore struct {
ctx context.Context
cancel func()

outOfSync int32 // for fast checking
chainSyncMx sync.Mutex
chainSyncCond sync.Cond
chainSyncFinished bool // protected by chainSyncMx

debug *debugLog

// transactional protection for concurrent read/writes during compaction
@@ -261,6 +266,7 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co

ss.txnViewsCond.L = &ss.txnViewsMx
ss.txnSyncCond.L = &ss.txnSyncMx
ss.chainSyncCond.L = &ss.chainSyncMx
ss.ctx, ss.cancel = context.WithCancel(context.Background())

ss.reifyCond.L = &ss.reifyMx
@@ -822,6 +828,11 @@ func (s *SplitStore) Close() error {
s.txnSyncCond.Broadcast()
s.txnSyncMx.Unlock()

s.chainSyncMx.Lock()
s.chainSyncFinished = true
s.chainSyncCond.Broadcast()
s.chainSyncMx.Unlock()

log.Warn("close with ongoing compaction in progress; waiting for it to finish...")
for atomic.LoadInt32(&s.compacting) == 1 {
time.Sleep(time.Second)
75 changes: 61 additions & 14 deletions blockstore/splitstore/splitstore_compact.go
@@ -91,7 +91,35 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
// Regardless, we put a mutex in HeadChange just to be safe

if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
// we are currently compacting -- protect the new tipset(s)
// we are currently compacting
// 1. Signal sync condition to yield compaction when out of sync and resume when in sync
timestamp := time.Unix(int64(curTs.MinTimestamp()), 0)
if CheckSyncGap && time.Since(timestamp) > SyncGapTime {
/* Chain out of sync */
if atomic.CompareAndSwapInt32(&s.outOfSync, 0, 1) {
// transition from in sync to out of sync
s.chainSyncMx.Lock()
s.chainSyncFinished = false
s.chainSyncMx.Unlock()
}
// already out of sync, no signaling necessary

}
// TODO: ok to use hysteresis with no transitions between 30s and 1m?
Contributor

When we're syncing, the head will only be taken once we get fully in sync, so this is probably fine.

(Well, really the new sync head is only selected when we complete the previous sync, but the effect is similar.)

if time.Since(timestamp) < SyncWaitTime {
/* Chain in sync */
if atomic.CompareAndSwapInt32(&s.outOfSync, 0, 0) {
// already in sync, no signaling necessary
} else {
// transition from out of sync to in sync
s.chainSyncMx.Lock()
s.chainSyncFinished = true
s.chainSyncCond.Broadcast()
s.chainSyncMx.Unlock()
}

}
// 2. protect the new tipset(s)
s.protectTipSets(apply)
return nil
}
@@ -427,7 +455,7 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
// transactionally protect a reference by walking the object and marking.
// concurrent markings are short circuited by checking the markset.
func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) (int64, error) {
if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return 0, err
}

@@ -545,7 +573,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
}
defer coldSet.Close() //nolint:errcheck

if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return err
}

@@ -617,7 +645,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {

log.Infow("marking done", "took", time.Since(startMark), "marked", *count)

if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return err
}

@@ -627,7 +655,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return xerrors.Errorf("error protecting transactional refs: %w", err)
}

if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return err
}

@@ -704,7 +732,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(hotCnt))
stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(coldCnt))

if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return err
}

@@ -713,7 +741,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
// possibly delete objects we didn't have when we were collecting cold objects)
s.waitForMissingRefs(markSet)

if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return err
}

@@ -733,7 +761,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
}
log.Infow("moving done", "took", time.Since(startMove))

if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return err
}

@@ -764,7 +792,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
}

// wait for the head to catch up so that the current tipset is marked
s.waitForSync()
s.waitForTxnSync()

if err := s.checkClosing(); err != nil {
return err
@@ -865,7 +893,7 @@ func (s *SplitStore) beginCriticalSection(markSet MarkSet) error {
return nil
}

func (s *SplitStore) waitForSync() {
func (s *SplitStore) waitForTxnSync() {
log.Info("waiting for sync")
if !CheckSyncGap {
log.Warnf("If you see this outside of test it is a serious splitstore issue")
@@ -884,6 +912,25 @@
}
}

// Block compaction operations if chain sync has fallen behind
func (s *SplitStore) waitForSync() {
if atomic.LoadInt32(&s.outOfSync) == 0 {
return
}
s.chainSyncMx.Lock()
defer s.chainSyncMx.Unlock()

for !s.chainSyncFinished {
s.chainSyncCond.Wait()
}
}

// Combined sync and closing check
func (s *SplitStore) checkYield() error {
s.waitForSync()
return s.checkClosing()
}

func (s *SplitStore) endTxnProtect() {
s.txnLk.Lock()
defer s.txnLk.Unlock()
@@ -1037,7 +1084,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp

for len(toWalk) > 0 {
// walking can take a while, so check this with every opportunity
if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return err
}

@@ -1106,7 +1153,7 @@ func (s *SplitStore) walkObject(c cid.Cid, visitor ObjectVisitor, f func(cid.Cid
}

// check this before recursing
if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return 0, err
}

@@ -1175,7 +1222,7 @@ func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, m
}

// check this before recursing
if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return sz, err
}

@@ -1262,7 +1309,7 @@ func (s *SplitStore) moveColdBlocks(coldr *ColdSetReader) error {
batch := make([]blocks.Block, 0, batchSize)

err := coldr.ForEach(func(c cid.Cid) error {
if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return err
}
blk, err := s.hot.Get(s.ctx, c)
5 changes: 5 additions & 0 deletions blockstore/splitstore/splitstore_gc.go
@@ -65,10 +65,15 @@ func (s *SplitStore) gcHotAfterCompaction() {
}

func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreGCOption) error {
if err := s.checkYield(); err != nil {
return err
}
if gc, ok := b.(bstore.BlockstoreGC); ok {
log.Info("garbage collecting blockstore")
startGC := time.Now()

opts = append(opts, bstore.WithCheckFreq(90*time.Second))
opts = append(opts, bstore.WithCheck(s.checkYield))
if err := gc.CollectGarbage(s.ctx, opts...); err != nil {
return err
}