diff --git a/api/version.go b/api/version.go index 9f4f7351361..cc0c7b270c1 100644 --- a/api/version.go +++ b/api/version.go @@ -57,8 +57,8 @@ var ( FullAPIVersion0 = newVer(1, 5, 0) FullAPIVersion1 = newVer(2, 2, 0) - MinerAPIVersion0 = newVer(1, 4, 0) - WorkerAPIVersion0 = newVer(1, 5, 0) + MinerAPIVersion0 = newVer(1, 5, 0) + WorkerAPIVersion0 = newVer(1, 6, 0) ) //nolint:varcheck,deadcode diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 28e0715591c..595f6419ee7 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -12,6 +12,7 @@ import ( "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/mitchellh/go-homedir" + "go.uber.org/multierr" "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" @@ -589,7 +590,7 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect return xerrors.Errorf("acquiring sector lock: %w", err) } - fts := storiface.FTUnsealed + moveUnsealed := storiface.FTUnsealed { unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false) if err != nil { @@ -597,7 +598,7 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect } if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine - fts = storiface.FTNone + moveUnsealed = storiface.FTNone } } @@ -616,10 +617,10 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect } } - selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache, false) + selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTUpdateCache, false) err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector, - m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|fts, pathType, storiface.AcquireMove), + m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache|moveUnsealed, pathType, storiface.AcquireMove), func(ctx context.Context, w Worker) error { _, err := m.waitSimpleCall(ctx)(w.FinalizeReplicaUpdate(ctx, sector, keepUnsealed)) return err @@ -628,22 +629,30 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect return err } - fetchSel := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathStorage) - moveUnsealed := fts - { - if len(keepUnsealed) == 0 { - moveUnsealed = storiface.FTNone + move := func(types storiface.SectorFileType) error { + fetchSel := newAllocSelector(m.index, types, storiface.PathStorage) + { + if len(keepUnsealed) == 0 { + moveUnsealed = storiface.FTNone + } + } + + err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel, + m.schedFetch(sector, types, storiface.PathStorage, storiface.AcquireMove), + func(ctx context.Context, w Worker) error { + _, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, types)) + return err + }) + if err != nil { + return xerrors.Errorf("moving sector to storage: %w", err) } + return nil } - err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel, - m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|moveUnsealed, storiface.PathStorage, storiface.AcquireMove), - func(ctx context.Context, w Worker) error { - _, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|moveUnsealed)) - return err - }) - if err != nil { - return xerrors.Errorf("moving sector to storage: %w", err) + err = multierr.Append(move(storiface.FTUpdate|storiface.FTUpdateCache), move(storiface.FTCache)) + err = multierr.Append(err, move(storiface.FTSealed)) // Sealed separate from cache just in case ReleaseSectorKey was already called + if moveUnsealed != storiface.FTNone { + err = multierr.Append(err, move(moveUnsealed)) } return nil diff --git a/extern/sector-storage/stores/http_handler.go b/extern/sector-storage/stores/http_handler.go index 80fa8740834..6b7d249040f 100644 --- a/extern/sector-storage/stores/http_handler.go +++ b/extern/sector-storage/stores/http_handler.go @@ -172,7 +172,7 @@ func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.R return } - if err := handler.Local.Remove(r.Context(), id, ft, false, []ID{ID(r.FormValue("keep"))}); err != nil { + if err := handler.Local.Remove(r.Context(), id, ft, false, ParseIDList(r.FormValue("keep"))); err != nil { log.Errorf("%+v", err) w.WriteHeader(500) return diff --git a/extern/sector-storage/stores/index.go b/extern/sector-storage/stores/index.go index a90cdf0b969..35a1da693e9 100644 --- a/extern/sector-storage/stores/index.go +++ b/extern/sector-storage/stores/index.go @@ -7,6 +7,7 @@ import ( "net/url" gopath "path" "sort" + "strings" "sync" "time" @@ -29,6 +30,27 @@ var SkippedHeartbeatThresh = HeartbeatInterval * 5 // filesystem, local or networked / shared by multiple machines type ID string +const IDSep = "." + +type IDList []ID + +func (il IDList) String() string { + l := make([]string, len(il)) + for i, id := range il { + l[i] = string(id) + } + return strings.Join(l, IDSep) +} + +func ParseIDList(s string) IDList { + strs := strings.Split(s, IDSep) + out := make([]ID, len(strs)) + for i, str := range strs { + out[i] = ID(str) + } + return out +} + type Group = string type StorageInfo struct { diff --git a/extern/sector-storage/stores/remote.go b/extern/sector-storage/stores/remote.go index bd6b34be334..42a41f78823 100644 --- a/extern/sector-storage/stores/remote.go +++ b/extern/sector-storage/stores/remote.go @@ -44,12 +44,36 @@ type Remote struct { pfHandler PartialFileHandler } -func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types storiface.SectorFileType) error { - // TODO: do this on remotes too - // (not that we really need to do that since it's always called by the - // worker which pulled the copy) +func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, typ storiface.SectorFileType) error { + if bits.OnesCount(uint(typ)) != 1 { + return xerrors.New("RemoveCopies expects one file type") + } + + if err := r.local.RemoveCopies(ctx, s, typ); err != nil { + return xerrors.Errorf("removing local copies: %w", err) + } + + si, err := r.index.StorageFindSector(ctx, s, typ, 0, false) + if err != nil { + return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", s, typ, err) + } + + var hasPrimary bool + var keep []ID + for _, info := range si { + if info.Primary { + hasPrimary = true + keep = append(keep, info.ID) + break + } + } + + if !hasPrimary { + log.Warnf("remote RemoveCopies: no primary copies of sector %v (%s), not removing anything", s, typ) + return nil + } - return r.local.RemoveCopies(ctx, s, types) + return r.Remove(ctx, s, typ, true, keep) } func NewRemote(local Store, index SectorIndex, auth http.Header, fetchLimit int, pfHandler PartialFileHandler) *Remote { @@ -156,7 +180,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin if op == storiface.AcquireMove { id := ID(storageID) - if err := r.deleteFromRemote(ctx, url, &id); err != nil { + if err := r.deleteFromRemote(ctx, url, []ID{id}); err != nil { log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err) } } @@ -355,7 +379,7 @@ storeLoop: } } for _, url := range info.URLs { - if err := r.deleteFromRemote(ctx, url, nil); err != nil { + if err := r.deleteFromRemote(ctx, url, keepIn); err != nil { log.Warnf("remove %s: %+v", url, err) continue } @@ -366,9 +390,9 @@ storeLoop: return nil } -func (r *Remote) deleteFromRemote(ctx context.Context, url string, keepIn *ID) error { +func (r *Remote) deleteFromRemote(ctx context.Context, url string, keepIn IDList) error { if keepIn != nil { - url = url + "?keep=" + string(*keepIn) + url = url + "?keep=" + keepIn.String() } log.Infof("Delete %s", url) diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go index 2ca86f5465a..4f7ae767dfe 100644 --- a/extern/sector-storage/worker_local.go +++ b/extern/sector-storage/worker_local.go @@ -516,7 +516,20 @@ func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error { func (l *LocalWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) { return l.asyncCall(ctx, sector, MoveStorage, func(ctx context.Context, ci storiface.CallID) (interface{}, error) { - return nil, l.storage.MoveStorage(ctx, sector, types) + if err := l.storage.MoveStorage(ctx, sector, types); err != nil { + return nil, xerrors.Errorf("move to storage: %w", err) + } + + for _, fileType := range storiface.PathTypes { + if fileType&types == 0 { + continue + } + + if err := l.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil { + return nil, xerrors.Errorf("rm copies (t:%s, s:%v): %w", fileType, sector, err) + } + } + return nil, nil }) }