Skip to content

Commit

Permalink
rm Do() chg and storagemgr
Browse files Browse the repository at this point in the history
  • Loading branch information
snadrus committed Feb 23, 2024
1 parent 3ef4d6a commit 4d6a0df
Show file tree
Hide file tree
Showing 12 changed files with 29 additions and 289 deletions.
9 changes: 0 additions & 9 deletions cmd/lotus-provider/deps/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/filecoin-project/lotus/journal/alerting"
"github.com/filecoin-project/lotus/journal/fsjournal"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/resources/storagemgr"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
Expand Down Expand Up @@ -104,7 +103,6 @@ type Deps struct {
LocalStore *paths.Local
LocalPaths *paths.BasicLocalStorage
ListenAddr string
StorageMgr *storagemgr.StorageMgr
}

const (
Expand Down Expand Up @@ -256,13 +254,6 @@ Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`,
}
}

if deps.StorageMgr == nil {
lo, err := deps.LocalStore.Local(ctx)
if err != nil {
return xerrors.Errorf("could not get local storage: %w", err)
}
deps.StorageMgr = storagemgr.New(lo)
}
fmt.Println("last line of populate")

return nil
Expand Down
4 changes: 0 additions & 4 deletions cmd/lotus-provider/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,6 @@ var runCmd = &cli.Command{
return err
}

// Get rid of all the partial-writes of any potential previous runs.
dependencies.StorageMgr.Cleanup()


taskEngine, err := tasks.StartTasks(ctx, dependencies)

if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions itests/harmonytask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func withDbSetup(t *testing.T, f func(*kit.TestMiner)) {
f(miner)
}

func (t *task1) Do(taskID harmonytask.TaskID, te *harmonytask.TaskEngine, stillOwned func() bool) (done bool, err error) {
func (t *task1) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
if !stillOwned() {
return false, errors.New("Why not still owned?")
}
Expand Down Expand Up @@ -104,8 +104,8 @@ type passthru struct {
adder func(add harmonytask.AddTaskFunc)
}

func (t *passthru) Do(taskID harmonytask.TaskID, te *harmonytask.TaskEngine, stillOwned func() bool) (done bool, err error) {
return t.do(tID, stillOwned)
func (t *passthru) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
return t.do(taskID, stillOwned)
}
func (t *passthru) CanAccept(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
return t.canAccept(list, e)
Expand Down
2 changes: 1 addition & 1 deletion lib/harmony/harmonytask/harmonytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type TaskInterface interface {
// ONLY be called by harmonytask.
// Indicate if the task no-longer needs scheduling with done=true including
// cases where it's past the deadline.
Do(taskID TaskID, te *TaskEngine, stillOwned func() bool) (done bool, err error)
Do(taskID TaskID, stillOwned func() bool) (done bool, err error)

// CanAccept should return if the task can run on this machine. It should
// return null if the task type is not allowed on this machine.
Expand Down
28 changes: 16 additions & 12 deletions lib/harmony/harmonytask/task_type_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,19 @@ top:
var location string
releaseStorage := func() {
}
if c := h.TaskTypeDetails.Cost.Storage.Claim; c != nil {
if location, err = c(int(*tID)); err != nil {
log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "storage claim failed", "name", h.Name, "error", err)
return false
}
h.LocationMap.Store(*tID, location)
releaseStorage = func() {
if err := h.TaskTypeDetails.Cost.Storage.MarkComplete(location); err != nil {
log.Errorw("Could not release storage", "error", err)
if h.TaskTypeDetails.Cost.Storage != nil {
if c := h.TaskTypeDetails.Cost.Storage.Claim; c != nil {
if err = c(int(*tID)); err != nil {
log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "storage claim failed", "name", h.Name, "error", err)
return false
}
h.LocationMap.Store(*tID, location)
releaseStorage = func() {
if err := h.TaskTypeDetails.Cost.Storage.MarkComplete(); err != nil {
log.Errorw("Could not release storage", "error", err)
}
h.LocationMap.Delete(*tID)
}
h.LocationMap.Delete(*tID)
}
}

Expand Down Expand Up @@ -270,8 +272,10 @@ func (h *taskTypeHandler) AssertMachineHasCapacity() error {
return errors.New("Did not accept " + h.Name + " task: out of available GPU")
}

if has := h.TaskTypeDetails.Cost.Storage.HasCapacity; has != nil && !has() {
return errors.New("Did not accept " + h.Name + " task: out of available Storage")
if h.TaskTypeDetails.Cost.Storage != nil {
if has := h.TaskTypeDetails.Cost.Storage.HasCapacity; has != nil && !has() {
return errors.New("Did not accept " + h.Name + " task: out of available Storage")
}
}
return nil
}
8 changes: 4 additions & 4 deletions lib/harmony/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ type Resources struct {

// Optional Storage management.
// See storagemgr/storagemgt.go for more details.
type Storage struct {
HasCapacity func() bool
type Storage interface {
HasCapacity() bool

// This allows some other system to claim space for this task.
Claim func(taskID int) (location string, err error)
Claim(taskID int) error

// This allows some other system to consider the task done.
// It's up to the caller to remove the data, if that applies.
MarkComplete func(location string) error
MarkComplete() error
}
type Reg struct {
Resources
Expand Down
251 changes: 0 additions & 251 deletions lib/harmony/resources/storagemgr/storagemgt.go

This file was deleted.

2 changes: 1 addition & 1 deletion provider/lpmessage/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type SendTask struct {
db *harmonydb.DB
}

func (s *SendTask) Do(taskID harmonytask.TaskID, te *harmonytask.TaskEngine, stillOwned func() bool) (done bool, err error) {
func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
ctx := context.TODO()

// get message from db
Expand Down
Loading

0 comments on commit 4d6a0df

Please sign in to comment.