Skip to content
This repository has been archived by the owner on Dec 19, 2022. It is now read-only.

update state machine #45

Merged
merged 1 commit into from
May 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions api/impl/strageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,30 @@ func (sm *StorageMinerAPI) ActorSectorSize(ctx context.Context, addr address.Add
return mi.SectorSize, nil
}

func (sm *StorageMinerAPI) PledgeSector(ctx context.Context) error {
return sm.Miner.PledgeSector()
func (sm *StorageMinerAPI) PledgeSector(ctx context.Context) (abi.SectorID, error) {
sr, err := sm.Miner.PledgeSector(ctx)
if err != nil {
return abi.SectorID{}, err
}

// wait for the sector to enter the Packing state
// TODO: instead of polling implement some pubsub-type thing in storagefsm
for {
info, err := sm.Miner.GetSectorInfo(sr.ID.Number)
if err != nil {
return abi.SectorID{}, xerrors.Errorf("getting pledged sector info: %w", err)
}

if info.State != types2.UndefinedSectorState {
return sr.ID, nil
}

select {
case <-time.After(10 * time.Millisecond):
case <-ctx.Done():
return abi.SectorID{}, ctx.Err()
}
}
}

func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) {
Expand Down
6 changes: 3 additions & 3 deletions api/storage_struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type StorageMiner interface {
ActorAddressConfig(ctx context.Context) (AddressConfig, error)

// Temp api for testing
PledgeSector(context.Context) error
PledgeSector(context.Context) (abi.SectorID, error)

// Get the status of a given sector by ID
SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (SectorInfo, error)
Expand Down Expand Up @@ -146,7 +146,7 @@ type StorageMinerStruct struct {
ActorAddressConfig func(ctx context.Context) (AddressConfig, error) `perm:"read"`
NetParamsConfig func(ctx context.Context) (*config.NetParamsConfig, error) `perm:"read"`

PledgeSector func(context.Context) error `perm:"write"`
PledgeSector func(context.Context) (abi.SectorID, error) `perm:"write"`

SectorsStatus func(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (SectorInfo, error) `perm:"read"`
SectorsList func(context.Context) ([]abi.SectorNumber, error) `perm:"read"`
Expand Down Expand Up @@ -248,7 +248,7 @@ func (c *StorageMinerStruct) ActorAddressConfig(ctx context.Context) (AddressCon
return c.Internal.ActorAddressConfig(ctx)
}

func (c *StorageMinerStruct) PledgeSector(ctx context.Context) error {
func (c *StorageMinerStruct) PledgeSector(ctx context.Context) (abi.SectorID, error) {
return c.Internal.PledgeSector(ctx)
}

Expand Down
9 changes: 8 additions & 1 deletion app/venus-sealer/sectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,14 @@ var sectorsPledgeCmd = &cli.Command{
defer closer()
ctx := api.ReqContext(cctx)

return nodeApi.PledgeSector(ctx)
id, err := nodeApi.PledgeSector(ctx)
if err != nil {
return err
}

fmt.Println("Created CC sector: ", id.Number)

return nil
},
}

Expand Down
7 changes: 7 additions & 0 deletions config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ type SealingConfig struct {
MaxSealingSectorsForDeals uint64

WaitDealsDelay Duration

AlwaysKeepUnsealedCopy bool

// Keep this many sectors in sealing pipeline, start CC if needed
// todo TargetSealingSectors uint64

// todo TargetSectors - stop auto-pleding new sectors after this many sectors are sealed, default CC upgrade for deals sectors if above
}

type MinerFeeConfig struct {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/filecoin-project/specs-actors/v2 v2.3.5-0.20210114162132-5b58b773f4fb
github.com/filecoin-project/specs-actors/v3 v3.1.0
github.com/filecoin-project/specs-storage v0.1.1-0.20201105051918-5188d9774506
github.com/filecoin-project/venus v0.9.5
github.com/filecoin-project/venus v0.9.6
github.com/gbrlsnchs/jwt/v3 v3.0.0
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.2.0
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ github.com/filecoin-project/venus v0.9.2-0.20210331052540-ca4ea7bab5fa h1:fg0Lxc
github.com/filecoin-project/venus v0.9.2-0.20210331052540-ca4ea7bab5fa/go.mod h1:9IY4iNo5At3GMBUNa8we3aGL7zM3cFUNxy2rMpb86C0=
github.com/filecoin-project/venus v0.9.5 h1:oZvfBpoE1ryJ74ooHZEqAGPYfCCznQnc5wzMs1Eytsk=
github.com/filecoin-project/venus v0.9.5/go.mod h1:ol4DC0omgbpszB6UNJB92zT0QfAkux974U7aiW89XvI=
github.com/filecoin-project/venus v0.9.6 h1:ZeY6YdJ9XHgXH5+He5S3DWwYLsvudzdsOumPevVDV5E=
github.com/filecoin-project/venus v0.9.6/go.mod h1:qy68zWNOYvEwkvdwdRQmOFyePkkOX5ODkMg47x+WLlo=
github.com/filecoin-project/venus-auth v1.0.2-0.20210507023017-76ce8b64e6db/go.mod h1:LXrT3H5dbcG/N+Ze5jMLGwAyGic9s1y3J/JV2/qM0sU=
github.com/filecoin-project/venus-wallet v1.0.1-0.20210507023531-5dfabaf5606d h1:Y7wofigDdz0Sa+GlBmSjvXDrRtMRcV1CUHVEzXJm7J8=
github.com/filecoin-project/venus-wallet v1.0.1-0.20210507023531-5dfabaf5606d/go.mod h1:COcAOGRgROgZ3doZ6v05SbB+IQ4fDR1z/9+XD6i5+IA=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 h1:u/UEqS66A5ckRmS4yNpjmVH56sVtS/RfclBAYocb4as=
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ=
Expand Down Expand Up @@ -765,6 +770,7 @@ github.com/jinzhu/now v1.1.1 h1:g39TucaRWyV3dwDO++eEc6qf8TVIQ/Da48WmqjZ3i7E=
github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 h1:rp+c0RAYOWj8l6qbCUTSiRLG/iKnW3K3/QfPPuSsBt4=
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
Expand Down Expand Up @@ -1148,6 +1154,7 @@ github.com/mattn/go-runewidth v0.0.8 h1:3tS41NlGYSmhhe/8fhGRzc+z3AYCw1Fe1WAyLuuj
github.com/mattn/go-runewidth v0.0.8/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-sqlite3 v1.14.5 h1:1IdxlwTNazvbKJQSxoJ5/9ECbEeaTTyeU7sEAZ5KKTQ=
github.com/mattn/go-sqlite3 v1.14.5/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-xmlrpc v0.0.3/go.mod h1:mqc2dz7tP5x5BKlCahN/n+hs7OSZKJkS9JsHNBRlrxA=
github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
Expand Down
2 changes: 2 additions & 0 deletions modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ func NewSetSealConfigFunc(r *config.StorageMiner) (types2.SetSealingConfigFunc,
MaxSealingSectors: cfg.MaxSealingSectors,
MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals,
WaitDealsDelay: config.Duration(cfg.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.AlwaysKeepUnsealedCopy,
}
})
return
Expand All @@ -299,6 +300,7 @@ func NewGetSealConfigFunc(r *config.StorageMiner) (types2.GetSealingConfigFunc,
MaxSealingSectors: cfg.Sealing.MaxSealingSectors,
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy,
}
})
return
Expand Down
3 changes: 2 additions & 1 deletion sector-storage/fr32/readers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ func TestUnpadReader(t *testing.T) {
t.Fatal(err)
}

readered, err := ioutil.ReadAll(r)
// using bufio reader to make sure reads are big enough for the padreader - it can't handle small reads right now
readered, err := ioutil.ReadAll(bufio.NewReaderSize(r, 512))
if err != nil {
t.Fatal(err)
}
Expand Down
11 changes: 8 additions & 3 deletions sector-storage/fsutil/statfs.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package fsutil

type FsStat struct {
Capacity int64
Available int64 // Available to use for sector storage
Reserved int64
Capacity int64
Available int64 // Available to use for sector storage
FSAvailable int64 // Available in the filesystem
Reserved int64

// non-zero when storage has configured MaxStorage
Max int64
Used int64
}
6 changes: 4 additions & 2 deletions sector-storage/fsutil/statfs_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ func Statfs(path string) (FsStat, error) {
// force int64 to handle platform specific differences
//nolint:unconvert
return FsStat{
Capacity: int64(stat.Blocks) * int64(stat.Bsize),
Available: int64(stat.Bavail) * int64(stat.Bsize),
Capacity: int64(stat.Blocks) * int64(stat.Bsize),

Available: int64(stat.Bavail) * int64(stat.Bsize),
FSAvailable: int64(stat.Bavail) * int64(stat.Bsize),
}, nil
}
5 changes: 3 additions & 2 deletions sector-storage/fsutil/statfs_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ func Statfs(volumePath string) (FsStat, error) {
uintptr(unsafe.Pointer(&availBytes)))

return FsStat{
Capacity: totalBytes,
Available: availBytes,
Capacity: totalBytes,
Available: availBytes,
FSAvailable: availBytes,
}, nil
}
22 changes: 11 additions & 11 deletions sector-storage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,47 +632,47 @@ func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error {
}

func (m *Manager) ReturnAddPiece(ctx context.Context, callID types.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
return m.returnResult(callID, pi, err)
return m.returnResult(ctx, callID, pi, err)
}

func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID types.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error {
return m.returnResult(callID, p1o, err)
return m.returnResult(ctx, callID, p1o, err)
}

func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID types.CallID, sealed storage.SectorCids, err *storiface.CallError) error {
return m.returnResult(callID, sealed, err)
return m.returnResult(ctx, callID, sealed, err)
}

func (m *Manager) ReturnSealCommit1(ctx context.Context, callID types.CallID, out storage.Commit1Out, err *storiface.CallError) error {
return m.returnResult(callID, out, err)
return m.returnResult(ctx, callID, out, err)
}

func (m *Manager) ReturnSealCommit2(ctx context.Context, callID types.CallID, proof storage.Proof, err *storiface.CallError) error {
return m.returnResult(callID, proof, err)
return m.returnResult(ctx, callID, proof, err)
}

func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID types.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
return m.returnResult(ctx, callID, nil, err)
}

func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID types.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
return m.returnResult(ctx, callID, nil, err)
}

func (m *Manager) ReturnMoveStorage(ctx context.Context, callID types.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
return m.returnResult(ctx, callID, nil, err)
}

func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID types.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
return m.returnResult(ctx, callID, nil, err)
}

func (m *Manager) ReturnReadPiece(ctx context.Context, callID types.CallID, ok bool, err *storiface.CallError) error {
return m.returnResult(callID, ok, err)
return m.returnResult(ctx, callID, ok, err)
}

func (m *Manager) ReturnFetch(ctx context.Context, callID types.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
return m.returnResult(ctx, callID, nil, err)
}

func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) {
Expand Down
60 changes: 56 additions & 4 deletions sector-storage/manager_calltracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,67 @@ package sectorstorage

import (
"context"
"github.com/filecoin-project/venus-sealer/types"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"os"
"time"

"golang.org/x/xerrors"

"github.com/filecoin-project/venus-sealer/sector-storage/storiface"
"github.com/filecoin-project/venus-sealer/types"
)

type WorkID struct {
Method types.TaskType
Params string // json [...params]
}

func (w WorkID) String() string {
return fmt.Sprintf("%s(%s)", w.Method, w.Params)
}

var _ fmt.Stringer = &WorkID{}

type WorkStatus string

const (
wsStarted WorkStatus = "started" // task started, not scheduled/running on a worker yet
wsRunning WorkStatus = "running" // task running on a worker, waiting for worker return
wsDone WorkStatus = "done" // task returned from the worker, results available
)

type WorkState struct {
ID WorkID

Status WorkStatus

WorkerCall storiface.CallID // Set when entering wsRunning
WorkError string // Status = wsDone, set when failed to start work

WorkerHostname string // hostname of last worker handling this job
StartTime int64 // unix seconds
}

func newWorkID(method types.TaskType, params ...interface{}) (WorkID, error) {
pb, err := json.Marshal(params)
if err != nil {
return WorkID{}, xerrors.Errorf("marshaling work params: %w", err)
}

if len(pb) > 256 {
s := sha256.Sum256(pb)
pb = []byte(hex.EncodeToString(s[:]))
}

return WorkID{
Method: method,
Params: string(pb),
}, nil
}

func (m *Manager) setupWorkTracker() {
m.workLk.Lock()
defer m.workLk.Unlock()
Expand Down Expand Up @@ -297,15 +349,15 @@ func (m *Manager) waitCall(ctx context.Context, callID types.CallID) (interface{
}
}

func (m *Manager) returnResult(callID types.CallID, r interface{}, cerr *storiface.CallError) error {
func (m *Manager) returnResult(ctx context.Context, callID types.CallID, r interface{}, cerr *storiface.CallError) error {
res := result{
r: r,
}
if cerr != nil {
res.err = cerr
}

m.sched.workTracker.onDone(callID)
m.sched.workTracker.onDone(ctx, callID)

m.workLk.Lock()
defer m.workLk.Unlock()
Expand Down Expand Up @@ -361,5 +413,5 @@ func (m *Manager) returnResult(callID types.CallID, r interface{}, cerr *storifa

func (m *Manager) Abort(ctx context.Context, call types.CallID) error {
// TODO: Allow temp error
return m.returnResult(call, nil, storiface.Err(storiface.ErrUnknown, xerrors.New("task aborted")))
return m.returnResult(ctx, call, nil, storiface.Err(storiface.ErrUnknown, xerrors.New("task aborted")))
}
19 changes: 19 additions & 0 deletions sector-storage/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var log = logging.Logger("sbmock")

type SectorMgr struct {
sectors map[abi.SectorID]*sectorState
failPoSt bool
pieces map[cid.Cid][]byte
nextSectorID abi.SectorNumber

Expand Down Expand Up @@ -264,6 +265,14 @@ func (mgr *SectorMgr) MarkFailed(sid storage.SectorRef, failed bool) error {
return nil
}

func (mgr *SectorMgr) Fail() {
mgr.lk.Lock()
defer mgr.lk.Unlock()
mgr.failPoSt = true

return
}

func (mgr *SectorMgr) MarkCorrupted(sid storage.SectorRef, corrupted bool) error {
mgr.lk.Lock()
defer mgr.lk.Unlock()
Expand Down Expand Up @@ -293,10 +302,20 @@ func AddOpFinish(ctx context.Context) (context.Context, func()) {
}

func (mgr *SectorMgr) GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof2.SectorInfo, randomness abi.PoStRandomness) ([]proof2.PoStProof, error) {
mgr.lk.Lock()
defer mgr.lk.Unlock()

return generateFakePoSt(sectorInfo, abi.RegisteredSealProof.RegisteredWinningPoStProof, randomness), nil
}

func (mgr *SectorMgr) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof2.SectorInfo, randomness abi.PoStRandomness) ([]proof2.PoStProof, []abi.SectorID, error) {
mgr.lk.Lock()
defer mgr.lk.Unlock()

if mgr.failPoSt {
return nil, nil, xerrors.Errorf("failed to post (mock)")
}

si := make([]proof2.SectorInfo, 0, len(sectorInfo))
var skipped []abi.SectorID

Expand Down
Loading