Skip to content

Commit

Permalink
Merge pull request #8149 from filecoin-project/fix/reify-limit
Browse files Browse the repository at this point in the history
fix: limit reification sizes
  • Loading branch information
magik6k authored Feb 20, 2022
2 parents 31ad8e6 + a7b1d86 commit 051ff5d
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 6 deletions.
21 changes: 16 additions & 5 deletions blockstore/splitstore/splitstore_reify.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package splitstore

import (
"errors"
"runtime"
"sync/atomic"

Expand All @@ -10,13 +11,12 @@ import (
cid "github.com/ipfs/go-cid"
)

var EnableReification = false
var (
errReifyLimit = errors.New("reification limit reached")
ReifyLimit = 16384
)

func (s *SplitStore) reifyColdObject(c cid.Cid) {
if !EnableReification {
return
}

if !s.isWarm() {
return
}
Expand Down Expand Up @@ -104,12 +104,18 @@ func (s *SplitStore) doReify(c cid.Cid) {
s.txnLk.RLock()
defer s.txnLk.RUnlock()

count := 0
err := s.walkObjectIncomplete(c, newTmpVisitor(),
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
}

count++
if count > ReifyLimit {
return errReifyLimit
}

s.reifyMx.Lock()
_, inProgress := s.reifyInProgress[c]
if !inProgress {
Expand Down Expand Up @@ -150,6 +156,11 @@ func (s *SplitStore) doReify(c cid.Cid) {
})

if err != nil {
if xerrors.Is(err, errReifyLimit) {
log.Debug("reification aborted; reify limit reached")
return
}

log.Warnf("error walking cold object for reification (cid: %s): %s", c, err)
return
}
Expand Down
101 changes: 100 additions & 1 deletion blockstore/splitstore/splitstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,102 @@ func testSplitStoreReification(t *testing.T, f func(context.Context, blockstore.
}
}

func testSplitStoreReificationLimit(t *testing.T, f func(context.Context, blockstore.Blockstore, cid.Cid) error) {
ds := dssync.MutexWrap(datastore.NewMapDatastore())
hot := newMockStore()
cold := newMockStore()

mkRandomBlock := func() blocks.Block {
data := make([]byte, 128)
_, err := rand.Read(data)
if err != nil {
t.Fatal(err)
}

return blocks.NewBlock(data)
}

block1 := mkRandomBlock()
block2 := mkRandomBlock()
block3 := mkRandomBlock()

hdr := mock.MkBlock(nil, 0, 0)
hdr.Messages = block1.Cid()
hdr.ParentMessageReceipts = block2.Cid()
hdr.ParentStateRoot = block3.Cid()
block4, err := hdr.ToStorageBlock()
if err != nil {
t.Fatal(err)
}

allBlocks := []blocks.Block{block1, block2, block3, block4}
for _, blk := range allBlocks {
err := cold.Put(context.Background(), blk)
if err != nil {
t.Fatal(err)
}
}

path, err := ioutil.TempDir("", "splitstore.*")
if err != nil {
t.Fatal(err)
}

t.Cleanup(func() {
_ = os.RemoveAll(path)
})

ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map"})
if err != nil {
t.Fatal(err)
}
defer ss.Close() //nolint

ss.warmupEpoch = 1
go ss.reifyOrchestrator()

waitForReification := func() {
for {
ss.reifyMx.Lock()
ready := len(ss.reifyPend) == 0 && len(ss.reifyInProgress) == 0
ss.reifyMx.Unlock()

if ready {
return
}

time.Sleep(time.Millisecond)
}
}

// do a hot access -- nothing should be reified as the limit should be exceeded
oldReifyLimit := ReifyLimit
ReifyLimit = 2
t.Cleanup(func() {
ReifyLimit = oldReifyLimit
})

err = f(blockstore.WithHotView(context.Background()), ss, block4.Cid())
if err != nil {
t.Fatal(err)
}

waitForReification()

for _, blk := range allBlocks {
has, err := hot.Has(context.Background(), blk.Cid())
if err != nil {
t.Fatal(err)
}

if has {
t.Fatal("block unexpectedly reified")
}
}

}

func TestSplitStoreReification(t *testing.T) {
EnableReification = true
t.Log("test reification with Has")
testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
_, err := s.Has(ctx, c)
Expand All @@ -516,6 +610,11 @@ func TestSplitStoreReification(t *testing.T) {
testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
return s.View(ctx, c, func(_ []byte) error { return nil })
})
t.Log("test reification limit")
testSplitStoreReificationLimit(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
_, err := s.Has(ctx, c)
return err
})
}

type mockChain struct {
Expand Down
8 changes: 8 additions & 0 deletions blockstore/splitstore/visitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ type tmpVisitor struct {
var _ ObjectVisitor = (*tmpVisitor)(nil)

func (v *tmpVisitor) Visit(c cid.Cid) (bool, error) {
if isUnitaryObject(c) {
return false, nil
}

return v.set.Visit(c), nil
}

Expand All @@ -45,6 +49,10 @@ func newConcurrentVisitor() *concurrentVisitor {
}

func (v *concurrentVisitor) Visit(c cid.Cid) (bool, error) {
if isUnitaryObject(c) {
return false, nil
}

v.mx.Lock()
defer v.mx.Unlock()

Expand Down

0 comments on commit 051ff5d

Please sign in to comment.