Skip to content

Commit

Permalink
Merge pull request #76 from filecoin-project/feat/jobs-api
Browse files Browse the repository at this point in the history
Add api to get active tasks
  • Loading branch information
magik6k authored Jul 22, 2020
2 parents a109ef9 + c7da20e commit 7c19c5b
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 4 deletions.
5 changes: 4 additions & 1 deletion manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,10 @@ func (m *Manager) AddWorker(ctx context.Context, w Worker) error {
}

m.sched.newWorkers <- &workerHandle{
w: w,
w: w,
wt: &workTracker{
running: map[uint64]storiface.WorkerJob{},
},
info: info,
preparing: &activeResources{},
active: &activeResources{},
Expand Down
7 changes: 5 additions & 2 deletions sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ type workerHandle struct {
preparing *activeResources
active *activeResources

// stats / tracking
wt *workTracker

// for sync manager goroutine closing
cleanupStarted bool
closedMgr chan struct{}
Expand Down Expand Up @@ -486,7 +489,7 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke
w.preparing.add(w.info.Resources, needRes)

go func() {
err := req.prepare(req.ctx, w.w)
err := req.prepare(req.ctx, w.wt.worker(w.w))
sh.workersLk.Lock()

if err != nil {
Expand Down Expand Up @@ -519,7 +522,7 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke
case <-sh.closing:
}

err = req.work(req.ctx, w.w)
err = req.work(req.ctx, w.wt.worker(w.w))

select {
case req.ret <- workerResponse{err: err}:
Expand Down
5 changes: 4 additions & 1 deletion sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name str
require.NoError(t, err)

sched.newWorkers <- &workerHandle{
w: w,
w: w,
wt: &workTracker{
running: map[uint64]storiface.WorkerJob{},
},
info: info,
preparing: &activeResources{},
active: &activeResources{},
Expand Down
24 changes: 24 additions & 0 deletions sealtasks/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,30 @@ var order = map[TaskType]int{
TTReadUnsealed: 0,
}

var shortNames = map[TaskType]string{
TTAddPiece: "AP ",

TTPreCommit1: "PC1",
TTPreCommit2: "PC2",
TTCommit1: "C1 ",
TTCommit2: "C2 ",

TTFinalize: "FIN",

TTFetch: "GET",
TTUnseal: "UNS",
TTReadUnsealed: "RD ",
}

func (a TaskType) Less(b TaskType) bool {
return order[a] < order[b]
}

func (a TaskType) Short() string {
n, ok := shortNames[a]
if !ok {
return "UNK"
}

return n
}
13 changes: 13 additions & 0 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,16 @@ func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats {

return out
}

func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob {
m.sched.workersLk.Lock()
defer m.sched.workersLk.Unlock()

out := map[uint64][]storiface.WorkerJob{}

for id, handle := range m.sched.workers {
out[uint64(id)] = handle.wt.Running()
}

return out
}
15 changes: 15 additions & 0 deletions storiface/worker.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package storiface

import (
"time"

"github.com/filecoin-project/sector-storage/sealtasks"
"github.com/filecoin-project/specs-actors/actors/abi"
)

type WorkerInfo struct {
Hostname string

Expand All @@ -24,3 +31,11 @@ type WorkerStats struct {
GpuUsed bool
CpuUse uint64
}

type WorkerJob struct {
ID uint64
Sector abi.SectorID
Task sealtasks.TaskType

Start time.Time
}
129 changes: 129 additions & 0 deletions work_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package sectorstorage

import (
"context"
"io"
"sync"
"time"

"github.com/ipfs/go-cid"

"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/sector-storage/sealtasks"
"github.com/filecoin-project/sector-storage/stores"
"github.com/filecoin-project/sector-storage/storiface"
)

type workTracker struct {
lk sync.Mutex

ctr uint64
running map[uint64]storiface.WorkerJob

// TODO: done, aggregate stats, queue stats, scheduler feedback
}

func (wt *workTracker) track(sid abi.SectorID, task sealtasks.TaskType) func() {
wt.lk.Lock()
defer wt.lk.Unlock()

id := wt.ctr
wt.ctr++

wt.running[id] = storiface.WorkerJob{
ID: id,
Sector: sid,
Task: task,
Start: time.Now(),
}

return func() {
wt.lk.Lock()
defer wt.lk.Unlock()

delete(wt.running, id)
}
}

func (wt *workTracker) worker(w Worker) Worker {
return &trackedWorker{
Worker: w,
tracker: wt,
}
}

func (wt *workTracker) Running() []storiface.WorkerJob {
wt.lk.Lock()
defer wt.lk.Unlock()

out := make([]storiface.WorkerJob, 0, len(wt.running))
for _, job := range wt.running {
out = append(out, job)
}

return out
}

type trackedWorker struct {
Worker

tracker *workTracker
}

func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
defer t.tracker.track(sector, sealtasks.TTPreCommit1)()

return t.Worker.SealPreCommit1(ctx, sector, ticket, pieces)
}

func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storage.SectorCids, error) {
defer t.tracker.track(sector, sealtasks.TTPreCommit2)()

return t.Worker.SealPreCommit2(ctx, sector, pc1o)
}

func (t *trackedWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
defer t.tracker.track(sector, sealtasks.TTCommit1)()

return t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
}

func (t *trackedWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storage.Proof, error) {
defer t.tracker.track(sector, sealtasks.TTCommit2)()

return t.Worker.SealCommit2(ctx, sector, c1o)
}

func (t *trackedWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error {
defer t.tracker.track(sector, sealtasks.TTFinalize)()

return t.Worker.FinalizeSector(ctx, sector, keepUnsealed)
}

func (t *trackedWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
defer t.tracker.track(sector, sealtasks.TTAddPiece)()

return t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
}

func (t *trackedWorker) Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error {
defer t.tracker.track(s, sealtasks.TTFetch)()

return t.Worker.Fetch(ctx, s, ft, ptype, am)
}

func (t *trackedWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error {
defer t.tracker.track(id, sealtasks.TTUnseal)()

return t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid)
}

func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
defer t.tracker.track(id, sealtasks.TTReadUnsealed)()

return t.Worker.ReadPiece(ctx, writer, id, index, size)
}

var _ Worker = &trackedWorker{}

0 comments on commit 7c19c5b

Please sign in to comment.