diff --git a/cmd/lotus-provider/deps/deps.go b/cmd/lotus-provider/deps/deps.go index 4d026115379..e5fb45bc1cc 100644 --- a/cmd/lotus-provider/deps/deps.go +++ b/cmd/lotus-provider/deps/deps.go @@ -31,7 +31,6 @@ import ( "github.com/filecoin-project/lotus/journal/alerting" "github.com/filecoin-project/lotus/journal/fsjournal" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" - "github.com/filecoin-project/lotus/lib/harmony/resources/storagemgr" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" @@ -104,7 +103,6 @@ type Deps struct { LocalStore *paths.Local LocalPaths *paths.BasicLocalStorage ListenAddr string - StorageMgr *storagemgr.StorageMgr } const ( @@ -256,13 +254,6 @@ Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`, } } - if deps.StorageMgr == nil { - lo, err := deps.LocalStore.Local(ctx) - if err != nil { - return xerrors.Errorf("could not get local storage: %w", err) - } - deps.StorageMgr = storagemgr.New(lo) - } fmt.Println("last line of populate") return nil diff --git a/cmd/lotus-provider/run.go b/cmd/lotus-provider/run.go index d4ca6da159f..5135f8a0b1d 100644 --- a/cmd/lotus-provider/run.go +++ b/cmd/lotus-provider/run.go @@ -119,10 +119,6 @@ var runCmd = &cli.Command{ return err } - // Get rid of all the partial-writes of any potential previous runs. - dependencies.StorageMgr.Cleanup() - - taskEngine, err := tasks.StartTasks(ctx, dependencies) if err != nil { diff --git a/itests/harmonytask_test.go b/itests/harmonytask_test.go index 5fbbc929109..463f131d8fc 100644 --- a/itests/harmonytask_test.go +++ b/itests/harmonytask_test.go @@ -37,7 +37,7 @@ func withDbSetup(t *testing.T, f func(*kit.TestMiner)) { f(miner) } -func (t *task1) Do(taskID harmonytask.TaskID, te *harmonytask.TaskEngine, stillOwned func() bool) (done bool, err error) { +func (t *task1) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { if !stillOwned() { return false, errors.New("Why not still owned?") } @@ -104,8 +104,8 @@ type passthru struct { adder func(add harmonytask.AddTaskFunc) } -func (t *passthru) Do(taskID harmonytask.TaskID, te *harmonytask.TaskEngine, stillOwned func() bool) (done bool, err error) { - return t.do(tID, stillOwned) +func (t *passthru) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + return t.do(taskID, stillOwned) } func (t *passthru) CanAccept(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { return t.canAccept(list, e) diff --git a/lib/harmony/harmonytask/harmonytask.go b/lib/harmony/harmonytask/harmonytask.go index c585fe9a9c2..1adc1df285a 100644 --- a/lib/harmony/harmonytask/harmonytask.go +++ b/lib/harmony/harmonytask/harmonytask.go @@ -49,7 +49,7 @@ type TaskInterface interface { // ONLY be called by harmonytask. // Indicate if the task no-longer needs scheduling with done=true including // cases where it's past the deadline. - Do(taskID TaskID, te *TaskEngine, stillOwned func() bool) (done bool, err error) + Do(taskID TaskID, stillOwned func() bool) (done bool, err error) // CanAccept should return if the task can run on this machine. It should // return null if the task type is not allowed on this machine. diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go index 471ef9b479c..ba770c0da68 100644 --- a/lib/harmony/harmonytask/task_type_handler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -100,17 +100,19 @@ top: var location string releaseStorage := func() { } - if c := h.TaskTypeDetails.Cost.Storage.Claim; c != nil { - if location, err = c(int(*tID)); err != nil { - log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "storage claim failed", "name", h.Name, "error", err) - return false - } - h.LocationMap.Store(*tID, location) - releaseStorage = func() { - if err := h.TaskTypeDetails.Cost.Storage.MarkComplete(location); err != nil { - log.Errorw("Could not release storage", "error", err) + if h.TaskTypeDetails.Cost.Storage != nil { + if c := h.TaskTypeDetails.Cost.Storage.Claim; c != nil { + if err = c(int(*tID)); err != nil { + log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "storage claim failed", "name", h.Name, "error", err) + return false + } + h.LocationMap.Store(*tID, location) + releaseStorage = func() { + if err := h.TaskTypeDetails.Cost.Storage.MarkComplete(); err != nil { + log.Errorw("Could not release storage", "error", err) + } + h.LocationMap.Delete(*tID) } - h.LocationMap.Delete(*tID) } } @@ -270,8 +272,10 @@ func (h *taskTypeHandler) AssertMachineHasCapacity() error { return errors.New("Did not accept " + h.Name + " task: out of available GPU") } - if has := h.TaskTypeDetails.Cost.Storage.HasCapacity; has != nil && !has() { - return errors.New("Did not accept " + h.Name + " task: out of available Storage") + if h.TaskTypeDetails.Cost.Storage != nil { + if has := h.TaskTypeDetails.Cost.Storage.HasCapacity; has != nil && !has() { + return errors.New("Did not accept " + h.Name + " task: out of available Storage") + } } return nil } diff --git a/lib/harmony/resources/resources.go b/lib/harmony/resources/resources.go index 62b79a5dddc..2f5d09d4350 100644 --- a/lib/harmony/resources/resources.go +++ b/lib/harmony/resources/resources.go @@ -29,15 +29,15 @@ type Resources struct { // Optional Storage management. // See storagemgr/storagemgt.go for more details. -type Storage struct { - HasCapacity func() bool +type Storage interface { + HasCapacity() bool // This allows some other system to claim space for this task. - Claim func(taskID int) (location string, err error) + Claim(taskID int) error // This allows some other system to consider the task done. // It's up to the caller to remove the data, if that applies. - MarkComplete func(location string) error + MarkComplete() error } type Reg struct { Resources diff --git a/lib/harmony/resources/storagemgr/storagemgt.go b/lib/harmony/resources/storagemgr/storagemgt.go deleted file mode 100644 index 35cc911b5a7..00000000000 --- a/lib/harmony/resources/storagemgr/storagemgt.go +++ /dev/null @@ -1,251 +0,0 @@ -// Storagemgr is a utility for harmonytask tasks -// to manage the available space. -// Ex: -// -// TaskTypeDefinition{ -// Storage: storMgr.MakeFuncs(UsageTemporary, 1<<30, func(taskID int) string { -// return fmt.Sprintf("task-%d", taskID) // or use SQL to get the sector ID -// }), -// } -// -// Also useful: s.CombineFuncs(s1, s2, s3) -package storagemgr - -import ( - "bytes" - "encoding/json" - "os" - "path/filepath" - "strings" - "sync" - - logging "github.com/ipfs/go-log/v2" - "golang.org/x/xerrors" - - "github.com/filecoin-project/lotus/lib/harmony/resources" - "github.com/filecoin-project/lotus/storage/sealer/storiface" -) - -var log = logging.Logger("harmonytask") - -type path string - -var claimsMx sync.Mutex -var claims = map[path][]consumption{} - -var purposes = map[Usage][]path{} - -type Usage int - -const ( - UsagePermanent = iota - UsageTemporary - UsageCache - UsageStaging -) - -type StorageMgr struct { // Functions are attached here to force users to call New() -} - -func New(paths []storiface.StoragePath) *StorageMgr { - // Note: this may be called multiple times per process. - // populate purposes (global) - for _, p := range paths { - // @reviewer help this logic make sense - var t Usage = UsagePermanent - if p.CanSeal { - t = UsageStaging - } - if strings.Contains(p.LocalPath, "cache") { - t = UsageCache - } - if strings.Contains(p.LocalPath, "tmp") { - t = UsageTemporary - } - purposes[t] = append(purposes[t], path(p.LocalPath)) - } - - return &StorageMgr{} -} - -// For lotus-provider (but not other deps users). -// At start-up, remove (rm -rf) all the contents of all the claims.json files. -func (s *StorageMgr) Cleanup() { - for _, paths := range purposes { - for _, path := range paths { - for _, u := range getJSON(path) { - _ = os.RemoveAll(u.Location) - } - _ = os.Remove(string(path) + "/claims.json") - } - } -} - -func (s *StorageMgr) MakeFuncs(purpose Usage, need uint64, namer func(taskID int) string) resources.Storage { - return resources.Storage{ - HasCapacity: func() bool { _, ok := s.hasCapacity(purpose, need); return ok }, - Claim: func(id int) (string, error) { - name := namer(id) - return s.claim(purpose, need, name) - }, - MarkComplete: s.markComplete, - } -} - -type consumption struct { - Location string - Need uint64 -} - -func getJSON(path path) []consumption { - b, err := os.ReadFile(string(path) + "/claims.json") - if err != nil { - log.Errorf("error reading claims: %s", err) - } else { - var consumages []consumption - err := json.NewDecoder(bytes.NewReader(b)).Decode(&consumages) - if err != nil { - log.Errorf("error decoding claims: %s", err) - } else { - return consumages - } - } - return nil -} - -func (s *StorageMgr) hasCapacity(purpose Usage, need uint64) (path, bool) { - for _, path := range purposes[purpose] { - free, err := resources.DiskFree(string(path)) - if err != nil { - log.Errorf("error checking free space: %s", err) - continue - } - if free < need { - continue - } - - claimsMx.Lock() - defer claimsMx.Unlock() - consumages := getJSON(path) - for _, c := range consumages { - free -= c.Need // presume the whole need is used - - // no point in believing the errors on hot storage - _ = filepath.Walk(c.Location, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if info.IsDir() { - return nil - } - free += uint64(info.Size()) // give more space available if it's already used - return nil - }) - - } - if free > need { - return path, true - } - } - return "", false -} -func (s *StorageMgr) claim(purpose Usage, need uint64, name string) (location string, err error) { - path, ok := s.hasCapacity(purpose, need) - if !ok { - return "", xerrors.Errorf("no space for purpose %d", purpose) - } - - consumption := consumption{Location: filepath.Join(string(path), name), Need: need} - claimsMx.Lock() - defer claimsMx.Unlock() - - j := getJSON(path) - j = append(j, consumption) - b, err := json.Marshal(j) - if err != nil { - return "", xerrors.Errorf("error encoding claims: %w", err) - } - err = os.WriteFile(string(path)+"/claims.json", b, 0644) - if err != nil { - return "", xerrors.Errorf("error writing claims: %w", err) - } - - claims[path] = append(claims[path], consumption) - return location, nil -} - -func (s *StorageMgr) markComplete(location string) error { - claimsMx.Lock() - defer claimsMx.Unlock() - - foundPath := "" - // Clean up RAM - for path, consumages := range claims { - if strings.HasPrefix(location, string(path)) { - for i, c := range consumages { - if c.Location == location { - claims[path] = append(consumages[:i], consumages[i+1:]...) - foundPath = string(path) - break - } - } - } - } - // erase the claim on disk - if foundPath != "" { - j := getJSON(path(foundPath)) - for i, c := range j { - if c.Location == location { - j = append(j[:i], j[i+1:]...) - break - } - } - b, err := json.Marshal(j) - if err != nil { - return xerrors.Errorf("error encoding claims: %w", err) - } - err = os.WriteFile(foundPath+"/claims.json", b, 0644) - if err != nil { - return xerrors.Errorf("error writing claims: %w", err) - } - } - - return nil -} - -func (s *StorageMgr) CombineFuncs(ss ...resources.Storage) resources.Storage { - return resources.Storage{ - HasCapacity: func() bool { - for _, s := range ss { - if !s.HasCapacity() { - return false - } - } - return true - }, - Claim: func(id int) (string, error) { - locations := make([]string, 0, len(ss)) - for _, s := range ss { - location, err := s.Claim(id) - if err != nil { - for _, loc := range locations { - _ = s.MarkComplete(loc) - } - return "", err - } - locations = append(locations, location) - } - - return strings.Join(locations, ","), xerrors.Errorf("no space") - }, - MarkComplete: func(locations string) error { - var errKept error - for _, location := range strings.Split(locations, ",") { - if err := s.markComplete(location); err != nil { - errKept = err - } - } - return errKept - }, - } -} diff --git a/provider/lpmessage/sender.go b/provider/lpmessage/sender.go index 02363749d54..97f889ecc2f 100644 --- a/provider/lpmessage/sender.go +++ b/provider/lpmessage/sender.go @@ -59,7 +59,7 @@ type SendTask struct { db *harmonydb.DB } -func (s *SendTask) Do(taskID harmonytask.TaskID, te *harmonytask.TaskEngine, stillOwned func() bool) (done bool, err error) { +func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { ctx := context.TODO() // get message from db diff --git a/provider/lpwindow/compute_task.go b/provider/lpwindow/compute_task.go index c2517c39a4b..65102d0f014 100644 --- a/provider/lpwindow/compute_task.go +++ b/provider/lpwindow/compute_task.go @@ -111,7 +111,7 @@ func NewWdPostTask(db *harmonydb.DB, return t, nil } -func (t *WdPostTask) Do(taskID harmonytask.TaskID, te *harmonytask.TaskEngine, stillOwned func() bool) (done bool, err error) { +func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { log.Debugw("WdPostTask.Do()", "taskID", taskID) var spID, pps, dlIdx, partIdx uint64 diff --git a/provider/lpwindow/recover_task.go b/provider/lpwindow/recover_task.go index 0724eb38f25..27c501dde17 100644 --- a/provider/lpwindow/recover_task.go +++ b/provider/lpwindow/recover_task.go @@ -86,7 +86,7 @@ func NewWdPostRecoverDeclareTask(sender *lpmessage.Sender, return t, nil } -func (w *WdPostRecoverDeclareTask) Do(taskID harmonytask.TaskID, te *harmonytask.TaskEngine, stillOwned func() bool) (done bool, err error) { +func (w *WdPostRecoverDeclareTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { log.Debugw("WdPostRecoverDeclareTask.Do()", "taskID", taskID) ctx := context.Background() diff --git a/provider/lpwindow/submit_task.go b/provider/lpwindow/submit_task.go index bb52ff2aa99..8d39c40abeb 100644 --- a/provider/lpwindow/submit_task.go +++ b/provider/lpwindow/submit_task.go @@ -71,7 +71,7 @@ func NewWdPostSubmitTask(pcs *chainsched.ProviderChainSched, send *lpmessage.Sen return res, nil } -func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, te *harmonytask.TaskEngine, stillOwned func() bool) (done bool, err error) { +func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { log.Debugw("WdPostSubmitTask.Do", "taskID", taskID) var spID uint64 diff --git a/provider/lpwinning/winning_task.go b/provider/lpwinning/winning_task.go index bf0c786bda8..b795ade4f73 100644 --- a/provider/lpwinning/winning_task.go +++ b/provider/lpwinning/winning_task.go @@ -86,7 +86,7 @@ func NewWinPostTask(max int, db *harmonydb.DB, prover ProverWinningPoSt, verifie return t } -func (t *WinPostTask) Do(taskID harmonytask.TaskID, te *harmonytask.TaskEngine, stillOwned func() bool) (done bool, err error) { +func (t *WinPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { log.Debugw("WinPostTask.Do()", "taskID", taskID) ctx := context.TODO()