Skip to content

Commit

Permalink
Better storage fsm error handling (#484)
Browse files Browse the repository at this point in the history
* fix: better storage fsm error handling

* fix: log piece cid not root hash
  • Loading branch information
dirkmc authored Feb 5, 2021
1 parent 4daa515 commit 32f7f2e
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 20 deletions.
56 changes: 43 additions & 13 deletions storagemarket/impl/providerstates/provider_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package providerstates

import (
"context"
"errors"
"fmt"
"io"

Expand Down Expand Up @@ -290,40 +289,71 @@ func WaitForPublish(ctx fsm.Context, environment ProviderDealEnvironment, deal s

// HandoffDeal hands off a published deal for sealing and commitment in a sector
func HandoffDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error {
triggerHandoffFailed := func(err error, packingErr error) error {
if packingErr == nil {
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err)
}
packingErr = xerrors.Errorf("packing error: %w", packingErr)
err = xerrors.Errorf("%s: %w", err, packingErr)
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err)
}

var packingInfo *storagemarket.PackingResult
var packingErr error
if deal.PiecePath != filestore.Path("") {
if deal.PiecePath != "" {
// Data for offline deals is stored on disk, so if PiecePath is set,
// create a Reader from the file path
file, err := environment.FileStore().Open(deal.PiecePath)
if err != nil {
return ctx.Trigger(storagemarket.ProviderEventFileStoreErrored, xerrors.Errorf("reading piece at path %s: %w", deal.PiecePath, err))
return ctx.Trigger(storagemarket.ProviderEventFileStoreErrored,
xerrors.Errorf("reading piece at path %s: %w", deal.PiecePath, err))
}

// Hand the deal off to the process that adds it to a sector
packingInfo, err = handoffDeal(ctx.Context(), environment, deal, file, uint64(file.Size()))
if err != nil {
err = xerrors.Errorf("packing piece at path %s: %w", deal.PiecePath, err)
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err)
}
packingInfo, packingErr = handoffDeal(ctx.Context(), environment, deal, file, uint64(file.Size()))
} else {
// Create a reader to read the piece from the blockstore
pieceReader, pieceSize, err, writeErrChan := environment.GeneratePieceReader(deal.StoreID, deal.Ref.Root, shared.AllSelector())
if err != nil {
err := xerrors.Errorf("reading piece %s from store %d: %w", deal.Ref.PieceCid, deal.StoreID, err)
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err)
}

// Hand the deal off to the process that adds it to a sector
var packingErr error
packingInfo, packingErr = handoffDeal(ctx.Context(), environment, deal, pieceReader, pieceSize)

// Close the read side of the pipe
err = pieceReader.Close()
if err != nil {
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err)
err = xerrors.Errorf("closing reader for piece %s from store %d: %w", deal.Ref.PieceCid, deal.StoreID, err)
return triggerHandoffFailed(err, packingErr)
}

// Wait for the write to complete
select {
case <-ctx.Context().Done():
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, errors.New("write never finished"))
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed,
xerrors.Errorf("writing piece %s never finished: %w", deal.Ref.PieceCid, ctx.Context().Err()))
case err = <-writeErrChan:
if err != nil {
err = xerrors.Errorf("writing piece %s: %w", deal.Ref.PieceCid, err)
return triggerHandoffFailed(err, packingErr)
}
}
if err != nil {

if packingErr != nil {
err = xerrors.Errorf("packing piece %s: %w", deal.Ref.PieceCid, packingErr)
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err)
}
}

if packingErr != nil {
return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, packingErr)
}

if err := recordPiece(environment, deal, packingInfo.SectorNumber, packingInfo.Offset, packingInfo.Size); err != nil {
log.Errorf("failed to register deal data for retrieval: %s", err)
err = xerrors.Errorf("failed to register deal data for piece %s for retrieval: %w", deal.Ref.PieceCid, err)
log.Error(err.Error())
_ = ctx.Trigger(storagemarket.ProviderEventPieceStoreErrored, err)
}

Expand Down
44 changes: 37 additions & 7 deletions storagemarket/impl/providerstates/provider_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ func TestHandoffDeal(t *testing.T) {
},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealAwaitingPreCommit, deal.State)
require.Equal(t, fmt.Sprintf("recording piece for retrieval: failed to load block locations: file not found"), deal.Message)
require.Equal(t, fmt.Sprintf("recording piece for retrieval: failed to register deal data for piece %s for retrieval: failed to load block locations: file not found", deal.Ref.PieceCid), deal.Message)
},
},
"add piece block locations errors": {
Expand All @@ -668,7 +668,7 @@ func TestHandoffDeal(t *testing.T) {
},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealAwaitingPreCommit, deal.State)
require.Equal(t, "recording piece for retrieval: failed to add piece block locations: could not add block locations", deal.Message)
require.Equal(t, fmt.Sprintf("recording piece for retrieval: failed to register deal data for piece %s for retrieval: failed to add piece block locations: could not add block locations", deal.Ref.PieceCid), deal.Message)
},
},
"add deal for piece errors": {
Expand All @@ -685,7 +685,7 @@ func TestHandoffDeal(t *testing.T) {
},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealAwaitingPreCommit, deal.State)
require.Equal(t, "recording piece for retrieval: failed to add deal for piece: could not add deal info", deal.Message)
require.Equal(t, fmt.Sprintf("recording piece for retrieval: failed to register deal data for piece %s for retrieval: failed to add deal for piece: could not add deal info", deal.Ref.PieceCid), deal.Message)
},
},
"opening file errors": {
Expand All @@ -710,7 +710,7 @@ func TestHandoffDeal(t *testing.T) {
},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State)
require.Equal(t, "handing off deal to node: failed building sector", deal.Message)
require.Equal(t, "handing off deal to node: packing piece at path file.txt: failed building sector", deal.Message)
},
},
"assemble piece on demand fails immediately": {
Expand All @@ -722,7 +722,7 @@ func TestHandoffDeal(t *testing.T) {
},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State)
require.Equal(t, "handing off deal to node: something went wrong", deal.Message)
require.Equal(t, fmt.Sprintf("handing off deal to node: reading piece %s from store %d: something went wrong", deal.Ref.PieceCid, deal.StoreID), deal.Message)
},
},
"assemble piece on demand fails async": {
Expand All @@ -734,7 +734,7 @@ func TestHandoffDeal(t *testing.T) {
},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State)
require.Equal(t, "handing off deal to node: something went wrong", deal.Message)
require.Equal(t, fmt.Sprintf("handing off deal to node: writing piece %s: something went wrong", deal.Ref.PieceCid), deal.Message)
},
},
"assemble piece on demand fails closing reader": {
Expand All @@ -746,7 +746,37 @@ func TestHandoffDeal(t *testing.T) {
},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State)
require.Equal(t, "handing off deal to node: something went wrong", deal.Message)
require.Equal(t, fmt.Sprintf("handing off deal to node: closing reader for piece %s from store %d: something went wrong", deal.Ref.PieceCid, deal.StoreID), deal.Message)
},
},
"assemble piece on demand fails closing reader and OnComplete fails": {
environmentParams: environmentParams{
PieceReader: newStubbedReadCloser(errors.New("close reader failed")),
},
dealParams: dealParams{
FastRetrieval: true,
},
nodeParams: nodeParams{
OnDealCompleteError: errors.New("failed building sector"),
},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State)
require.Equal(t, fmt.Sprintf("handing off deal to node: closing reader for piece %s from store %d: close reader failed: packing error: failed building sector", deal.Ref.PieceCid, deal.StoreID), deal.Message)
},
},
"assemble piece on demand fails async and OnComplete fails": {
environmentParams: environmentParams{
GeneratePieceReaderErrAsync: errors.New("async err"),
},
dealParams: dealParams{
FastRetrieval: true,
},
nodeParams: nodeParams{
OnDealCompleteError: errors.New("failed building sector"),
},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealFailing, deal.State)
require.Equal(t, fmt.Sprintf("handing off deal to node: writing piece %s: async err: packing error: failed building sector", deal.Ref.PieceCid), deal.Message)
},
},
}
Expand Down

0 comments on commit 32f7f2e

Please sign in to comment.