Skip to content

Commit

Permalink
Storage market request queued event and validation interface changes (#…
Browse files Browse the repository at this point in the history
…555)

* storage market request queued event and validation interface changes

* add transition

* Apply suggestions from code review

Co-authored-by: dirkmc <[email protected]>

* address nits

* fix: docs

Co-authored-by: dirkmc <[email protected]>
  • Loading branch information
aarshkshah1992 and dirkmc authored Jun 10, 2021
1 parent 62bfbff commit 29d0b04
Show file tree
Hide file tree
Showing 17 changed files with 71 additions and 37 deletions.
9 changes: 8 additions & 1 deletion docs/storageclient.mmd
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ stateDiagram-v2
state "StorageDealError" as 26
state "StorageDealClientTransferRestart" as 28
state "StorageDealAwaitingPreCommit" as 29
state "StorageDealTransferQueued" as 30
3 : On entry runs ValidateDealPublished
5 : On entry runs VerifyDealActivated
7 : On entry runs WaitForDealCompletion
Expand Down Expand Up @@ -46,16 +47,22 @@ stateDiagram-v2
12 --> 11 : ClientEventUnexpectedDealState
16 --> 11 : ClientEventDataTransferFailed
17 --> 11 : ClientEventDataTransferFailed
30 --> 11 : ClientEventDataTransferFailed
28 --> 11 : ClientEventDataTransferRestartFailed
16 --> 17 : ClientEventDataTransferInitiated
16 --> 30 : ClientEventDataTransferQueued
30 --> 17 : ClientEventDataTransferInitiated
16 --> 17 : ClientEventDataTransferRestarted
28 --> 17 : ClientEventDataTransferRestarted
30 --> 17 : ClientEventDataTransferRestarted
17 --> 11 : ClientEventDataTransferStalled
30 --> 11 : ClientEventDataTransferStalled
16 --> 11 : ClientEventDataTransferCancelled
17 --> 11 : ClientEventDataTransferCancelled
28 --> 11 : ClientEventDataTransferCancelled
30 --> 11 : ClientEventDataTransferCancelled
16 --> 13 : ClientEventDataTransferComplete
17 --> 13 : ClientEventDataTransferComplete
30 --> 13 : ClientEventDataTransferComplete
13 --> 13 : ClientEventWaitForDealState
13 --> 11 : ClientEventResponseDealDidNotMatch
13 --> 11 : ClientEventDealRejected
Expand Down
Binary file modified docs/storageclient.mmd.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 3 additions & 3 deletions docs/storageclient.mmd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/filecoin-project/go-address v0.0.3
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434
github.com/filecoin-project/go-data-transfer v1.6.0
github.com/filecoin-project/go-data-transfer v1.7.0
github.com/filecoin-project/go-ds-versioning v0.1.0
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
Expand All @@ -21,7 +21,7 @@ require (
github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-graphsync v0.6.1
github.com/ipfs/go-graphsync v0.6.4
github.com/ipfs/go-ipfs-blockstore v1.0.3
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434/go
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer v1.0.1/go.mod h1:UxvfUAY9v3ub0a21BSK9u3pB2aq30Y0KMsG+w9/ysyo=
github.com/filecoin-project/go-data-transfer v1.6.0 h1:DHIzEc23ydRCCBwtFet3MfgO8gMpZEnw60Y+s71oX6o=
github.com/filecoin-project/go-data-transfer v1.6.0/go.mod h1:E3WW4mCEYwU2y65swPEajSZoFWFmfXt7uwGduoACZQc=
github.com/filecoin-project/go-data-transfer v1.7.0 h1:mFRn+UuTdPROmhplLSekzd4rAs9ug8ubtSY4nw9wYkU=
github.com/filecoin-project/go-data-transfer v1.7.0/go.mod h1:GLRr5BmLEqsLwXfiRDG7uJvph22KGL2M4iOuF8EINaU=
github.com/filecoin-project/go-ds-versioning v0.1.0 h1:y/X6UksYTsK8TLCI7rttCKEvl8btmWxyFMEeeWGUxIQ=
github.com/filecoin-project/go-ds-versioning v0.1.0/go.mod h1:mp16rb4i2QPmxBnmanUx8i/XANp+PFCCJWiAb+VW4/s=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
Expand Down Expand Up @@ -285,8 +285,8 @@ github.com/ipfs/go-filestore v1.0.0/go.mod h1:/XOCuNtIe2f1YPbiXdYvD0BKLA0JR1MgPi
github.com/ipfs/go-graphsync v0.1.0/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZrDCVUhyi0=
github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY=
github.com/ipfs/go-graphsync v0.6.1 h1:i9wN7YkBXWwIsUjVQeuaDxFB59yWZrG1xL564Nz7aGE=
github.com/ipfs/go-graphsync v0.6.1/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-graphsync v0.6.4 h1:g6wFRK2BkLPnx8nfoSdnokp5gtpuGyWZjbqI6q3NGb8=
github.com/ipfs/go-graphsync v0.6.4/go.mod h1:5WyaeigpNdpiYQuW2vwpuecOoEfB4h747ZGEOKmAGTg=
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
Expand Down
4 changes: 2 additions & 2 deletions retrievalmarket/impl/requestvalidation/requestvalidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ func NewProviderRequestValidator(env ValidationEnvironment) *ProviderRequestVali
}

// ValidatePush validates a push request received from the peer that will send data
func (rv *ProviderRequestValidator) ValidatePush(isRestart bool, sender peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) {
func (rv *ProviderRequestValidator) ValidatePush(isRestart bool, _ datatransfer.ChannelID, sender peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) {
return nil, errors.New("No pushes accepted")
}

// ValidatePull validates a pull request received from the peer that will receive data
func (rv *ProviderRequestValidator) ValidatePull(isRestart bool, receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) {
func (rv *ProviderRequestValidator) ValidatePull(isRestart bool, _ datatransfer.ChannelID, receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) {
proposal, ok := voucher.(*retrievalmarket.DealProposal)
var legacyProtocol bool
if !ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestValidatePush(t *testing.T) {
sender := shared_testutil.GeneratePeers(1)[0]
voucher := shared_testutil.MakeTestDealProposal()
requestValidator := requestvalidation.NewProviderRequestValidator(fve)
voucherResult, err := requestValidator.ValidatePush(false, sender, &voucher, voucher.PayloadCID, shared.AllSelector())
voucherResult, err := requestValidator.ValidatePush(false, datatransfer.ChannelID{}, sender, &voucher, voucher.PayloadCID, shared.AllSelector())
require.Equal(t, nil, voucherResult)
require.Error(t, err)
}
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestValidatePull(t *testing.T) {
for testCase, data := range testCases {
t.Run(testCase, func(t *testing.T) {
requestValidator := requestvalidation.NewProviderRequestValidator(&data.fve)
voucherResult, err := requestValidator.ValidatePull(data.isRestart, data.sender, data.voucher, data.baseCid, data.selector)
voucherResult, err := requestValidator.ValidatePull(data.isRestart, datatransfer.ChannelID{}, data.sender, data.voucher, data.baseCid, data.selector)
require.Equal(t, data.expectedVoucherResult, voucherResult)
if data.expectedError == nil {
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions retrievalmarket/storage_retrieval_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,11 +468,11 @@ func newRetrievalHarness(ctx context.Context, t *testing.T, sh *testharness.Stor

type fakeDTValidator struct{}

func (v *fakeDTValidator) ValidatePush(isRestart bool, sender peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) {
func (v *fakeDTValidator) ValidatePush(isRestart bool, _ datatransfer.ChannelID, sender peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) {
return nil, nil
}

func (v *fakeDTValidator) ValidatePull(isRestart bool, receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) {
func (v *fakeDTValidator) ValidatePull(isRestart bool, _ datatransfer.ChannelID, receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) {
return nil, nil
}

Expand Down
4 changes: 2 additions & 2 deletions shared_testutil/generators.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,11 @@ func RequireGenerateRetrievalPeers(t *testing.T, numPeers int) []retrievalmarket

type FakeDTValidator struct{}

func (v *FakeDTValidator) ValidatePush(isRestart bool, sender peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) {
func (v *FakeDTValidator) ValidatePush(isRestart bool, _ datatransfer.ChannelID, sender peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) {
return nil, nil
}

func (v *FakeDTValidator) ValidatePull(isRestart bool, receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) {
func (v *FakeDTValidator) ValidatePull(isRestart bool, _ datatransfer.ChannelID, receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) {
return nil, nil
}

Expand Down
4 changes: 4 additions & 0 deletions shared_testutil/testchannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ func NewTestChannel(params TestChannelParams) datatransfer.ChannelState {
return tc
}

func (tc *TestChannel) ReceivedCidsLen() int {
return len(tc.receivedCids)
}

// TransferID returns the transfer id for this channel
func (tc *TestChannel) TransferID() datatransfer.TransferID {
return tc.transferID
Expand Down
4 changes: 4 additions & 0 deletions storagemarket/dealstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ const (

// StorageDealAwaitingPreCommit means a deal is ready and must be pre-committed
StorageDealAwaitingPreCommit

// StorageDealTransferQueued means the data transfer request has been queued and will be executed soon.
StorageDealTransferQueued
)

// DealStates maps StorageDealStatus codes to string names
Expand Down Expand Up @@ -140,6 +143,7 @@ var DealStates = map[StorageDealStatus]string{
StorageDealFinalizing: "StorageDealFinalizing",
StorageDealClientTransferRestart: "StorageDealClientTransferRestart",
StorageDealProviderTransferAwaitRestart: "StorageDealProviderTransferAwaitRestart",
StorageDealTransferQueued: "StorageDealTransferQueued",
}

// DealStatesDescriptions maps StorageDealStatus codes to string description for better UX
Expand Down
5 changes: 5 additions & 0 deletions storagemarket/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ const (

// ClientEventDataTransferCancelled happens when a data transfer is cancelled
ClientEventDataTransferCancelled

// ClientEventDataTransferQueued happens when we queue the provider's request to transfer data to it
// in response to the push request we send to the provider.
ClientEventDataTransferQueued
)

// ClientEvents maps client event codes to string names
Expand Down Expand Up @@ -143,6 +147,7 @@ var ClientEvents = map[ClientEvent]string{
ClientEventDataTransferRestartFailed: "ClientEventDataTransferRestartFailed",
ClientEventDataTransferStalled: "ClientEventDataTransferStalled",
ClientEventDataTransferCancelled: "ClientEventDataTransferCancelled",
ClientEventDataTransferQueued: "ClientEventDataTransferQueued",
}

func (e ClientEvent) String() string {
Expand Down
21 changes: 16 additions & 5 deletions storagemarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ var ClientEvents = fsm.Events{
deal.AddLog(deal.Message)
return nil
}),

fsm.Event(storagemarket.ClientEventInitiateDataTransfer).
From(storagemarket.StorageDealFundsReserved).To(storagemarket.StorageDealStartDataTransfer).
Action(func(deal *storagemarket.ClientDeal) error {
Expand All @@ -87,7 +88,7 @@ var ClientEvents = fsm.Events{
return nil
}),
fsm.Event(storagemarket.ClientEventDataTransferFailed).
FromMany(storagemarket.StorageDealStartDataTransfer, storagemarket.StorageDealTransferring).
FromMany(storagemarket.StorageDealStartDataTransfer, storagemarket.StorageDealTransferring, storagemarket.StorageDealTransferQueued).
To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("failed to complete data transfer: %w", err).Error()
Expand All @@ -103,16 +104,25 @@ var ClientEvents = fsm.Events{
return nil
}),

// The client has sent a push request to the provider, and in response the provider has
// opened a request for data to the client. The transfer is in the client's queue.
fsm.Event(storagemarket.ClientEventDataTransferQueued).
FromMany(storagemarket.StorageDealStartDataTransfer).To(storagemarket.StorageDealTransferQueued).
Action(func(deal *storagemarket.ClientDeal, channelId datatransfer.ChannelID) error {
deal.AddLog("provider data transfer request added to client's queue: channel id <%s>", channelId)
return nil
}),

fsm.Event(storagemarket.ClientEventDataTransferInitiated).
FromMany(storagemarket.StorageDealStartDataTransfer).To(storagemarket.StorageDealTransferring).
FromMany(storagemarket.StorageDealTransferQueued).To(storagemarket.StorageDealTransferring).
Action(func(deal *storagemarket.ClientDeal, channelId datatransfer.ChannelID) error {
deal.TransferChannelID = &channelId
deal.AddLog("data transfer initiated on channel id <%s>", channelId)
return nil
}),

fsm.Event(storagemarket.ClientEventDataTransferRestarted).
FromMany(storagemarket.StorageDealClientTransferRestart, storagemarket.StorageDealStartDataTransfer).To(storagemarket.StorageDealTransferring).
FromMany(storagemarket.StorageDealClientTransferRestart, storagemarket.StorageDealStartDataTransfer, storagemarket.StorageDealTransferQueued).To(storagemarket.StorageDealTransferring).
From(storagemarket.StorageDealTransferring).ToJustRecord().
Action(func(deal *storagemarket.ClientDeal, channelId datatransfer.ChannelID) error {
deal.TransferChannelID = &channelId
Expand All @@ -122,7 +132,7 @@ var ClientEvents = fsm.Events{
}),

fsm.Event(storagemarket.ClientEventDataTransferStalled).
From(storagemarket.StorageDealTransferring).
FromMany(storagemarket.StorageDealTransferring, storagemarket.StorageDealTransferQueued).
To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("could not complete data transfer, could not connect to provider %s", deal.Miner).Error()
Expand All @@ -135,6 +145,7 @@ var ClientEvents = fsm.Events{
storagemarket.StorageDealStartDataTransfer,
storagemarket.StorageDealTransferring,
storagemarket.StorageDealClientTransferRestart,
storagemarket.StorageDealTransferQueued,
).
To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal) error {
Expand All @@ -144,7 +155,7 @@ var ClientEvents = fsm.Events{
}),

fsm.Event(storagemarket.ClientEventDataTransferComplete).
FromMany(storagemarket.StorageDealTransferring, storagemarket.StorageDealStartDataTransfer).
FromMany(storagemarket.StorageDealTransferring, storagemarket.StorageDealStartDataTransfer, storagemarket.StorageDealTransferQueued).
To(storagemarket.StorageDealCheckForAcceptance),
fsm.Event(storagemarket.ClientEventWaitForDealState).
From(storagemarket.StorageDealCheckForAcceptance).ToNoChange().
Expand Down
2 changes: 2 additions & 0 deletions storagemarket/impl/dtutils/dtutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferRestarted, channelState.ChannelID())
case datatransfer.Disconnected:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferStalled)
case datatransfer.TransferRequestQueued:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferQueued, channelState.ChannelID())
case datatransfer.Accept:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferInitiated, channelState.ChannelID())
case datatransfer.Error:
Expand Down
Loading

0 comments on commit 29d0b04

Please sign in to comment.