add SeekStart method to inflator reader (#656)
* feat: add SeekStart method to inflator reader, so we can seek back to the start when retrying AddPiece

* fix: storage provider restart in publish stage (#657)
dirkmc authored Dec 21, 2021
1 parent 1c99269 commit bd2751f
Showing 10 changed files with 317 additions and 52 deletions.
45 changes: 45 additions & 0 deletions shared/commp.go
@@ -0,0 +1,45 @@
package shared

import (
    "io"

    "github.com/ipfs/go-cid"
    "golang.org/x/xerrors"

    "github.com/filecoin-project/go-commp-utils/writer"
    commcid "github.com/filecoin-project/go-fil-commcid"
    commp "github.com/filecoin-project/go-fil-commp-hashhash"
)

func GenerateCommp(reader io.Reader, payloadSize uint64, targetSize uint64) (cid.Cid, error) {
    // dump the CARv1 payload of the CARv2 file to the Commp Writer and get back the CommP.
    w := &writer.Writer{}
    written, err := io.Copy(w, reader)
    if err != nil {
        return cid.Undef, xerrors.Errorf("failed to write to CommP writer: %w", err)
    }
    if written != int64(payloadSize) {
        return cid.Undef, xerrors.Errorf("number of bytes written to CommP writer %d not equal to the CARv1 payload size %d", written, payloadSize)
    }

    cidAndSize, err := w.Sum()
    if err != nil {
        return cid.Undef, xerrors.Errorf("failed to get CommP: %w", err)
    }

    if uint64(cidAndSize.PieceSize) < targetSize {
        // need to pad up!
        rawPaddedCommp, err := commp.PadCommP(
            // we know how long a pieceCid "hash" is, just blindly extract the trailing 32 bytes
            cidAndSize.PieceCID.Hash()[len(cidAndSize.PieceCID.Hash())-32:],
            uint64(cidAndSize.PieceSize),
            uint64(targetSize),
        )
        if err != nil {
            return cid.Undef, err
        }
        cidAndSize.PieceCID, _ = commcid.DataCommitmentV1ToCID(rawPaddedCommp)
    }

    return cidAndSize.PieceCID, err
}
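
As a usage note (not part of this commit): below is a minimal sketch of calling GenerateCommp over a payload file. The file path and the way the target size is derived are illustrative assumptions; in the provider flow the payload size comes from the CARv2 header and the target size from the deal proposal's piece size.

package main

import (
    "fmt"
    "os"

    padreader "github.com/filecoin-project/go-padreader"

    "github.com/filecoin-project/go-fil-markets/shared"
)

func main() {
    // Open the payload (illustrative path)
    f, err := os.Open("payload.car")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    st, err := f.Stat()
    if err != nil {
        panic(err)
    }
    payloadSize := uint64(st.Size())

    // Derive a target piece size from the payload size; a real deal would use
    // the piece size agreed in the deal proposal instead.
    targetSize := padreader.PaddedSize(payloadSize).Padded()

    pieceCid, err := shared.GenerateCommp(f, payloadSize, uint64(targetSize))
    if err != nil {
        panic(err)
    }
    fmt.Println("CommP:", pieceCid)
}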
71 changes: 71 additions & 0 deletions shared/inflator_reader.go
@@ -0,0 +1,71 @@
package shared

import (
    "io"
    "sync"

    "github.com/filecoin-project/go-padreader"
    "github.com/filecoin-project/go-state-types/abi"
)

// ReadSeekStarter implements io.Reader and allows the caller to seek to
// the start of the reader
type ReadSeekStarter interface {
    io.Reader
    SeekStart() error
}

// inflatorReader wraps the MultiReader returned by padreader so that we can
// add a SeekStart method. It's used for example when there is an error
// reading from the reader and we need to return to the start.
type inflatorReader struct {
    readSeeker  io.ReadSeeker
    payloadSize uint64
    targetSize  abi.UnpaddedPieceSize

    lk           sync.RWMutex
    paddedReader io.Reader
}

var _ ReadSeekStarter = (*inflatorReader)(nil)

func NewInflatorReader(readSeeker io.ReadSeeker, payloadSize uint64, targetSize abi.UnpaddedPieceSize) (*inflatorReader, error) {
    paddedReader, err := padreader.NewInflator(readSeeker, payloadSize, targetSize)
    if err != nil {
        return nil, err
    }

    return &inflatorReader{
        readSeeker:   readSeeker,
        paddedReader: paddedReader,
        payloadSize:  payloadSize,
        targetSize:   targetSize,
    }, nil
}

func (r *inflatorReader) Read(p []byte) (n int, err error) {
    r.lk.RLock()
    defer r.lk.RUnlock()

    return r.paddedReader.Read(p)
}

func (r *inflatorReader) SeekStart() error {
    r.lk.Lock()
    defer r.lk.Unlock()

    // Seek to the start of the underlying reader
    _, err := r.readSeeker.Seek(0, io.SeekStart)
    if err != nil {
        return err
    }

    // Recreate the padded reader
    paddedReader, err := padreader.NewInflator(r.readSeeker, r.payloadSize, r.targetSize)
    if err != nil {
        return err
    }
    r.paddedReader = paddedReader

    return nil
}
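
To show the intent behind SeekStart, here is a sketch of retrying an AddPiece-style call with the same reader. The package name, the addPiece callback, and maxAttempts are hypothetical stand-ins for the real sealing wiring; none of them are part of this commit. When adding a piece fails partway through, the reader has been partially consumed, so the caller rewinds it before retrying.

package dealflow

import (
    "fmt"
    "io"

    "github.com/filecoin-project/go-fil-markets/shared"
)

// addPieceWithRetry is a hypothetical helper: it calls addPiece with the
// padded reader and, on failure, rewinds the reader with SeekStart so the
// next attempt reads the full padded payload from the beginning.
func addPieceWithRetry(r shared.ReadSeekStarter, addPiece func(io.Reader) error, maxAttempts int) error {
    var err error
    for attempt := 0; attempt < maxAttempts; attempt++ {
        if err = addPiece(r); err == nil {
            return nil
        }
        if serr := r.SeekStart(); serr != nil {
            return fmt.Errorf("seeking reader to start for retry: %w (add piece error: %v)", serr, err)
        }
    }
    return err
}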
64 changes: 64 additions & 0 deletions shared/inflator_reader_test.go
@@ -0,0 +1,64 @@
package shared

import (
    "io"
    "os"
    "testing"

    "github.com/stretchr/testify/require"

    "github.com/filecoin-project/go-state-types/abi"
)

func TestInflatorReader(t *testing.T) {
    req := require.New(t)

    // Create a temp file
    f, err := os.CreateTemp(t.TempDir(), "buff")
    req.NoError(err)
    defer f.Close() // nolint

    // Store a sample string to the temp file
    sampleString := "Testing 123"
    n, err := f.WriteString(sampleString)
    req.NoError(err)
    req.Len(sampleString, n)

    // Seek to the start of the file
    _, err = f.Seek(0, io.SeekStart)
    req.NoError(err)

    // Create an inflator reader over the file
    paddedSize := 1024
    padded := abi.PaddedPieceSize(paddedSize)
    ir, err := NewInflatorReader(f, uint64(n), padded.Unpadded())
    req.NoError(err)

    // Read all bytes into a buffer
    buff := make([]byte, paddedSize)
    _, err = ir.Read(buff)
    req.NoError(err)

    // Check that the correct number of bytes was read
    req.Len(buff, paddedSize)
    // Check that the first part of the buffer matches the sample string
    req.Equal([]byte(sampleString), buff[:len(sampleString)])
    // Check that the rest of the buffer is zeros
    for _, b := range buff[len(sampleString):] {
        req.EqualValues(0, b)
    }

    // Seek to the start of the reader
    err = ir.SeekStart()
    req.NoError(err)

    // Verify that the reader returns the correct bytes, as above
    buff = make([]byte, paddedSize)
    _, err = ir.Read(buff)
    req.NoError(err)
    req.Len(buff, paddedSize)
    req.Equal([]byte(sampleString), buff[:len(sampleString)])
    for _, b := range buff[len(sampleString):] {
        req.EqualValues(0, b)
    }
}
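
A note on the sizes used in this test: Filecoin piece padding (Fr32) stores 127 bytes of data in every 128 bytes of a padded piece, so the unpadded size is 127/128 of the padded size. For the 1024-byte padded piece above, padded.Unpadded() = 1024 - 1024/128 = 1016 bytes, which is why the 11-byte sample payload is inflated with 1005 trailing zero bytes.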
7 changes: 0 additions & 7 deletions storagemarket/impl/provider.go
@@ -628,13 +628,6 @@ func (p *Provider) runMigrations(ctx context.Context) ([]storagemarket.MinerDeal
        return nil, xerrors.Errorf("failed to fetch deals during startup: %w", err)
    }

    // re-track all deals for whom we still have a local blockstore.
    for _, d := range deals {
        if _, err := os.Stat(d.InboundCAR); err == nil && d.Ref != nil {
            _, _ = p.stores.GetOrOpen(d.ProposalCid.String(), d.InboundCAR, d.Ref.Root)
        }
    }

    // migrate deals to the dagstore if still not migrated.
    if ok, err := p.dagStore.MigrateDeals(ctx, deals); err != nil {
        return nil, fmt.Errorf("failed to migrate deals to DAG store: %w", err)
41 changes: 7 additions & 34 deletions storagemarket/impl/provider_environments.go
@@ -2,7 +2,6 @@ package storageimpl

import (
    "context"
    "io"
    "os"
    "time"

@@ -13,13 +12,11 @@ import (
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-commp-utils/writer"
commcid "github.com/filecoin-project/go-fil-commcid"
commp "github.com/filecoin-project/go-fil-commp-hashhash"
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
@@ -45,6 +42,10 @@ func (p *providerDealEnvironment) ReadCAR(path string) (*carv2.Reader, error) {
func (p *providerDealEnvironment) FinalizeBlockstore(proposalCid cid.Cid) error {
    bs, err := p.p.stores.Get(proposalCid.String())
    if err != nil {
        if xerrors.Is(err, stores.ErrNotFound) {
            // The blockstore has already been cleaned up
            return nil
        }
        return xerrors.Errorf("failed to get read/write blockstore: %w", err)
    }

@@ -108,36 +109,8 @@ func (p *providerDealEnvironment) GeneratePieceCommitment(proposalCid cid.Cid, c
        }
    }()

    // dump the CARv1 payload of the CARv2 file to the Commp Writer and get back the CommP.
    w := &writer.Writer{}
    written, err := io.Copy(w, rd.DataReader())
    if err != nil {
        return cid.Undef, "", xerrors.Errorf("failed to write to CommP writer: %w", err)
    }
    if written != int64(rd.Header.DataSize) {
        return cid.Undef, "", xerrors.Errorf("number of bytes written to CommP writer %d not equal to the CARv1 payload size %d", written, rd.Header.DataSize)
    }

    cidAndSize, err := w.Sum()
    if err != nil {
        return cid.Undef, "", xerrors.Errorf("failed to get CommP: %w", err)
    }

    if cidAndSize.PieceSize < dealSize {
        // need to pad up!
        rawPaddedCommp, err := commp.PadCommP(
            // we know how long a pieceCid "hash" is, just blindly extract the trailing 32 bytes
            cidAndSize.PieceCID.Hash()[len(cidAndSize.PieceCID.Hash())-32:],
            uint64(cidAndSize.PieceSize),
            uint64(dealSize),
        )
        if err != nil {
            return cid.Undef, "", err
        }
        cidAndSize.PieceCID, _ = commcid.DataCommitmentV1ToCID(rawPaddedCommp)
    }

    return cidAndSize.PieceCID, filestore.Path(""), err
    pieceCID, err := shared.GenerateCommp(rd.DataReader(), rd.Header.DataSize, uint64(dealSize))
    return pieceCID, "", err
}

func (p *providerDealEnvironment) FileStore() filestore.FileStore {
8 changes: 4 additions & 4 deletions storagemarket/impl/providerstates/provider_states.go
@@ -13,7 +13,6 @@ import (
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
padreader "github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/exitcode"
@@ -24,6 +23,7 @@ import (

"github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerutils"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
@@ -394,12 +394,12 @@ func HandoffDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal stor
    return ctx.Trigger(storagemarket.ProviderEventDealHandedOff)
}

func handoffDeal(ctx context.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal, reader io.Reader, payloadSize uint64) (*storagemarket.PackingResult, error) {
    // because we use the PadReader directly during AP we need to produce the
func handoffDeal(ctx context.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal, reader io.ReadSeeker, payloadSize uint64) (*storagemarket.PackingResult, error) {
    // because we use the PadReader directly during Add Piece we need to produce the
    // correct amount of zeroes
    // (alternative would be to keep precise track of sector offsets for each
    // piece which is just too much work for a seldom used feature)
    paddedReader, err := padreader.NewInflator(reader, payloadSize, deal.Proposal.PieceSize.Unpadded())
    paddedReader, err := shared.NewInflatorReader(reader, payloadSize, deal.Proposal.PieceSize.Unpadded())
    if err != nil {
        return nil, err
    }
1 change: 1 addition & 0 deletions storagemarket/impl/providerstates/provider_states_test.go
@@ -1391,6 +1391,7 @@ func makeExecutor(ctx context.Context,
        PublishDealID:            nodeParams.PublishDealID,
        WaitForPublishDealsError: nodeParams.WaitForPublishDealsError,
        OnDealCompleteError:      nodeParams.OnDealCompleteError,
        OnDealCompleteSkipCommP:  true,
        DataCap:                  nodeParams.DataCap,
        GetDataCapErr:            nodeParams.GetDataCapError,
    }