Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: redeclare storage when unsealed state is checked #1377

Merged
merged 4 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions indexprovider/unsealedstatemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/filecoin-project/boost-gfm/storagemarket"
"github.com/filecoin-project/boost/db"
"github.com/filecoin-project/boost/node/config"
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
logging "github.com/ipfs/go-log/v2"
provider "github.com/ipni/index-provider"
"github.com/ipni/index-provider/metadata"
"time"
)

//go:generate go run github.com/golang/mock/mockgen -destination=./mock/mock.go -package=mock github.com/filecoin-project/boost-gfm/storagemarket StorageProvider
Expand All @@ -22,6 +24,7 @@ var usmlog = logging.Logger("unsmgr")

type ApiStorageMiner interface {
StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error)
StorageRedeclareLocal(ctx context.Context, id *storiface.ID, dropMissing bool) error
}

type UnsealedStateManager struct {
Expand All @@ -30,21 +33,24 @@ type UnsealedStateManager struct {
dealsDB *db.DealsDB
sdb *db.SectorStateDB
api ApiStorageMiner
cfg config.StorageConfig
}

func NewUnsealedStateManager(idxprov *Wrapper, legacyProv storagemarket.StorageProvider, dealsDB *db.DealsDB, sdb *db.SectorStateDB, api ApiStorageMiner) *UnsealedStateManager {
func NewUnsealedStateManager(idxprov *Wrapper, legacyProv storagemarket.StorageProvider, dealsDB *db.DealsDB, sdb *db.SectorStateDB, api ApiStorageMiner, cfg config.StorageConfig) *UnsealedStateManager {
return &UnsealedStateManager{
idxprov: idxprov,
legacyProv: legacyProv,
dealsDB: dealsDB,
sdb: sdb,
api: api,
cfg: cfg,
}
}

func (m *UnsealedStateManager) Run(ctx context.Context) {
usmlog.Info("starting unsealed state manager")
ticker := time.NewTicker(time.Hour)
duration := time.Duration(m.cfg.StorageListRefreshDuration)
usmlog.Infof("starting unsealed state manager running on interval %s", duration.String())
ticker := time.NewTicker(duration)
defer ticker.Stop()

// Check immediately
Expand All @@ -69,6 +75,16 @@ func (m *UnsealedStateManager) Run(ctx context.Context) {

func (m *UnsealedStateManager) checkForUpdates(ctx context.Context) error {
usmlog.Info("checking for sector state updates")

// Tell lotus to update it's storage list and remove any removed sectors
if m.cfg.RedeclareOnStorageListRefresh {
usmlog.Info("redeclaring storage")
err := m.api.StorageRedeclareLocal(ctx, nil, true)
if err != nil {
log.Errorf("redeclaring local storage on lotus miner: %w", err)
}
}

stateUpdates, err := m.getStateUpdates(ctx)
if err != nil {
return err
Expand All @@ -79,13 +95,16 @@ func (m *UnsealedStateManager) checkForUpdates(ctx context.Context) error {
return fmt.Errorf("getting legacy deals from datastore: %w", err)
}

usmlog.Debugf("checking for sector state updates for %d states", len(stateUpdates))

// For each sector
for sectorID, sectorSealState := range stateUpdates {
// Get the deals in the sector
deals, err := m.dealsBySectorID(ctx, legacyDeals, sectorID)
if err != nil {
return fmt.Errorf("getting deals for miner %d / sector %d: %w", sectorID.Miner, sectorID.Number, err)
}
usmlog.Debugf("sector %d has %d deals, seal status %s", sectorID, len(deals), sectorSealState)

// For each deal in the sector
for _, deal := range deals {
Expand Down
12 changes: 10 additions & 2 deletions indexprovider/unsealedstatemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package indexprovider

import (
"context"
"testing"

"github.com/filecoin-project/boost-gfm/storagemarket"
"github.com/filecoin-project/boost/db"
"github.com/filecoin-project/boost/db/migrations"
"github.com/filecoin-project/boost/indexprovider/mock"
"github.com/filecoin-project/boost/node/config"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/builtin/v9/market"
Expand All @@ -15,7 +18,6 @@ import (
"github.com/ipni/index-provider/metadata"
mock_provider "github.com/ipni/index-provider/mock"
"github.com/stretchr/testify/require"
"testing"
)

// Empty response from MinerAPI.StorageList()
Expand Down Expand Up @@ -312,7 +314,9 @@ func setup(t *testing.T) (*UnsealedStateManager, *mock.MockStorageProvider, *moc
prov: prov,
meshCreator: &meshCreatorStub{},
}
usm := NewUnsealedStateManager(wrapper, storageProvider, dealsDB, sectorStateDB, storageMiner)

cfg := config.StorageConfig{}
usm := NewUnsealedStateManager(wrapper, storageProvider, dealsDB, sectorStateDB, storageMiner, cfg)
return usm, storageProvider, storageMiner, prov
}

Expand All @@ -326,6 +330,10 @@ func (m mockApiStorageMiner) StorageList(ctx context.Context) (map[storiface.ID]
return m.storageList, nil
}

func (m mockApiStorageMiner) StorageRedeclareLocal(ctx context.Context, id *storiface.ID, dropMissing bool) error {
return nil
}

type meshCreatorStub struct {
}

Expand Down
2 changes: 1 addition & 1 deletion indexprovider/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func NewWrapper(cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, r repo.Loc
OnStart: func(ctx context.Context) error {
// Watch for changes in sector unseal state and update the
// indexer when there are changes
usm := NewUnsealedStateManager(w, legacyProv, dealsDB, ssDB, storageService)
usm := NewUnsealedStateManager(w, legacyProv, dealsDB, ssDB, storageService, w.cfg.Storage)
go usm.Run(runCtx)

// Announce all deals on startup in case of a config change
Expand Down
4 changes: 3 additions & 1 deletion node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ func DefaultBoost() *Boost {
Common: defCommon(),

Storage: StorageConfig{
ParallelFetchLimit: 10,
ParallelFetchLimit: 10,
StorageListRefreshDuration: Duration(time.Hour * 12),
RedeclareOnStorageListRefresh: true,
},

Graphql: GraphqlConfig{
Expand Down
16 changes: 16 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,12 @@ func (c *FeeConfig) Legacy() lotus_config.MinerFeeConfig {
type StorageConfig struct {
// The maximum number of concurrent fetch operations to the storage subsystem
ParallelFetchLimit int
// How frequently Boost should refresh the state of sectors with Lotus. (default: 12hours)
// When run, Boost will trigger a storage redeclare on the miner in addition to a storage list.
// This ensures that index metadata for sectors reflects their status (removed, unsealed, etc).
StorageListRefreshDuration Duration
// Whether or not Boost should have lotus redeclare its storage list (default: true).
// Disable this if you wish to manually handle the refresh. If manually managing the redeclare
// and it is not triggered, retrieval quality for users will be impacted.
RedeclareOnStorageListRefresh bool
}