Skip to content

Commit

Permalink
fix: close the reader after unsealing into blockstore (#507)
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc authored Mar 22, 2021
1 parent d5b1990 commit 0f8f468
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 15 deletions.
43 changes: 32 additions & 11 deletions retrievalmarket/impl/provider_environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,25 +113,46 @@ func (pde *providerDealEnvironment) Node() retrievalmarket.RetrievalProviderNode
return pde.p.node
}

func (pde *providerDealEnvironment) ReadIntoBlockstore(storeID multistore.StoreID, pieceData io.Reader) error {
store, err := pde.p.multiStore.Get(storeID)
if err != nil {
return err
func (pde *providerDealEnvironment) ReadIntoBlockstore(storeID multistore.StoreID, pieceData io.ReadCloser) error {
// Get the the destination multistore
store, loadErr := pde.p.multiStore.Get(storeID)
if loadErr != nil {
return xerrors.Errorf("failed to read file into blockstore: failed to get multistore %d: %w", storeID, loadErr)
}
_, err = cario.NewCarIO().LoadCar(store.Bstore, pieceData)

// drain the reader first
_, derr := io.Copy(ioutil.Discard, pieceData)
// Load the CAR into the blockstore
_, loadErr = cario.NewCarIO().LoadCar(store.Bstore, pieceData)
if loadErr != nil {
// Just log the error, so we can drain and close the reader before
// returning the error
loadErr = xerrors.Errorf("failed to load car file into blockstore: %w", loadErr)
log.Error(loadErr.Error())
}

if err != nil {
// Attempt to drain and close the reader before returning any error
_, drainErr := io.Copy(ioutil.Discard, pieceData)
closeErr := pieceData.Close()

// If there was an error loading the CAR file into the blockstore, throw that error
if loadErr != nil {
return loadErr
}

// If there was an error draining the reader, throw that error
if drainErr != nil {
err := xerrors.Errorf("failed to read file into blockstore: failed to drain piece reader: %w", drainErr)
log.Error(err.Error())
return err
}

if derr != nil {
return xerrors.Errorf("draining piece reader: %w", derr)
// If there was an error closing the reader, throw that error
if closeErr != nil {
err := xerrors.Errorf("failed to read file into blockstore: failed to close reader: %w", closeErr)
log.Error(err.Error())
return err
}

return err
return nil
}

func (pde *providerDealEnvironment) TrackTransfer(deal retrievalmarket.ProviderDealState) error {
Expand Down
4 changes: 2 additions & 2 deletions retrievalmarket/impl/providerstates/provider_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import (
type ProviderDealEnvironment interface {
// Node returns the node interface for this deal
Node() rm.RetrievalProviderNode
ReadIntoBlockstore(storeID multistore.StoreID, pieceData io.Reader) error
ReadIntoBlockstore(storeID multistore.StoreID, pieceData io.ReadCloser) error
TrackTransfer(deal rm.ProviderDealState) error
UntrackTransfer(deal rm.ProviderDealState) error
DeleteStore(storeID multistore.StoreID) error
ResumeDataTransfer(context.Context, datatransfer.ChannelID) error
CloseDataTransfer(context.Context, datatransfer.ChannelID) error
}

func firstSuccessfulUnseal(ctx context.Context, node rm.RetrievalProviderNode, pieceInfo piecestore.PieceInfo) (io.Reader, error) {
func firstSuccessfulUnseal(ctx context.Context, node rm.RetrievalProviderNode, pieceInfo piecestore.PieceInfo) (io.ReadCloser, error) {
lastErr := xerrors.New("no sectors found to unseal from")
for _, deal := range pieceInfo.Deals {
reader, err := node.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
Expand Down
2 changes: 1 addition & 1 deletion retrievalmarket/migrations/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (te *mockProviderEnv) DeleteStore(storeID multistore.StoreID) error {
return nil
}

func (te *mockProviderEnv) ReadIntoBlockstore(storeID multistore.StoreID, pieceData io.Reader) error {
func (te *mockProviderEnv) ReadIntoBlockstore(storeID multistore.StoreID, pieceData io.ReadCloser) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion retrievalmarket/testing/test_provider_deal_environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (te *TestProviderDealEnvironment) DeleteStore(storeID multistore.StoreID) e
return te.DeleteStoreError
}

func (te *TestProviderDealEnvironment) ReadIntoBlockstore(storeID multistore.StoreID, pieceData io.Reader) error {
func (te *TestProviderDealEnvironment) ReadIntoBlockstore(storeID multistore.StoreID, pieceData io.ReadCloser) error {
return te.ReadIntoBlockstoreError
}

Expand Down

0 comments on commit 0f8f468

Please sign in to comment.