diff --git a/go.mod b/go.mod index d21eff7ca..893e3f248 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c github.com/libp2p/go-libp2p v0.6.0 github.com/libp2p/go-libp2p-core v0.5.0 + github.com/libp2p/go-libp2p-peer v0.2.0 github.com/multiformats/go-multihash v0.0.13 github.com/stretchr/testify v1.5.1 github.com/whyrusleeping/cbor-gen v0.0.0-20200414195334-429a0b5e922e diff --git a/shared_testutil/test_network_types.go b/shared_testutil/test_network_types.go index fb4ef05c7..bb7e8321c 100644 --- a/shared_testutil/test_network_types.go +++ b/shared_testutil/test_network_types.go @@ -10,6 +10,7 @@ import ( rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network" + "github.com/filecoin-project/go-fil-markets/storagemarket" smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) @@ -505,16 +506,16 @@ func StubbedDealPaymentReader(payment rm.DealPayment) DealPaymentReader { } // StorageDealProposalReader is a function to mock reading deal proposals. -type StorageDealProposalReader func() (smnet.Proposal, error) +type StorageDealProposalReader func() (storagemarket.ProposalRequest, error) // StorageDealResponseReader is a function to mock reading deal responses. -type StorageDealResponseReader func() (smnet.SignedResponse, error) +type StorageDealResponseReader func() (storagemarket.SignedResponse, error) // StorageDealResponseWriter is a function to mock writing deal responses. -type StorageDealResponseWriter func(smnet.SignedResponse) error +type StorageDealResponseWriter func(storagemarket.SignedResponse) error // StorageDealProposalWriter is a function to mock writing deal proposals. -type StorageDealProposalWriter func(smnet.Proposal) error +type StorageDealProposalWriter func(storagemarket.ProposalRequest) error // TestStorageDealStream is a retrieval deal stream with predefined // stubbed behavior. @@ -562,22 +563,22 @@ func NewTestStorageDealStream(params TestStorageDealStreamParams) smnet.StorageD } // ReadDealProposal calls the mocked deal proposal reader function. -func (tsds *TestStorageDealStream) ReadDealProposal() (smnet.Proposal, error) { +func (tsds *TestStorageDealStream) ReadDealProposal() (storagemarket.ProposalRequest, error) { return tsds.proposalReader() } // WriteDealProposal calls the mocked deal proposal writer function. -func (tsds *TestStorageDealStream) WriteDealProposal(dealProposal smnet.Proposal) error { +func (tsds *TestStorageDealStream) WriteDealProposal(dealProposal storagemarket.ProposalRequest) error { return tsds.proposalWriter(dealProposal) } // ReadDealResponse calls the mocked deal response reader function. -func (tsds *TestStorageDealStream) ReadDealResponse() (smnet.SignedResponse, error) { +func (tsds *TestStorageDealStream) ReadDealResponse() (storagemarket.SignedResponse, error) { return tsds.responseReader() } // WriteDealResponse calls the mocked deal response writer function. -func (tsds *TestStorageDealStream) WriteDealResponse(dealResponse smnet.SignedResponse) error { +func (tsds *TestStorageDealStream) WriteDealResponse(dealResponse storagemarket.SignedResponse) error { return tsds.responseWriter(dealResponse) } @@ -588,57 +589,57 @@ func (tsds TestStorageDealStream) RemotePeer() peer.ID { return tsds.p } func (tsds TestStorageDealStream) Close() error { return nil } // TrivialStorageDealProposalReader succeeds trivially, returning an empty proposal. -func TrivialStorageDealProposalReader() (smnet.Proposal, error) { - return smnet.Proposal{}, nil +func TrivialStorageDealProposalReader() (storagemarket.ProposalRequest, error) { + return storagemarket.ProposalRequest{}, nil } // TrivialStorageDealResponseReader succeeds trivially, returning an empty deal response. -func TrivialStorageDealResponseReader() (smnet.SignedResponse, error) { - return smnet.SignedResponse{}, nil +func TrivialStorageDealResponseReader() (storagemarket.SignedResponse, error) { + return storagemarket.SignedResponse{}, nil } // TrivialStorageDealProposalWriter succeeds trivially, returning no error. -func TrivialStorageDealProposalWriter(smnet.Proposal) error { +func TrivialStorageDealProposalWriter(storagemarket.ProposalRequest) error { return nil } // TrivialStorageDealResponseWriter succeeds trivially, returning no error. -func TrivialStorageDealResponseWriter(smnet.SignedResponse) error { +func TrivialStorageDealResponseWriter(storagemarket.SignedResponse) error { return nil } // StubbedStorageProposalReader returns the given proposal when called -func StubbedStorageProposalReader(proposal smnet.Proposal) StorageDealProposalReader { - return func() (smnet.Proposal, error) { +func StubbedStorageProposalReader(proposal storagemarket.ProposalRequest) StorageDealProposalReader { + return func() (storagemarket.ProposalRequest, error) { return proposal, nil } } // StubbedStorageResponseReader returns the given deal response when called -func StubbedStorageResponseReader(response smnet.SignedResponse) StorageDealResponseReader { - return func() (smnet.SignedResponse, error) { +func StubbedStorageResponseReader(response storagemarket.SignedResponse) StorageDealResponseReader { + return func() (storagemarket.SignedResponse, error) { return response, nil } } // FailStorageProposalWriter always fails -func FailStorageProposalWriter(smnet.Proposal) error { +func FailStorageProposalWriter(storagemarket.ProposalRequest) error { return errors.New("write proposal failed") } // FailStorageProposalReader always fails -func FailStorageProposalReader() (smnet.Proposal, error) { - return smnet.ProposalUndefined, errors.New("read proposal failed") +func FailStorageProposalReader() (storagemarket.ProposalRequest, error) { + return storagemarket.ProposalRequestUndefined, errors.New("read proposal failed") } // FailStorageResponseWriter always fails -func FailStorageResponseWriter(smnet.SignedResponse) error { +func FailStorageResponseWriter(storagemarket.SignedResponse) error { return errors.New("write proposal failed") } // FailStorageResponseReader always fails -func FailStorageResponseReader() (smnet.SignedResponse, error) { - return smnet.SignedResponseUndefined, errors.New("read response failed") +func FailStorageResponseReader() (storagemarket.SignedResponse, error) { + return storagemarket.SignedResponseUndefined, errors.New("read response failed") } // TestPeerResolver provides a fake retrievalmarket PeerResolver @@ -650,4 +651,5 @@ type TestPeerResolver struct { func (tpr TestPeerResolver) GetPeers(cid.Cid) ([]rm.RetrievalPeer, error) { return tpr.Peers, tpr.ResolverError } -var _ rm.PeerResolver = &TestPeerResolver{} \ No newline at end of file + +var _ rm.PeerResolver = &TestPeerResolver{} diff --git a/shared_testutil/test_types.go b/shared_testutil/test_types.go index fb10335ee..a2fd064b7 100644 --- a/shared_testutil/test_types.go +++ b/shared_testutil/test_types.go @@ -16,7 +16,6 @@ import ( "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/storagemarket" - smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) // MakeTestSignedVoucher generates a random SignedVoucher that has all non-zero fields @@ -213,8 +212,8 @@ func MakeTestSignedStorageAsk() *storagemarket.SignedStorageAsk { // MakeTestStorageNetworkProposal generates a proposal that can be sent over the // network to a provider -func MakeTestStorageNetworkProposal() smnet.Proposal { - return smnet.Proposal{ +func MakeTestStorageNetworkProposal() storagemarket.ProposalRequest { + return storagemarket.ProposalRequest{ DealProposal: MakeTestClientDealProposal(), Piece: &storagemarket.DataRef{Root: GenerateCids(1)[0]}, } @@ -222,8 +221,8 @@ func MakeTestStorageNetworkProposal() smnet.Proposal { // MakeTestStorageNetworkResponse generates a response to a proposal sent over // the network -func MakeTestStorageNetworkResponse() smnet.Response { - return smnet.Response{ +func MakeTestStorageNetworkResponse() storagemarket.ProposalResponse { + return storagemarket.ProposalResponse{ State: storagemarket.StorageDealSealing, Proposal: GenerateCids(1)[0], PublishMessage: &(GenerateCids(1)[0]), @@ -232,23 +231,23 @@ func MakeTestStorageNetworkResponse() smnet.Response { // MakeTestStorageNetworkSignedResponse generates a response to a proposal sent over // the network that is signed -func MakeTestStorageNetworkSignedResponse() smnet.SignedResponse { - return smnet.SignedResponse{ +func MakeTestStorageNetworkSignedResponse() storagemarket.SignedResponse { + return storagemarket.SignedResponse{ Response: MakeTestStorageNetworkResponse(), Signature: MakeTestSignature(), } } // MakeTestStorageAskRequest generates a request to get a provider's ask -func MakeTestStorageAskRequest() smnet.AskRequest { - return smnet.AskRequest{ +func MakeTestStorageAskRequest() storagemarket.AskRequest { + return storagemarket.AskRequest{ Miner: address.TestAddress2, } } // MakeTestStorageAskResponse generates a response to an ask request -func MakeTestStorageAskResponse() smnet.AskResponse { - return smnet.AskResponse{ +func MakeTestStorageAskResponse() storagemarket.AskResponse { + return storagemarket.AskResponse{ Ask: MakeTestSignedStorageAsk(), } } diff --git a/storagemarket/impl/client.go b/storagemarket/impl/client.go index 2ece5c757..af3744dcf 100644 --- a/storagemarket/impl/client.go +++ b/storagemarket/impl/client.go @@ -17,6 +17,7 @@ import ( "github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore" logging "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" "github.com/filecoin-project/go-fil-markets/pieceio" @@ -92,6 +93,7 @@ func NewClient( } func (c *Client) Run(ctx context.Context) { + c.net.SetDelegate(c) } func (c *Client) Stop() { @@ -156,7 +158,7 @@ func (c *Client) GetAsk(ctx context.Context, info storagemarket.StorageProviderI return nil, xerrors.Errorf("failed to open stream to miner: %w", err) } - request := network.AskRequest{Miner: info.Address} + request := storagemarket.AskRequest{Miner: info.Address} if err := s.WriteAskRequest(request); err != nil { return nil, xerrors.Errorf("failed to send ask request: %w", err) } @@ -247,15 +249,6 @@ func (c *Client) ProposeStorageDeal( return nil, xerrors.Errorf("setting up deal tracking: %w", err) } - s, err := c.net.NewDealStream(info.PeerID) - if err != nil { - return nil, xerrors.Errorf("connecting to storage provider failed: %w", err) - } - err = c.conns.AddStream(deal.ProposalCid, s) - if err != nil { - return nil, err - } - err = c.statemachines.Send(deal.ProposalCid, storagemarket.ClientEventOpen) if err != nil { return nil, xerrors.Errorf("initializing state machine: %w", err) @@ -308,6 +301,25 @@ func (c *Client) SubscribeToEvents(subscriber storagemarket.ClientSubscriber) sh return shared.Unsubscribe(c.pubSub.Subscribe(subscriber)) } +func (c *Client) HandleAskStream(s network.StorageAskStream) { + s.Close() +} + +func (c *Client) HandleDealStream(s network.StorageDealStream) { + defer s.Close() + log.Info("Handling storage deal proposal!") + + response, err := s.ReadDealResponse() + if err != nil { + log.Errorf("%+v", err) + return + } + err = c.statemachines.Send(response.Response.Proposal, storagemarket.ClientEventReceiveResponse, response) + if err != nil { + log.Errorf("%+v", err) + } +} + func (c *Client) dispatch(eventName fsm.EventName, deal fsm.StateType) { evt, ok := eventName.(storagemarket.ClientEvent) if !ok { @@ -355,10 +367,15 @@ func (c *clientDealEnvironment) Node() storagemarket.StorageClientNode { return c.c.node } -func (c *clientDealEnvironment) DealStream(proposalCid cid.Cid) (network.StorageDealStream, error) { - return c.c.conns.DealStream(proposalCid) -} - -func (c *clientDealEnvironment) CloseStream(proposalCid cid.Cid) error { - return c.c.conns.Disconnect(proposalCid) +func (c *clientDealEnvironment) WriteDealProposal(p peer.ID, proposal storagemarket.ProposalRequest) error { + s, err := c.c.net.NewDealStream(p) + if err != nil { + return err + } + err = s.WriteDealProposal(proposal) + closeErr := s.Close() + if closeErr != nil { + log.Warnf("error closing stream: %w", closeErr) + } + return err } diff --git a/storagemarket/impl/clientstates/client_fsm.go b/storagemarket/impl/clientstates/client_fsm.go index c223baf7a..47ad27386 100644 --- a/storagemarket/impl/clientstates/client_fsm.go +++ b/storagemarket/impl/clientstates/client_fsm.go @@ -16,7 +16,7 @@ var ClientEvents = fsm.Events{ fsm.Event(storagemarket.ClientEventFundingInitiated). From(storagemarket.StorageDealEnsureClientFunds).To(storagemarket.StorageDealClientFunding). Action(func(deal *storagemarket.ClientDeal, mcid cid.Cid) error { - deal.AddFundsCid = mcid + deal.AddFundsCid = &mcid return nil }), fsm.Event(storagemarket.ClientEventEnsureFundsFailed). @@ -34,17 +34,11 @@ var ClientEvents = fsm.Events{ return nil }), fsm.Event(storagemarket.ClientEventDealProposed). - From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealValidating), - fsm.Event(storagemarket.ClientEventDealStreamLookupErrored). - FromAny().To(storagemarket.StorageDealFailing). - Action(func(deal *storagemarket.ClientDeal, err error) error { - deal.Message = xerrors.Errorf("miner connection error: %w", err).Error() - return nil - }), - fsm.Event(storagemarket.ClientEventReadResponseFailed). - From(storagemarket.StorageDealValidating).To(storagemarket.StorageDealError). - Action(func(deal *storagemarket.ClientDeal, err error) error { - deal.Message = xerrors.Errorf("error reading Response message: %w", err).Error() + From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealWaitingForResponse), + fsm.Event(storagemarket.ClientEventReceiveResponse). + From(storagemarket.StorageDealWaitingForResponse).To(storagemarket.StorageDealValidating). + Action(func(deal *storagemarket.ClientDeal, response storagemarket.SignedResponse) error { + deal.LastResponse = &response return nil }), fsm.Event(storagemarket.ClientEventResponseVerificationFailed). diff --git a/storagemarket/impl/clientstates/client_states.go b/storagemarket/impl/clientstates/client_states.go index 69ff79c93..a1616b3a0 100644 --- a/storagemarket/impl/clientstates/client_states.go +++ b/storagemarket/impl/clientstates/client_states.go @@ -5,12 +5,11 @@ import ( "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" + peer "github.com/libp2p/go-libp2p-peer" "golang.org/x/xerrors" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils" - "github.com/filecoin-project/go-fil-markets/storagemarket/network" - smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) var log = logging.Logger("storagemarket_impl") @@ -19,8 +18,7 @@ var log = logging.Logger("storagemarket_impl") // dependencies from the storage client environment type ClientDealEnvironment interface { Node() storagemarket.StorageClientNode - DealStream(proposalCid cid.Cid) (smnet.StorageDealStream, error) - CloseStream(proposalCid cid.Cid) error + WriteDealProposal(p peer.ID, proposal storagemarket.ProposalRequest) error } // ClientStateEntryFunc is the type for all state entry functions on a storage client @@ -53,7 +51,7 @@ func EnsureClientFunds(ctx fsm.Context, environment ClientDealEnvironment, deal func WaitForFunding(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error { node := environment.Node() - return node.WaitForMessage(ctx.Context(), deal.AddFundsCid, func(code exitcode.ExitCode, bytes []byte, err error) error { + return node.WaitForMessage(ctx.Context(), *deal.AddFundsCid, func(code exitcode.ExitCode, bytes []byte, err error) error { if err != nil { return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("AddFunds err: %w", err)) } @@ -67,13 +65,9 @@ func WaitForFunding(ctx fsm.Context, environment ClientDealEnvironment, deal sto // ProposeDeal sends the deal proposal to the provider func ProposeDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error { - s, err := environment.DealStream(deal.ProposalCid) - if err != nil { - return ctx.Trigger(storagemarket.ClientEventDealStreamLookupErrored, err) - } - proposal := network.Proposal{DealProposal: &deal.ClientDealProposal, Piece: deal.DataRef} - if err := s.WriteDealProposal(proposal); err != nil { + proposal := storagemarket.ProposalRequest{DealProposal: &deal.ClientDealProposal, Piece: deal.DataRef} + if err := environment.WriteDealProposal(deal.Miner, proposal); err != nil { return ctx.Trigger(storagemarket.ClientEventWriteProposalFailed, err) } @@ -82,16 +76,8 @@ func ProposeDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storag // VerifyDealResponse reads and verifies the response from the provider to the proposed deal func VerifyDealResponse(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error { - s, err := environment.DealStream(deal.ProposalCid) - if err != nil { - return ctx.Trigger(storagemarket.ClientEventDealStreamLookupErrored, err) - } - - resp, err := s.ReadDealResponse() - if err != nil { - return ctx.Trigger(storagemarket.ClientEventReadResponseFailed, err) - } + resp := *deal.LastResponse tok, _, err := environment.Node().GetChainHead(ctx.Context()) if err != nil { return ctx.Trigger(storagemarket.ClientEventResponseVerificationFailed) @@ -109,10 +95,6 @@ func VerifyDealResponse(ctx fsm.Context, environment ClientDealEnvironment, deal return ctx.Trigger(storagemarket.ClientEventDealRejected, resp.Response.State, resp.Response.Message) } - if err := environment.CloseStream(deal.ProposalCid); err != nil { - return ctx.Trigger(storagemarket.ClientEventStreamCloseError, err) - } - return ctx.Trigger(storagemarket.ClientEventDealAccepted, resp.Response.PublishMessage) } @@ -147,10 +129,6 @@ func VerifyDealActivated(ctx fsm.Context, environment ClientDealEnvironment, dea // FailDeal cleans up a failing deal func FailDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error { - if err := environment.CloseStream(deal.ProposalCid); err != nil { - return ctx.Trigger(storagemarket.ClientEventStreamCloseError, err) - } - // TODO: store in some sort of audit log log.Errorf("deal %s failed: %s", deal.ProposalCid, deal.Message) diff --git a/storagemarket/impl/clientstates/client_states_test.go b/storagemarket/impl/clientstates/client_states_test.go index 22ea861f6..f8e6d1025 100644 --- a/storagemarket/impl/clientstates/client_states_test.go +++ b/storagemarket/impl/clientstates/client_states_test.go @@ -14,6 +14,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/ipfs/go-cid" + peer "github.com/libp2p/go-libp2p-peer" "github.com/stretchr/testify/require" tut "github.com/filecoin-project/go-fil-markets/shared_testutil" @@ -32,7 +33,7 @@ func TestEnsureFunds(t *testing.T) { addFundsCid := tut.GenerateCids(1)[0] t.Run("immediately succeeds", func(t *testing.T) { - runEnsureFunds(t, makeNode(nodeParams{}), nil, nil, nil, func(deal storagemarket.ClientDeal) { + runEnsureFunds(t, makeNode(nodeParams{}), nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealFundsEnsured, deal.State) }) }) @@ -41,7 +42,7 @@ func TestEnsureFunds(t *testing.T) { params := nodeParams{ AddFundsCid: addFundsCid, } - runEnsureFunds(t, makeNode(params), nil, nil, nil, func(deal storagemarket.ClientDeal) { + runEnsureFunds(t, makeNode(params), nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealClientFunding, deal.State) }) }) @@ -50,7 +51,7 @@ func TestEnsureFunds(t *testing.T) { n := makeNode(nodeParams{ EnsureFundsError: errors.New("Something went wrong"), }) - runEnsureFunds(t, n, nil, nil, nil, func(deal storagemarket.ClientDeal) { + runEnsureFunds(t, n, nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "adding market funds failed: Something went wrong", deal.Message) }) @@ -65,13 +66,13 @@ func TestWaitForFunding(t *testing.T) { runEnsureFunds := makeExecutor(ctx, eventProcessor, clientstates.WaitForFunding, storagemarket.StorageDealClientFunding, clientDealProposal) t.Run("succeeds", func(t *testing.T) { - runEnsureFunds(t, makeNode(nodeParams{WaitForMessageExitCode: exitcode.Ok}), nil, nil, nil, func(deal storagemarket.ClientDeal) { + runEnsureFunds(t, makeNode(nodeParams{WaitForMessageExitCode: exitcode.Ok}), nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealFundsEnsured, deal.State) }) }) t.Run("EnsureClientFunds fails", func(t *testing.T) { - runEnsureFunds(t, makeNode(nodeParams{WaitForMessageExitCode: exitcode.ErrInsufficientFunds}), nil, nil, nil, func(deal storagemarket.ClientDeal) { + runEnsureFunds(t, makeNode(nodeParams{WaitForMessageExitCode: exitcode.ErrInsufficientFunds}), nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "adding market funds failed: AddFunds exit code: 19", deal.Message) }) @@ -92,20 +93,13 @@ func TestProposeDeal(t *testing.T) { } t.Run("succeeds", func(t *testing.T) { - runProposeDeal(t, makeNode(nodeParams{}), nil, dealStream(tut.TrivialStorageDealProposalWriter), nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, storagemarket.StorageDealValidating, deal.State) - }) - }) - - t.Run("deal stream lookup fails", func(t *testing.T) { - runProposeDeal(t, makeNode(nodeParams{}), errors.New("deal stream not found"), nil, nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, "miner connection error: deal stream not found", deal.Message) + runProposeDeal(t, makeNode(nodeParams{}), dealStream(tut.TrivialStorageDealProposalWriter), nil, func(deal storagemarket.ClientDeal) { + require.Equal(t, storagemarket.StorageDealWaitingForResponse, deal.State) }) }) t.Run("write proposal fails fails", func(t *testing.T) { - runProposeDeal(t, makeNode(nodeParams{}), nil, dealStream(tut.FailStorageProposalWriter), nil, func(deal storagemarket.ClientDeal) { + runProposeDeal(t, makeNode(nodeParams{}), dealStream(tut.FailStorageProposalWriter), nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealError, deal.State) require.Equal(t, "sending proposal to storage provider failed: write proposal failed", deal.Message) }) @@ -123,106 +117,69 @@ func TestVerifyResponse(t *testing.T) { publishMessage := &(tut.GenerateCids(1)[0]) - dealStream := func(reader tut.StorageDealResponseReader) smnet.StorageDealStream { - return tut.NewTestStorageDealStream(tut.TestStorageDealStreamParams{ - ResponseReader: reader, - }) - } - t.Run("succeeds", func(t *testing.T) { - stream := dealStream(tut.StubbedStorageResponseReader(smnet.SignedResponse{ - Response: smnet.Response{ + response := storagemarket.SignedResponse{ + Response: storagemarket.ProposalResponse{ State: storagemarket.StorageDealProposalAccepted, Proposal: proposalNd.Cid(), PublishMessage: publishMessage, }, Signature: tut.MakeTestSignature(), - })) - runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), nil, stream, nil, func(deal storagemarket.ClientDeal) { + } + runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), nil, &response, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealProposalAccepted, deal.State) require.Equal(t, publishMessage, deal.PublishMessage) }) }) - t.Run("deal stream lookup fails", func(t *testing.T) { - dealStreamErr := errors.New("deal stream not found") - runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), dealStreamErr, dealStream(nil), nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, storagemarket.StorageDealFailing, deal.State) - require.Equal(t, "miner connection error: deal stream not found", deal.Message) - }) - }) - - t.Run("read response fails", func(t *testing.T) { - runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), nil, dealStream(tut.FailStorageResponseReader), nil, func(deal storagemarket.ClientDeal) { - require.Equal(t, storagemarket.StorageDealError, deal.State) - require.Equal(t, "error reading Response message: read response failed", deal.Message) - }) - }) - t.Run("verify response fails", func(t *testing.T) { - stream := dealStream(tut.StubbedStorageResponseReader(smnet.SignedResponse{ - Response: smnet.Response{ + response := storagemarket.SignedResponse{ + Response: storagemarket.ProposalResponse{ State: storagemarket.StorageDealProposalAccepted, Proposal: proposalNd.Cid(), PublishMessage: publishMessage, }, Signature: tut.MakeTestSignature(), - })) + } failToVerifyNode := makeNode(nodeParams{VerifySignatureFails: true}) - runVerifyResponse(t, failToVerifyNode, nil, stream, nil, func(deal storagemarket.ClientDeal) { + runVerifyResponse(t, failToVerifyNode, nil, &response, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, "unable to verify signature on deal response", deal.Message) }) }) t.Run("incorrect proposal cid", func(t *testing.T) { - stream := dealStream(tut.StubbedStorageResponseReader(smnet.SignedResponse{ - Response: smnet.Response{ + response := storagemarket.SignedResponse{ + Response: storagemarket.ProposalResponse{ State: storagemarket.StorageDealProposalAccepted, Proposal: tut.GenerateCids(1)[0], PublishMessage: publishMessage, }, Signature: tut.MakeTestSignature(), - })) - runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), nil, stream, nil, func(deal storagemarket.ClientDeal) { + } + runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), nil, &response, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealFailing, deal.State) require.Regexp(t, "^miner responded to a wrong proposal:", deal.Message) }) }) t.Run("deal rejected", func(t *testing.T) { - stream := dealStream(tut.StubbedStorageResponseReader(smnet.SignedResponse{ - Response: smnet.Response{ + response := storagemarket.SignedResponse{ + Response: storagemarket.ProposalResponse{ State: storagemarket.StorageDealProposalRejected, Proposal: proposalNd.Cid(), PublishMessage: publishMessage, Message: "because reasons", }, Signature: tut.MakeTestSignature(), - })) + } expErr := fmt.Sprintf("deal failed: (State=%d) because reasons", storagemarket.StorageDealProposalRejected) - runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), nil, stream, nil, func(deal storagemarket.ClientDeal) { + runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), nil, &response, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealFailing, deal.State) require.Equal(t, deal.Message, expErr) }) }) - t.Run("deal stream close errors", func(t *testing.T) { - stream := dealStream(tut.StubbedStorageResponseReader(smnet.SignedResponse{ - Response: smnet.Response{ - State: storagemarket.StorageDealProposalAccepted, - Proposal: proposalNd.Cid(), - PublishMessage: publishMessage, - }, - Signature: tut.MakeTestSignature(), - })) - closeStreamErr := errors.New("something went wrong") - runVerifyResponse(t, makeNode(nodeParams{VerifySignatureFails: false}), nil, stream, closeStreamErr, func(deal storagemarket.ClientDeal) { - require.Equal(t, storagemarket.StorageDealError, deal.State) - require.Equal(t, "error attempting to close stream: something went wrong", deal.Message) - }) - }) - } func TestValidateDealPublished(t *testing.T) { @@ -233,7 +190,7 @@ func TestValidateDealPublished(t *testing.T) { runValidateDealPublished := makeExecutor(ctx, eventProcessor, clientstates.ValidateDealPublished, storagemarket.StorageDealProposalAccepted, clientDealProposal) t.Run("succeeds", func(t *testing.T) { - runValidateDealPublished(t, makeNode(nodeParams{ValidatePublishedDealID: abi.DealID(5)}), nil, nil, nil, func(deal storagemarket.ClientDeal) { + runValidateDealPublished(t, makeNode(nodeParams{ValidatePublishedDealID: abi.DealID(5)}), nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealSealing, deal.State) require.Equal(t, abi.DealID(5), deal.DealID) }) @@ -244,7 +201,7 @@ func TestValidateDealPublished(t *testing.T) { ValidatePublishedDealID: abi.DealID(5), ValidatePublishedError: errors.New("Something went wrong"), }) - runValidateDealPublished(t, n, nil, nil, nil, func(deal storagemarket.ClientDeal) { + runValidateDealPublished(t, n, nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealError, deal.State) require.Equal(t, "error validating deal published: Something went wrong", deal.Message) }) @@ -259,20 +216,20 @@ func TestVerifyDealActivated(t *testing.T) { runVerifyDealActivated := makeExecutor(ctx, eventProcessor, clientstates.VerifyDealActivated, storagemarket.StorageDealSealing, clientDealProposal) t.Run("succeeds", func(t *testing.T) { - runVerifyDealActivated(t, makeNode(nodeParams{}), nil, nil, nil, func(deal storagemarket.ClientDeal) { + runVerifyDealActivated(t, makeNode(nodeParams{}), nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealActive, deal.State) }) }) t.Run("fails synchronously", func(t *testing.T) { - runVerifyDealActivated(t, makeNode(nodeParams{DealCommittedSyncError: errors.New("Something went wrong")}), nil, nil, nil, func(deal storagemarket.ClientDeal) { + runVerifyDealActivated(t, makeNode(nodeParams{DealCommittedSyncError: errors.New("Something went wrong")}), nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealError, deal.State) require.Equal(t, "error in deal activation: Something went wrong", deal.Message) }) }) t.Run("fails asynchronously", func(t *testing.T) { - runVerifyDealActivated(t, makeNode(nodeParams{DealCommittedAsyncError: errors.New("Something went wrong later")}), nil, nil, nil, func(deal storagemarket.ClientDeal) { + runVerifyDealActivated(t, makeNode(nodeParams{DealCommittedAsyncError: errors.New("Something went wrong later")}), nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealError, deal.State) require.Equal(t, "error in deal activation: Something went wrong later", deal.Message) }) @@ -286,25 +243,18 @@ func TestFailDeal(t *testing.T) { clientDealProposal := tut.MakeTestClientDealProposal() runFailDeal := makeExecutor(ctx, eventProcessor, clientstates.FailDeal, storagemarket.StorageDealFailing, clientDealProposal) - t.Run("able to close stream", func(t *testing.T) { - runFailDeal(t, nil, nil, nil, nil, func(deal storagemarket.ClientDeal) { + t.Run("success", func(t *testing.T) { + runFailDeal(t, nil, nil, nil, func(deal storagemarket.ClientDeal) { require.Equal(t, storagemarket.StorageDealError, deal.State) }) }) - t.Run("unable to close stream", func(t *testing.T) { - runFailDeal(t, nil, nil, nil, errors.New("unable to close"), func(deal storagemarket.ClientDeal) { - require.Equal(t, storagemarket.StorageDealError, deal.State) - require.Equal(t, "error attempting to close stream: unable to close", deal.Message) - }) - }) } type executor func(t *testing.T, node storagemarket.StorageClientNode, - dealStreamErr error, dealStream smnet.StorageDealStream, - closeStreamErr error, + response *storagemarket.SignedResponse, dealInspector func(deal storagemarket.ClientDeal)) func makeExecutor(ctx context.Context, @@ -314,13 +264,14 @@ func makeExecutor(ctx context.Context, clientDealProposal *market.ClientDealProposal) executor { return func(t *testing.T, node storagemarket.StorageClientNode, - dealStreamErr error, dealStream smnet.StorageDealStream, - closeStreamErr error, + response *storagemarket.SignedResponse, dealInspector func(deal storagemarket.ClientDeal)) { dealState, err := tut.MakeTestClientDeal(initialState, clientDealProposal) + dealState.AddFundsCid = &tut.GenerateCids(1)[0] + dealState.LastResponse = response require.NoError(t, err) - environment := &fakeEnvironment{node, dealStream, dealStreamErr, closeStreamErr} + environment := &fakeEnvironment{node, dealStream} fsmCtx := fsmtest.NewTestContext(ctx, eventProcessor) err = stateEntryFunc(fsmCtx, environment, *dealState) require.NoError(t, err) @@ -369,23 +320,14 @@ func makeNode(params nodeParams) storagemarket.StorageClientNode { } type fakeEnvironment struct { - node storagemarket.StorageClientNode - dealStream smnet.StorageDealStream - dealStreamErr error - closeStreamErr error + node storagemarket.StorageClientNode + dealStream smnet.StorageDealStream } func (fe *fakeEnvironment) Node() storagemarket.StorageClientNode { return fe.node } -func (fe *fakeEnvironment) DealStream(proposalCid cid.Cid) (smnet.StorageDealStream, error) { - if fe.dealStreamErr == nil { - return fe.dealStream, nil - } - return nil, fe.dealStreamErr -} - -func (fe *fakeEnvironment) CloseStream(proposalCid cid.Cid) error { - return fe.closeStreamErr +func (fe *fakeEnvironment) WriteDealProposal(p peer.ID, proposal storagemarket.ProposalRequest) error { + return fe.dealStream.WriteDealProposal(proposal) } diff --git a/storagemarket/impl/clientutils/clientutils.go b/storagemarket/impl/clientutils/clientutils.go index 513a44ce6..20f4f9e5b 100644 --- a/storagemarket/impl/clientutils/clientutils.go +++ b/storagemarket/impl/clientutils/clientutils.go @@ -13,7 +13,6 @@ import ( "github.com/filecoin-project/go-fil-markets/pieceio" "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" - "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) // CommP calculates the commP for a given dataref @@ -39,7 +38,7 @@ type VerifyFunc func(context.Context, crypto.Signature, address.Address, []byte, // VerifyResponse verifies the signature on the given signed response matches // the given miner address, using the given signature verification function -func VerifyResponse(ctx context.Context, resp network.SignedResponse, minerAddr address.Address, tok shared.TipSetToken, verifier VerifyFunc) error { +func VerifyResponse(ctx context.Context, resp storagemarket.SignedResponse, minerAddr address.Address, tok shared.TipSetToken, verifier VerifyFunc) error { b, err := cborutil.Dump(&resp.Response) if err != nil { return err diff --git a/storagemarket/impl/clientutils/clientutils_test.go b/storagemarket/impl/clientutils/clientutils_test.go index 05390160a..a02acf2c7 100644 --- a/storagemarket/impl/clientutils/clientutils_test.go +++ b/storagemarket/impl/clientutils/clientutils_test.go @@ -19,7 +19,6 @@ import ( "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils" - "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) func TestCommP(t *testing.T) { @@ -69,7 +68,7 @@ func TestCommP(t *testing.T) { func TestVerifyResponse(t *testing.T) { tests := map[string]struct { - sresponse network.SignedResponse + sresponse storagemarket.SignedResponse verifier clientutils.VerifyFunc shouldErr bool }{ @@ -81,8 +80,8 @@ func TestVerifyResponse(t *testing.T) { shouldErr: false, }, "bad response": { - sresponse: network.SignedResponse{ - Response: network.Response{}, + sresponse: storagemarket.SignedResponse{ + Response: storagemarket.ProposalResponse{}, Signature: shared_testutil.MakeTestSignature(), }, verifier: func(context.Context, crypto.Signature, address.Address, []byte, shared.TipSetToken) (bool, error) { diff --git a/storagemarket/impl/provider.go b/storagemarket/impl/provider.go index 8b49536d6..a7a92f813 100644 --- a/storagemarket/impl/provider.go +++ b/storagemarket/impl/provider.go @@ -139,9 +139,8 @@ func (p *Provider) HandleDealStream(s network.StorageDealStream) { err := p.receiveDeal(s) if err != nil { log.Errorf("%+v", err) - s.Close() - return } + s.Close() } func (p *Provider) receiveDeal(s network.StorageDealStream) error { @@ -168,10 +167,6 @@ func (p *Provider) receiveDeal(s network.StorageDealStream) error { if err != nil { return err } - err = p.conns.AddStream(proposalNd.Cid(), s) - if err != nil { - return err - } return p.deals.Send(proposalNd.Cid(), storagemarket.ProviderEventOpen) } @@ -303,7 +298,7 @@ func (p *Provider) HandleAskStream(s network.StorageAskStream) { return } - resp := network.AskResponse{ + resp := storagemarket.AskResponse{ Ask: p.storedAsk.GetAsk(ar.Miner), } @@ -413,11 +408,7 @@ func (p *providerDealEnvironment) PieceStore() piecestore.PieceStore { return p.p.pieceStore } -func (p *providerDealEnvironment) SendSignedResponse(ctx context.Context, resp *network.Response) error { - s, err := p.p.conns.DealStream(resp.Proposal) - if err != nil { - return xerrors.Errorf("couldn't send response: %w", err) - } +func (p *providerDealEnvironment) SendSignedResponse(ctx context.Context, client peer.ID, resp *storagemarket.ProposalResponse) error { tok, _, err := p.p.spn.GetChainHead(ctx) if err != nil { @@ -429,21 +420,22 @@ func (p *providerDealEnvironment) SendSignedResponse(ctx context.Context, resp * return xerrors.Errorf("failed to sign response message: %w", err) } - signedResponse := network.SignedResponse{ + signedResponse := storagemarket.SignedResponse{ Response: *resp, Signature: sig, } - err = s.WriteDealResponse(signedResponse) + s, err := p.p.net.NewDealStream(client) if err != nil { - // Assume client disconnected - _ = p.p.conns.Disconnect(resp.Proposal) + return err } - return err -} -func (p *providerDealEnvironment) Disconnect(proposalCid cid.Cid) error { - return p.p.conns.Disconnect(proposalCid) + err = s.WriteDealResponse(signedResponse) + closeErr := s.Close() + if closeErr != nil { + log.Warnf("Error closing stream: %w", closeErr) + } + return err } func (p *providerDealEnvironment) DealAcceptanceBuffer() abi.ChainEpoch { diff --git a/storagemarket/impl/providerstates/provider_fsm.go b/storagemarket/impl/providerstates/provider_fsm.go index a8b62995e..84605f001 100644 --- a/storagemarket/impl/providerstates/provider_fsm.go +++ b/storagemarket/impl/providerstates/provider_fsm.go @@ -54,7 +54,7 @@ var ProviderEvents = fsm.Events{ fsm.Event(storagemarket.ProviderEventFundingInitiated). From(storagemarket.StorageDealEnsureProviderFunds).To(storagemarket.StorageDealProviderFunding). Action(func(deal *storagemarket.MinerDeal, mcid cid.Cid) error { - deal.AddFundsCid = mcid + deal.AddFundsCid = &mcid return nil }), fsm.Event(storagemarket.ProviderEventFunded). @@ -62,7 +62,7 @@ var ProviderEvents = fsm.Events{ fsm.Event(storagemarket.ProviderEventDealPublishInitiated). From(storagemarket.StorageDealPublish).To(storagemarket.StorageDealPublishing). Action(func(deal *storagemarket.MinerDeal, publishCid cid.Cid) error { - deal.PublishCid = publishCid + deal.PublishCid = &publishCid return nil }), fsm.Event(storagemarket.ProviderEventDealPublishError). diff --git a/storagemarket/impl/providerstates/provider_states.go b/storagemarket/impl/providerstates/provider_states.go index 5a605147c..0cd213a06 100644 --- a/storagemarket/impl/providerstates/provider_states.go +++ b/storagemarket/impl/providerstates/provider_states.go @@ -24,7 +24,6 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerutils" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" - "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) var log = logging.Logger("providerstates") @@ -37,8 +36,7 @@ type ProviderDealEnvironment interface { Ask() storagemarket.StorageAsk StartDataTransfer(ctx context.Context, to peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error GeneratePieceCommitmentToFile(payloadCid cid.Cid, selector ipld.Node) (cid.Cid, filestore.Path, filestore.Path, error) - SendSignedResponse(ctx context.Context, response *network.Response) error - Disconnect(proposalCid cid.Cid) error + SendSignedResponse(ctx context.Context, client peer.ID, response *storagemarket.ProposalResponse) error FileStore() filestore.FileStore PieceStore() piecestore.PieceStore DealAcceptanceBuffer() abi.ChainEpoch @@ -179,7 +177,7 @@ func EnsureProviderFunds(ctx fsm.Context, environment ProviderDealEnvironment, d func WaitForFunding(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error { node := environment.Node() - return node.WaitForMessage(ctx.Context(), deal.AddFundsCid, func(code exitcode.ExitCode, bytes []byte, err error) error { + return node.WaitForMessage(ctx.Context(), *deal.AddFundsCid, func(code exitcode.ExitCode, bytes []byte, err error) error { if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds errored: %w", err)) } @@ -210,7 +208,7 @@ func PublishDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal stor // WaitForPublish waits for the publish message on chain and sends the deal id back to the client func WaitForPublish(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error { - return environment.Node().WaitForMessage(ctx.Context(), deal.PublishCid, func(code exitcode.ExitCode, retBytes []byte, err error) error { + return environment.Node().WaitForMessage(ctx.Context(), *deal.PublishCid, func(code exitcode.ExitCode, retBytes []byte, err error) error { if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals errored: %w", err)) } @@ -223,20 +221,16 @@ func WaitForPublish(ctx fsm.Context, environment ProviderDealEnvironment, deal s return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals error unmarshalling result: %w", err)) } - err = environment.SendSignedResponse(ctx.Context(), &network.Response{ + err = environment.SendSignedResponse(ctx.Context(), deal.Client, &storagemarket.ProposalResponse{ State: storagemarket.StorageDealProposalAccepted, Proposal: deal.ProposalCid, - PublishMessage: &deal.PublishCid, + PublishMessage: deal.PublishCid, }) if err != nil { return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err) } - if err := environment.Disconnect(deal.ProposalCid); err != nil { - log.Warnf("closing client connection: %+v", err) - } - return ctx.Trigger(storagemarket.ProviderEventDealPublished, retval.IDs[0]) }) @@ -351,7 +345,7 @@ func FailDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal storage log.Warnf("deal %s failed: %s", deal.ProposalCid, deal.Message) if !deal.ConnectionClosed { - err := environment.SendSignedResponse(ctx.Context(), &network.Response{ + err := environment.SendSignedResponse(ctx.Context(), deal.Client, &storagemarket.ProposalResponse{ State: storagemarket.StorageDealFailing, Message: deal.Message, Proposal: deal.ProposalCid, @@ -360,10 +354,6 @@ func FailDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal storage if err != nil { return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err) } - - if err := environment.Disconnect(deal.ProposalCid); err != nil { - log.Warnf("closing client connection: %+v", err) - } } if deal.PiecePath != filestore.Path("") { diff --git a/storagemarket/impl/providerstates/provider_states_test.go b/storagemarket/impl/providerstates/provider_states_test.go index 4f744050c..46fd62f6d 100644 --- a/storagemarket/impl/providerstates/provider_states_test.go +++ b/storagemarket/impl/providerstates/provider_states_test.go @@ -28,7 +28,6 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/blockrecorder" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates" - "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-fil-markets/storagemarket/testnodes" ) @@ -305,7 +304,7 @@ func TestEnsureProviderFunds(t *testing.T) { }, dealInspector: func(t *testing.T, deal storagemarket.MinerDeal) { require.Equal(t, storagemarket.StorageDealProviderFunding, deal.State) - require.Equal(t, cids[0], deal.AddFundsCid) + require.Equal(t, &cids[0], deal.AddFundsCid) }, }, "get miner worker fails": { @@ -880,6 +879,8 @@ func makeExecutor(ctx context.Context, dealState, err := tut.MakeTestMinerDeal(initialState, signedProposal, dataRef) require.NoError(t, err) + dealState.AddFundsCid = &tut.GenerateCids(1)[0] + dealState.PublishCid = &tut.GenerateCids(1)[0] if dealParams.PiecePath != filestore.Path("") { dealState.PiecePath = dealParams.PiecePath } @@ -904,7 +905,6 @@ func makeExecutor(ctx context.Context, metadataPath: params.MetadataPath, generateCommPError: params.GenerateCommPError, sendSignedResponseError: params.SendSignedResponseError, - disconnectError: params.DisconnectError, dealAcceptanceBuffer: abi.ChainEpoch(params.DealAcceptanceBuffer), fs: fs, pieceStore: pieceStore, @@ -945,7 +945,6 @@ type fakeEnvironment struct { metadataPath filestore.Path generateCommPError error sendSignedResponseError error - disconnectError error fs filestore.FileStore pieceStore piecestore.PieceStore dealAcceptanceBuffer abi.ChainEpoch @@ -971,14 +970,10 @@ func (fe *fakeEnvironment) GeneratePieceCommitmentToFile(payloadCid cid.Cid, sel return fe.pieceCid, fe.path, fe.metadataPath, fe.generateCommPError } -func (fe *fakeEnvironment) SendSignedResponse(ctx context.Context, response *network.Response) error { +func (fe *fakeEnvironment) SendSignedResponse(ctx context.Context, client peer.ID, response *storagemarket.ProposalResponse) error { return fe.sendSignedResponseError } -func (fe *fakeEnvironment) Disconnect(proposalCid cid.Cid) error { - return fe.disconnectError -} - func (fe *fakeEnvironment) FileStore() filestore.FileStore { return fe.fs } diff --git a/storagemarket/impl/providerutils/providerutils_test.go b/storagemarket/impl/providerutils/providerutils_test.go index 44ac3d349..1f4c06011 100644 --- a/storagemarket/impl/providerutils/providerutils_test.go +++ b/storagemarket/impl/providerutils/providerutils_test.go @@ -28,7 +28,6 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket/impl/blockrecorder" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerutils" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" - "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) func TestVerifyProposal(t *testing.T) { @@ -91,7 +90,7 @@ func TestSignMinerData(t *testing.T) { shouldErr: false, }, "cbor dump errors": { - data: &network.Response{}, + data: &storagemarket.ProposalResponse{}, workerLookup: successLookup, signBytes: successSign, shouldErr: true, diff --git a/storagemarket/integration_test.go b/storagemarket/integration_test.go index 5b51c9dff..f6fa0a4f1 100644 --- a/storagemarket/integration_test.go +++ b/storagemarket/integration_test.go @@ -112,6 +112,7 @@ func TestMakeDeal(t *testing.T) { storagemarket.StorageDealEnsureClientFunds, //storagemarket.StorageDealClientFunding, // skipped because funds available storagemarket.StorageDealFundsEnsured, + storagemarket.StorageDealWaitingForResponse, storagemarket.StorageDealValidating, storagemarket.StorageDealProposalAccepted, storagemarket.StorageDealSealing, @@ -166,7 +167,7 @@ func TestMakeDealOffline(t *testing.T) { cd, err := h.Client.GetLocalDeal(ctx, proposalCid) assert.NoError(t, err) - assert.Equal(t, storagemarket.StorageDealValidating, cd.State) + assert.Equal(t, storagemarket.StorageDealWaitingForResponse, cd.State) providerDeals, err := h.Provider.ListLocalDeals() assert.NoError(t, err) @@ -213,7 +214,7 @@ func TestMakeDealNonBlocking(t *testing.T) { cd, err := h.Client.GetLocalDeal(ctx, result.ProposalCid) assert.NoError(t, err) - assert.Equal(t, storagemarket.StorageDealValidating, cd.State) + assert.Equal(t, storagemarket.StorageDealWaitingForResponse, cd.State) providerDeals, err := h.Provider.ListLocalDeals() assert.NoError(t, err) diff --git a/storagemarket/network/ask_stream.go b/storagemarket/network/ask_stream.go index 6622e0ad7..e1c51ff1e 100644 --- a/storagemarket/network/ask_stream.go +++ b/storagemarket/network/ask_stream.go @@ -4,6 +4,7 @@ import ( "bufio" cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/libp2p/go-libp2p-core/mux" "github.com/libp2p/go-libp2p-core/peer" ) @@ -16,34 +17,34 @@ type askStream struct { var _ StorageAskStream = (*askStream)(nil) -func (as *askStream) ReadAskRequest() (AskRequest, error) { - var a AskRequest +func (as *askStream) ReadAskRequest() (storagemarket.AskRequest, error) { + var a storagemarket.AskRequest if err := a.UnmarshalCBOR(as.buffered); err != nil { log.Warn(err) - return AskRequestUndefined, err + return storagemarket.AskRequestUndefined, err } return a, nil } -func (as *askStream) WriteAskRequest(q AskRequest) error { +func (as *askStream) WriteAskRequest(q storagemarket.AskRequest) error { return cborutil.WriteCborRPC(as.rw, &q) } -func (as *askStream) ReadAskResponse() (AskResponse, error) { - var resp AskResponse +func (as *askStream) ReadAskResponse() (storagemarket.AskResponse, error) { + var resp storagemarket.AskResponse if err := resp.UnmarshalCBOR(as.buffered); err != nil { log.Warn(err) - return AskResponseUndefined, err + return storagemarket.AskResponseUndefined, err } return resp, nil } -func (as *askStream) WriteAskResponse(qr AskResponse) error { +func (as *askStream) WriteAskResponse(qr storagemarket.AskResponse) error { return cborutil.WriteCborRPC(as.rw, &qr) } diff --git a/storagemarket/network/deal_stream.go b/storagemarket/network/deal_stream.go index dec3f8949..4ea03e8ae 100644 --- a/storagemarket/network/deal_stream.go +++ b/storagemarket/network/deal_stream.go @@ -4,6 +4,7 @@ import ( "bufio" cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/libp2p/go-libp2p-core/mux" "github.com/libp2p/go-libp2p-core/peer" ) @@ -16,30 +17,30 @@ type dealStream struct { var _ StorageDealStream = (*dealStream)(nil) -func (d *dealStream) ReadDealProposal() (Proposal, error) { - var ds Proposal +func (d *dealStream) ReadDealProposal() (storagemarket.ProposalRequest, error) { + var ds storagemarket.ProposalRequest if err := ds.UnmarshalCBOR(d.buffered); err != nil { log.Warn(err) - return ProposalUndefined, err + return storagemarket.ProposalRequestUndefined, err } return ds, nil } -func (d *dealStream) WriteDealProposal(dp Proposal) error { +func (d *dealStream) WriteDealProposal(dp storagemarket.ProposalRequest) error { return cborutil.WriteCborRPC(d.rw, &dp) } -func (d *dealStream) ReadDealResponse() (SignedResponse, error) { - var dr SignedResponse +func (d *dealStream) ReadDealResponse() (storagemarket.SignedResponse, error) { + var dr storagemarket.SignedResponse if err := dr.UnmarshalCBOR(d.buffered); err != nil { - return SignedResponseUndefined, err + return storagemarket.SignedResponseUndefined, err } return dr, nil } -func (d *dealStream) WriteDealResponse(dr SignedResponse) error { +func (d *dealStream) WriteDealResponse(dr storagemarket.SignedResponse) error { return cborutil.WriteCborRPC(d.rw, &dr) } diff --git a/storagemarket/network/libp2p_impl_test.go b/storagemarket/network/libp2p_impl_test.go index f47f7d53f..5f59f5f82 100644 --- a/storagemarket/network/libp2p_impl_test.go +++ b/storagemarket/network/libp2p_impl_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/filecoin-project/go-fil-markets/shared_testutil" + "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) @@ -45,7 +46,7 @@ func TestAskStreamSendReceiveAskRequest(t *testing.T) { require.NoError(t, fromNetwork.SetDelegate(tr)) // host2 gets receiver - achan := make(chan network.AskRequest) + achan := make(chan storagemarket.AskRequest) tr2 := &testReceiver{t: t, askStreamHandler: func(s network.StorageAskStream) { readq, err := s.ReadAskRequest() require.NoError(t, err) @@ -69,7 +70,7 @@ func TestAskStreamSendReceiveAskResponse(t *testing.T) { require.NoError(t, fromNetwork.SetDelegate(tr)) // host2 gets receiver - achan := make(chan network.AskResponse) + achan := make(chan storagemarket.AskResponse) tr2 := &testReceiver{t: t, askStreamHandler: func(s network.StorageAskStream) { a, err := s.ReadAskResponse() require.NoError(t, err) @@ -107,7 +108,7 @@ func TestAskStreamSendReceiveMultipleSuccessful(t *testing.T) { qs, err := nw1.NewAskStream(td.Host2.ID()) require.NoError(t, err) - var resp network.AskResponse + var resp storagemarket.AskResponse go require.NoError(t, qs.WriteAskRequest(shared_testutil.MakeTestStorageAskRequest())) resp, err = qs.ReadAskResponse() require.NoError(t, err) @@ -132,7 +133,7 @@ func TestDealStreamSendReceiveDealProposal(t *testing.T) { tr := &testReceiver{t: t} require.NoError(t, fromNetwork.SetDelegate(tr)) - dchan := make(chan network.Proposal) + dchan := make(chan storagemarket.ProposalRequest) tr2 := &testReceiver{ t: t, dealStreamHandler: func(s network.StorageDealStream) { @@ -156,7 +157,7 @@ func TestDealStreamSendReceiveDealResponse(t *testing.T) { tr := &testReceiver{t: t} require.NoError(t, fromNetwork.SetDelegate(tr)) - drChan := make(chan network.SignedResponse) + drChan := make(chan storagemarket.SignedResponse) tr2 := &testReceiver{ t: t, dealStreamHandler: func(s network.StorageDealStream) { @@ -230,7 +231,7 @@ func TestLibp2pStorageMarketNetwork_StopHandlingRequests(t *testing.T) { require.NoError(t, fromNetwork.SetDelegate(tr)) // host2 gets receiver - achan := make(chan network.AskRequest) + achan := make(chan storagemarket.AskRequest) tr2 := &testReceiver{t: t, askStreamHandler: func(s network.StorageAskStream) { readar, err := s.ReadAskRequest() require.NoError(t, err) @@ -245,7 +246,7 @@ func TestLibp2pStorageMarketNetwork_StopHandlingRequests(t *testing.T) { } // assertDealProposalReceived performs the verification that a deal proposal is received -func assertDealProposalReceived(inCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toPeer peer.ID, inChan chan network.Proposal) { +func assertDealProposalReceived(inCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toPeer peer.ID, inChan chan storagemarket.ProposalRequest) { ctx, cancel := context.WithTimeout(inCtx, 10*time.Second) defer cancel() @@ -256,7 +257,7 @@ func assertDealProposalReceived(inCtx context.Context, t *testing.T, fromNetwork dp := shared_testutil.MakeTestStorageNetworkProposal() require.NoError(t, qs1.WriteDealProposal(dp)) - var dealReceived network.Proposal + var dealReceived storagemarket.ProposalRequest select { case <-ctx.Done(): t.Error("deal proposal not received") @@ -266,7 +267,7 @@ func assertDealProposalReceived(inCtx context.Context, t *testing.T, fromNetwork assert.Equal(t, dp, dealReceived) } -func assertDealResponseReceived(parentCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toPeer peer.ID, inChan chan network.SignedResponse) { +func assertDealResponseReceived(parentCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toPeer peer.ID, inChan chan storagemarket.SignedResponse) { ctx, cancel := context.WithTimeout(parentCtx, 10*time.Second) defer cancel() @@ -276,7 +277,7 @@ func assertDealResponseReceived(parentCtx context.Context, t *testing.T, fromNet dr := shared_testutil.MakeTestStorageNetworkSignedResponse() require.NoError(t, ds1.WriteDealResponse(dr)) - var responseReceived network.SignedResponse + var responseReceived storagemarket.SignedResponse select { case <-ctx.Done(): t.Error("response not received") @@ -286,8 +287,8 @@ func assertDealResponseReceived(parentCtx context.Context, t *testing.T, fromNet assert.Equal(t, dr, responseReceived) } -// assertAskRequestReceived performs the verification that a AskRequest is received -func assertAskRequestReceived(inCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toHost peer.ID, achan chan network.AskRequest) { +// assertAskRequestReceived performs the verification that a storagemarket.AskRequest is received +func assertAskRequestReceived(inCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toHost peer.ID, achan chan storagemarket.AskRequest) { ctx, cancel := context.WithTimeout(inCtx, 10*time.Second) defer cancel() @@ -298,7 +299,7 @@ func assertAskRequestReceived(inCtx context.Context, t *testing.T, fromNetwork n a := shared_testutil.MakeTestStorageAskRequest() require.NoError(t, as1.WriteAskRequest(a)) - var ina network.AskRequest + var ina storagemarket.AskRequest select { case <-ctx.Done(): t.Error("msg not received") @@ -308,11 +309,11 @@ func assertAskRequestReceived(inCtx context.Context, t *testing.T, fromNetwork n assert.Equal(t, a.Miner, ina.Miner) } -// assertAskResponseReceived performs the verification that a AskResponse is received +// assertAskResponseReceived performs the verification that a storagemarket.AskResponse is received func assertAskResponseReceived(inCtx context.Context, t *testing.T, fromNetwork network.StorageMarketNetwork, toHost peer.ID, - achan chan network.AskResponse) { + achan chan storagemarket.AskResponse) { ctx, cancel := context.WithTimeout(inCtx, 10*time.Second) defer cancel() @@ -325,7 +326,7 @@ func assertAskResponseReceived(inCtx context.Context, t *testing.T, require.NoError(t, as1.WriteAskResponse(ar)) // read queryresponse - var inar network.AskResponse + var inar storagemarket.AskResponse select { case <-ctx.Done(): t.Error("msg not received") diff --git a/storagemarket/network/network.go b/storagemarket/network/network.go index 03c4b8d78..5affe2e4e 100644 --- a/storagemarket/network/network.go +++ b/storagemarket/network/network.go @@ -1,26 +1,27 @@ package network import ( + "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/libp2p/go-libp2p-core/peer" ) // StorageAskStream is a stream for reading/writing requests & // responses on the Storage Ask protocol type StorageAskStream interface { - ReadAskRequest() (AskRequest, error) - WriteAskRequest(AskRequest) error - ReadAskResponse() (AskResponse, error) - WriteAskResponse(AskResponse) error + ReadAskRequest() (storagemarket.AskRequest, error) + WriteAskRequest(storagemarket.AskRequest) error + ReadAskResponse() (storagemarket.AskResponse, error) + WriteAskResponse(storagemarket.AskResponse) error Close() error } // StorageDealStream is a stream for reading and writing requests // and responses on the storage deal protocol type StorageDealStream interface { - ReadDealProposal() (Proposal, error) - WriteDealProposal(Proposal) error - ReadDealResponse() (SignedResponse, error) - WriteDealResponse(SignedResponse) error + ReadDealProposal() (storagemarket.ProposalRequest, error) + WriteDealProposal(storagemarket.ProposalRequest) error + ReadDealResponse() (storagemarket.SignedResponse, error) + WriteDealResponse(storagemarket.SignedResponse) error RemotePeer() peer.ID Close() error } diff --git a/storagemarket/network/types.go b/storagemarket/network/types.go deleted file mode 100644 index 7d0294e5a..000000000 --- a/storagemarket/network/types.go +++ /dev/null @@ -1,58 +0,0 @@ -package network - -import ( - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/specs-actors/actors/builtin/market" - "github.com/filecoin-project/specs-actors/actors/crypto" - "github.com/ipfs/go-cid" - - "github.com/filecoin-project/go-fil-markets/storagemarket" -) - -//go:generate cbor-gen-for AskRequest AskResponse Proposal Response SignedResponse - -// Proposal is the data sent over the network from client to provider when proposing -// a deal -type Proposal struct { - DealProposal *market.ClientDealProposal - - Piece *storagemarket.DataRef -} - -var ProposalUndefined = Proposal{} - -// Response is a response to a proposal sent over the network -type Response struct { - State storagemarket.StorageDealStatus - - // DealProposalRejected - Message string - Proposal cid.Cid - - // StorageDealProposalAccepted - PublishMessage *cid.Cid -} - -// SignedResponse is a response that is signed -type SignedResponse struct { - Response Response - - Signature *crypto.Signature -} - -var SignedResponseUndefined = SignedResponse{} - -// AskRequest is a request for current ask parameters for a given miner -type AskRequest struct { - Miner address.Address -} - -var AskRequestUndefined = AskRequest{} - -// AskResponse is the response sent over the network in response -// to an ask request -type AskResponse struct { - Ask *storagemarket.SignedStorageAsk -} - -var AskResponseUndefined = AskResponse{} diff --git a/storagemarket/network/types_cbor_gen.go b/storagemarket/network/types_cbor_gen.go deleted file mode 100644 index 73b42f5a1..000000000 --- a/storagemarket/network/types_cbor_gen.go +++ /dev/null @@ -1,390 +0,0 @@ -// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. - -package network - -import ( - "fmt" - "io" - - "github.com/filecoin-project/go-fil-markets/storagemarket" - "github.com/filecoin-project/specs-actors/actors/builtin/market" - "github.com/filecoin-project/specs-actors/actors/crypto" - cbg "github.com/whyrusleeping/cbor-gen" - xerrors "golang.org/x/xerrors" -) - -var _ = xerrors.Errorf - -func (t *AskRequest) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{129}); err != nil { - return err - } - - // t.Miner (address.Address) (struct) - if err := t.Miner.MarshalCBOR(w); err != nil { - return err - } - return nil -} - -func (t *AskRequest) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 1 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Miner (address.Address) (struct) - - { - - if err := t.Miner.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.Miner: %w", err) - } - - } - return nil -} - -func (t *AskResponse) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{129}); err != nil { - return err - } - - // t.Ask (storagemarket.SignedStorageAsk) (struct) - if err := t.Ask.MarshalCBOR(w); err != nil { - return err - } - return nil -} - -func (t *AskResponse) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 1 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Ask (storagemarket.SignedStorageAsk) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - t.Ask = new(storagemarket.SignedStorageAsk) - if err := t.Ask.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.Ask pointer: %w", err) - } - } - - } - return nil -} - -func (t *Proposal) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{130}); err != nil { - return err - } - - // t.DealProposal (market.ClientDealProposal) (struct) - if err := t.DealProposal.MarshalCBOR(w); err != nil { - return err - } - - // t.Piece (storagemarket.DataRef) (struct) - if err := t.Piece.MarshalCBOR(w); err != nil { - return err - } - return nil -} - -func (t *Proposal) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 2 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.DealProposal (market.ClientDealProposal) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - t.DealProposal = new(market.ClientDealProposal) - if err := t.DealProposal.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.DealProposal pointer: %w", err) - } - } - - } - // t.Piece (storagemarket.DataRef) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - t.Piece = new(storagemarket.DataRef) - if err := t.Piece.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.Piece pointer: %w", err) - } - } - - } - return nil -} - -func (t *Response) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{132}); err != nil { - return err - } - - // t.State (uint64) (uint64) - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil { - return err - } - - // t.Message (string) (string) - if len(t.Message) > cbg.MaxLength { - return xerrors.Errorf("Value in field t.Message was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Message)))); err != nil { - return err - } - if _, err := w.Write([]byte(t.Message)); err != nil { - return err - } - - // t.Proposal (cid.Cid) (struct) - - if err := cbg.WriteCid(w, t.Proposal); err != nil { - return xerrors.Errorf("failed to write cid field t.Proposal: %w", err) - } - - // t.PublishMessage (cid.Cid) (struct) - - if t.PublishMessage == nil { - if _, err := w.Write(cbg.CborNull); err != nil { - return err - } - } else { - if err := cbg.WriteCid(w, *t.PublishMessage); err != nil { - return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err) - } - } - - return nil -} - -func (t *Response) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 4 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.State (uint64) (uint64) - - { - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.State = uint64(extra) - - } - // t.Message (string) (string) - - { - sval, err := cbg.ReadString(br) - if err != nil { - return err - } - - t.Message = string(sval) - } - // t.Proposal (cid.Cid) (struct) - - { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("failed to read cid field t.Proposal: %w", err) - } - - t.Proposal = c - - } - // t.PublishMessage (cid.Cid) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err) - } - - t.PublishMessage = &c - } - - } - return nil -} - -func (t *SignedResponse) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - if _, err := w.Write([]byte{130}); err != nil { - return err - } - - // t.Response (network.Response) (struct) - if err := t.Response.MarshalCBOR(w); err != nil { - return err - } - - // t.Signature (crypto.Signature) (struct) - if err := t.Signature.MarshalCBOR(w); err != nil { - return err - } - return nil -} - -func (t *SignedResponse) UnmarshalCBOR(r io.Reader) error { - br := cbg.GetPeeker(r) - - maj, extra, err := cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajArray { - return fmt.Errorf("cbor input should be of type array") - } - - if extra != 2 { - return fmt.Errorf("cbor input had wrong number of fields") - } - - // t.Response (network.Response) (struct) - - { - - if err := t.Response.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.Response: %w", err) - } - - } - // t.Signature (crypto.Signature) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - t.Signature = new(crypto.Signature) - if err := t.Signature.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.Signature pointer: %w", err) - } - } - - } - return nil -} diff --git a/storagemarket/types.go b/storagemarket/types.go index 642e9ff40..458e87521 100644 --- a/storagemarket/types.go +++ b/storagemarket/types.go @@ -17,7 +17,7 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" ) -//go:generate cbor-gen-for ClientDeal MinerDeal Balance SignedStorageAsk StorageAsk StorageDeal DataRef +//go:generate cbor-gen-for ClientDeal MinerDeal Balance SignedStorageAsk StorageAsk StorageDeal DataRef AskRequest AskResponse ProposalRequest ProposalResponse SignedResponse const DealProtocolID = "/fil/storage/mk/1.0.1" const AskProtocolID = "/fil/storage/ask/1.0.1" @@ -46,6 +46,7 @@ const ( StorageDealValidating // Verifying that deal parameters are good StorageDealTransferring // Moving data StorageDealWaitingForData // Manual transfer + StorageDealWaitingForResponse // Waiting for a response from the provider StorageDealVerifyData // Verify transferred data - generate CAR / piece data StorageDealEnsureProviderFunds // Ensuring that provider collateral is sufficient StorageDealEnsureClientFunds // Ensuring that client funds are sufficient @@ -72,6 +73,7 @@ var DealStates = map[StorageDealStatus]string{ StorageDealValidating: "StorageDealValidating", StorageDealTransferring: "StorageDealTransferring", StorageDealWaitingForData: "StorageDealWaitingForData", + StorageDealWaitingForResponse: "StorageDealWaitingForResponse", StorageDealVerifyData: "StorageDealVerifyData", StorageDealEnsureProviderFunds: "StorageDealEnsureProviderFunds", StorageDealEnsureClientFunds: "StorageDealEnsureClientFunds", @@ -127,8 +129,8 @@ var StorageAskUndefined = StorageAsk{} type MinerDeal struct { market.ClientDealProposal ProposalCid cid.Cid - AddFundsCid cid.Cid - PublishCid cid.Cid + AddFundsCid *cid.Cid + PublishCid *cid.Cid Miner peer.ID Client peer.ID State StorageDealStatus @@ -267,7 +269,7 @@ var ProviderEvents = map[ProviderEvent]string{ type ClientDeal struct { market.ClientDealProposal ProposalCid cid.Cid - AddFundsCid cid.Cid + AddFundsCid *cid.Cid State StorageDealStatus Miner peer.ID MinerWorker address.Address @@ -275,6 +277,7 @@ type ClientDeal struct { DataRef *DataRef Message string PublishMessage *cid.Cid + LastResponse *SignedResponse } type ClientEvent uint64 @@ -298,8 +301,8 @@ const ( // ClientEventDealProposed happens when a new proposal is sent to a provider ClientEventDealProposed - // ClientEventDealStreamLookupErrored the deal stream for a deal could not be found - ClientEventDealStreamLookupErrored + // ClientEventReceiveResponse happens when a new deal response is received + ClientEventReceiveResponse // ClientEventReadResponseFailed means a network error occurred reading a deal response ClientEventReadResponseFailed @@ -343,8 +346,7 @@ var ClientEvents = map[ClientEvent]string{ ClientEventFundsEnsured: "ClientEventFundsEnsured", ClientEventWriteProposalFailed: "ClientEventWriteProposalFailed", ClientEventDealProposed: "ClientEventDealProposed", - ClientEventDealStreamLookupErrored: "ClientEventDealStreamLookupErrored", - ClientEventReadResponseFailed: "ClientEventReadResponseFailed", + ClientEventReceiveResponse: "ClientEventReceiveResponse", ClientEventResponseVerificationFailed: "ClientEventResponseVerificationFailed", ClientEventResponseDealDidNotMatch: "ClientEventResponseDealDidNotMatch", ClientEventStreamCloseError: "ClientEventStreamCloseError", @@ -543,3 +545,49 @@ type StorageClient interface { SubscribeToEvents(subscriber ClientSubscriber) shared.Unsubscribe } + +// ProposalRequest is the data sent over the network from client to provider when proposing +// a deal +type ProposalRequest struct { + DealProposal *market.ClientDealProposal + + Piece *DataRef +} + +var ProposalRequestUndefined = ProposalRequest{} + +// ProposalResponse is a response to a proposal sent over the network +type ProposalResponse struct { + State StorageDealStatus + + // DealProposalRejected + Message string + Proposal cid.Cid + + // StorageDealProposalAccepted + PublishMessage *cid.Cid +} + +// SignedResponse is a response that is signed +type SignedResponse struct { + Response ProposalResponse + + Signature *crypto.Signature +} + +var SignedResponseUndefined = SignedResponse{} + +// AskRequest is a request for current ask parameters for a given miner +type AskRequest struct { + Miner address.Address +} + +var AskRequestUndefined = AskRequest{} + +// AskResponse is the response sent over the network in response +// to an ask request +type AskResponse struct { + Ask *SignedStorageAsk +} + +var AskResponseUndefined = AskResponse{} diff --git a/storagemarket/types_cbor_gen.go b/storagemarket/types_cbor_gen.go index 0d101bd6d..3421559f0 100644 --- a/storagemarket/types_cbor_gen.go +++ b/storagemarket/types_cbor_gen.go @@ -8,6 +8,7 @@ import ( "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/crypto" "github.com/libp2p/go-libp2p-core/peer" cbg "github.com/whyrusleeping/cbor-gen" @@ -21,7 +22,7 @@ func (t *ClientDeal) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{137}); err != nil { + if _, err := w.Write([]byte{139}); err != nil { return err } @@ -36,6 +37,18 @@ func (t *ClientDeal) MarshalCBOR(w io.Writer) error { return xerrors.Errorf("failed to write cid field t.ProposalCid: %w", err) } + // t.AddFundsCid (cid.Cid) (struct) + + if t.AddFundsCid == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.AddFundsCid); err != nil { + return xerrors.Errorf("failed to write cid field t.AddFundsCid: %w", err) + } + } + // t.State (uint64) (uint64) if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil { @@ -94,6 +107,10 @@ func (t *ClientDeal) MarshalCBOR(w io.Writer) error { } } + // t.LastResponse (storagemarket.SignedResponse) (struct) + if err := t.LastResponse.MarshalCBOR(w); err != nil { + return err + } return nil } @@ -108,7 +125,7 @@ func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input should be of type array") } - if extra != 9 { + if extra != 11 { return fmt.Errorf("cbor input had wrong number of fields") } @@ -132,6 +149,30 @@ func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error { t.ProposalCid = c + } + // t.AddFundsCid (cid.Cid) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.AddFundsCid: %w", err) + } + + t.AddFundsCid = &c + } + } // t.State (uint64) (uint64) @@ -234,6 +275,27 @@ func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error { t.PublishMessage = &c } + } + // t.LastResponse (storagemarket.SignedResponse) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.LastResponse = new(SignedResponse) + if err := t.LastResponse.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.LastResponse pointer: %w", err) + } + } + } return nil } @@ -243,7 +305,7 @@ func (t *MinerDeal) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{139}); err != nil { + if _, err := w.Write([]byte{141}); err != nil { return err } @@ -258,6 +320,30 @@ func (t *MinerDeal) MarshalCBOR(w io.Writer) error { return xerrors.Errorf("failed to write cid field t.ProposalCid: %w", err) } + // t.AddFundsCid (cid.Cid) (struct) + + if t.AddFundsCid == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.AddFundsCid); err != nil { + return xerrors.Errorf("failed to write cid field t.AddFundsCid: %w", err) + } + } + + // t.PublishCid (cid.Cid) (struct) + + if t.PublishCid == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.PublishCid); err != nil { + return xerrors.Errorf("failed to write cid field t.PublishCid: %w", err) + } + } + // t.Miner (peer.ID) (string) if len(t.Miner) > cbg.MaxLength { return xerrors.Errorf("Value in field t.Miner was too long") @@ -354,7 +440,7 @@ func (t *MinerDeal) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input should be of type array") } - if extra != 11 { + if extra != 13 { return fmt.Errorf("cbor input had wrong number of fields") } @@ -378,6 +464,54 @@ func (t *MinerDeal) UnmarshalCBOR(r io.Reader) error { t.ProposalCid = c + } + // t.AddFundsCid (cid.Cid) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.AddFundsCid: %w", err) + } + + t.AddFundsCid = &c + } + + } + // t.PublishCid (cid.Cid) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.PublishCid: %w", err) + } + + t.PublishCid = &c + } + } // t.Miner (peer.ID) (string) @@ -1008,3 +1142,377 @@ func (t *DataRef) UnmarshalCBOR(r io.Reader) error { } return nil } + +func (t *AskRequest) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{129}); err != nil { + return err + } + + // t.Miner (address.Address) (struct) + if err := t.Miner.MarshalCBOR(w); err != nil { + return err + } + return nil +} + +func (t *AskRequest) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 1 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Miner (address.Address) (struct) + + { + + if err := t.Miner.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.Miner: %w", err) + } + + } + return nil +} + +func (t *AskResponse) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{129}); err != nil { + return err + } + + // t.Ask (storagemarket.SignedStorageAsk) (struct) + if err := t.Ask.MarshalCBOR(w); err != nil { + return err + } + return nil +} + +func (t *AskResponse) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 1 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Ask (storagemarket.SignedStorageAsk) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.Ask = new(SignedStorageAsk) + if err := t.Ask.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.Ask pointer: %w", err) + } + } + + } + return nil +} + +func (t *ProposalRequest) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{130}); err != nil { + return err + } + + // t.DealProposal (market.ClientDealProposal) (struct) + if err := t.DealProposal.MarshalCBOR(w); err != nil { + return err + } + + // t.Piece (storagemarket.DataRef) (struct) + if err := t.Piece.MarshalCBOR(w); err != nil { + return err + } + return nil +} + +func (t *ProposalRequest) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 2 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.DealProposal (market.ClientDealProposal) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.DealProposal = new(market.ClientDealProposal) + if err := t.DealProposal.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.DealProposal pointer: %w", err) + } + } + + } + // t.Piece (storagemarket.DataRef) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.Piece = new(DataRef) + if err := t.Piece.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.Piece pointer: %w", err) + } + } + + } + return nil +} + +func (t *ProposalResponse) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{132}); err != nil { + return err + } + + // t.State (uint64) (uint64) + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil { + return err + } + + // t.Message (string) (string) + if len(t.Message) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Message was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Message)))); err != nil { + return err + } + if _, err := w.Write([]byte(t.Message)); err != nil { + return err + } + + // t.Proposal (cid.Cid) (struct) + + if err := cbg.WriteCid(w, t.Proposal); err != nil { + return xerrors.Errorf("failed to write cid field t.Proposal: %w", err) + } + + // t.PublishMessage (cid.Cid) (struct) + + if t.PublishMessage == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.PublishMessage); err != nil { + return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err) + } + } + + return nil +} + +func (t *ProposalResponse) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 4 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.State (uint64) (uint64) + + { + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.State = uint64(extra) + + } + // t.Message (string) (string) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + t.Message = string(sval) + } + // t.Proposal (cid.Cid) (struct) + + { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.Proposal: %w", err) + } + + t.Proposal = c + + } + // t.PublishMessage (cid.Cid) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err) + } + + t.PublishMessage = &c + } + + } + return nil +} + +func (t *SignedResponse) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{130}); err != nil { + return err + } + + // t.Response (storagemarket.ProposalResponse) (struct) + if err := t.Response.MarshalCBOR(w); err != nil { + return err + } + + // t.Signature (crypto.Signature) (struct) + if err := t.Signature.MarshalCBOR(w); err != nil { + return err + } + return nil +} + +func (t *SignedResponse) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 2 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Response (storagemarket.ProposalResponse) (struct) + + { + + if err := t.Response.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.Response: %w", err) + } + + } + // t.Signature (crypto.Signature) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + t.Signature = new(crypto.Signature) + if err := t.Signature.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.Signature pointer: %w", err) + } + } + + } + return nil +}