Skip to content

Commit

Permalink
feat(storagemarket): move to closing streams
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed May 8, 2020
1 parent 32d301b commit 8b0fbca
Show file tree
Hide file tree
Showing 23 changed files with 768 additions and 748 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/libp2p/go-libp2p v0.6.0
github.com/libp2p/go-libp2p-core v0.5.0
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/multiformats/go-multihash v0.0.13
github.com/stretchr/testify v1.5.1
github.com/whyrusleeping/cbor-gen v0.0.0-20200414195334-429a0b5e922e
Expand Down
52 changes: 27 additions & 25 deletions shared_testutil/test_network_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/storagemarket"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
)

Expand Down Expand Up @@ -505,16 +506,16 @@ func StubbedDealPaymentReader(payment rm.DealPayment) DealPaymentReader {
}

// StorageDealProposalReader is a function to mock reading deal proposals.
type StorageDealProposalReader func() (smnet.Proposal, error)
type StorageDealProposalReader func() (storagemarket.ProposalRequest, error)

// StorageDealResponseReader is a function to mock reading deal responses.
type StorageDealResponseReader func() (smnet.SignedResponse, error)
type StorageDealResponseReader func() (storagemarket.SignedResponse, error)

// StorageDealResponseWriter is a function to mock writing deal responses.
type StorageDealResponseWriter func(smnet.SignedResponse) error
type StorageDealResponseWriter func(storagemarket.SignedResponse) error

// StorageDealProposalWriter is a function to mock writing deal proposals.
type StorageDealProposalWriter func(smnet.Proposal) error
type StorageDealProposalWriter func(storagemarket.ProposalRequest) error

// TestStorageDealStream is a retrieval deal stream with predefined
// stubbed behavior.
Expand Down Expand Up @@ -562,22 +563,22 @@ func NewTestStorageDealStream(params TestStorageDealStreamParams) smnet.StorageD
}

// ReadDealProposal calls the mocked deal proposal reader function.
func (tsds *TestStorageDealStream) ReadDealProposal() (smnet.Proposal, error) {
func (tsds *TestStorageDealStream) ReadDealProposal() (storagemarket.ProposalRequest, error) {
return tsds.proposalReader()
}

// WriteDealProposal calls the mocked deal proposal writer function.
func (tsds *TestStorageDealStream) WriteDealProposal(dealProposal smnet.Proposal) error {
func (tsds *TestStorageDealStream) WriteDealProposal(dealProposal storagemarket.ProposalRequest) error {
return tsds.proposalWriter(dealProposal)
}

// ReadDealResponse calls the mocked deal response reader function.
func (tsds *TestStorageDealStream) ReadDealResponse() (smnet.SignedResponse, error) {
func (tsds *TestStorageDealStream) ReadDealResponse() (storagemarket.SignedResponse, error) {
return tsds.responseReader()
}

// WriteDealResponse calls the mocked deal response writer function.
func (tsds *TestStorageDealStream) WriteDealResponse(dealResponse smnet.SignedResponse) error {
func (tsds *TestStorageDealStream) WriteDealResponse(dealResponse storagemarket.SignedResponse) error {
return tsds.responseWriter(dealResponse)
}

Expand All @@ -588,57 +589,57 @@ func (tsds TestStorageDealStream) RemotePeer() peer.ID { return tsds.p }
func (tsds TestStorageDealStream) Close() error { return nil }

// TrivialStorageDealProposalReader succeeds trivially, returning an empty proposal.
func TrivialStorageDealProposalReader() (smnet.Proposal, error) {
return smnet.Proposal{}, nil
func TrivialStorageDealProposalReader() (storagemarket.ProposalRequest, error) {
return storagemarket.ProposalRequest{}, nil
}

// TrivialStorageDealResponseReader succeeds trivially, returning an empty deal response.
func TrivialStorageDealResponseReader() (smnet.SignedResponse, error) {
return smnet.SignedResponse{}, nil
func TrivialStorageDealResponseReader() (storagemarket.SignedResponse, error) {
return storagemarket.SignedResponse{}, nil
}

// TrivialStorageDealProposalWriter succeeds trivially, returning no error.
func TrivialStorageDealProposalWriter(smnet.Proposal) error {
func TrivialStorageDealProposalWriter(storagemarket.ProposalRequest) error {
return nil
}

// TrivialStorageDealResponseWriter succeeds trivially, returning no error.
func TrivialStorageDealResponseWriter(smnet.SignedResponse) error {
func TrivialStorageDealResponseWriter(storagemarket.SignedResponse) error {
return nil
}

// StubbedStorageProposalReader returns the given proposal when called
func StubbedStorageProposalReader(proposal smnet.Proposal) StorageDealProposalReader {
return func() (smnet.Proposal, error) {
func StubbedStorageProposalReader(proposal storagemarket.ProposalRequest) StorageDealProposalReader {
return func() (storagemarket.ProposalRequest, error) {
return proposal, nil
}
}

// StubbedStorageResponseReader returns the given deal response when called
func StubbedStorageResponseReader(response smnet.SignedResponse) StorageDealResponseReader {
return func() (smnet.SignedResponse, error) {
func StubbedStorageResponseReader(response storagemarket.SignedResponse) StorageDealResponseReader {
return func() (storagemarket.SignedResponse, error) {
return response, nil
}
}

// FailStorageProposalWriter always fails
func FailStorageProposalWriter(smnet.Proposal) error {
func FailStorageProposalWriter(storagemarket.ProposalRequest) error {
return errors.New("write proposal failed")
}

// FailStorageProposalReader always fails
func FailStorageProposalReader() (smnet.Proposal, error) {
return smnet.ProposalUndefined, errors.New("read proposal failed")
func FailStorageProposalReader() (storagemarket.ProposalRequest, error) {
return storagemarket.ProposalRequestUndefined, errors.New("read proposal failed")
}

// FailStorageResponseWriter always fails
func FailStorageResponseWriter(smnet.SignedResponse) error {
func FailStorageResponseWriter(storagemarket.SignedResponse) error {
return errors.New("write proposal failed")
}

// FailStorageResponseReader always fails
func FailStorageResponseReader() (smnet.SignedResponse, error) {
return smnet.SignedResponseUndefined, errors.New("read response failed")
func FailStorageResponseReader() (storagemarket.SignedResponse, error) {
return storagemarket.SignedResponseUndefined, errors.New("read response failed")
}

// TestPeerResolver provides a fake retrievalmarket PeerResolver
Expand All @@ -650,4 +651,5 @@ type TestPeerResolver struct {
func (tpr TestPeerResolver) GetPeers(cid.Cid) ([]rm.RetrievalPeer, error) {
return tpr.Peers, tpr.ResolverError
}
var _ rm.PeerResolver = &TestPeerResolver{}

var _ rm.PeerResolver = &TestPeerResolver{}
21 changes: 10 additions & 11 deletions shared_testutil/test_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
)

// MakeTestSignedVoucher generates a random SignedVoucher that has all non-zero fields
Expand Down Expand Up @@ -213,17 +212,17 @@ func MakeTestSignedStorageAsk() *storagemarket.SignedStorageAsk {

// MakeTestStorageNetworkProposal generates a proposal that can be sent over the
// network to a provider
func MakeTestStorageNetworkProposal() smnet.Proposal {
return smnet.Proposal{
func MakeTestStorageNetworkProposal() storagemarket.ProposalRequest {
return storagemarket.ProposalRequest{
DealProposal: MakeTestClientDealProposal(),
Piece: &storagemarket.DataRef{Root: GenerateCids(1)[0]},
}
}

// MakeTestStorageNetworkResponse generates a response to a proposal sent over
// the network
func MakeTestStorageNetworkResponse() smnet.Response {
return smnet.Response{
func MakeTestStorageNetworkResponse() storagemarket.ProposalResponse {
return storagemarket.ProposalResponse{
State: storagemarket.StorageDealSealing,
Proposal: GenerateCids(1)[0],
PublishMessage: &(GenerateCids(1)[0]),
Expand All @@ -232,23 +231,23 @@ func MakeTestStorageNetworkResponse() smnet.Response {

// MakeTestStorageNetworkSignedResponse generates a response to a proposal sent over
// the network that is signed
func MakeTestStorageNetworkSignedResponse() smnet.SignedResponse {
return smnet.SignedResponse{
func MakeTestStorageNetworkSignedResponse() storagemarket.SignedResponse {
return storagemarket.SignedResponse{
Response: MakeTestStorageNetworkResponse(),
Signature: MakeTestSignature(),
}
}

// MakeTestStorageAskRequest generates a request to get a provider's ask
func MakeTestStorageAskRequest() smnet.AskRequest {
return smnet.AskRequest{
func MakeTestStorageAskRequest() storagemarket.AskRequest {
return storagemarket.AskRequest{
Miner: address.TestAddress2,
}
}

// MakeTestStorageAskResponse generates a response to an ask request
func MakeTestStorageAskResponse() smnet.AskResponse {
return smnet.AskResponse{
func MakeTestStorageAskResponse() storagemarket.AskResponse {
return storagemarket.AskResponse{
Ask: MakeTestSignedStorageAsk(),
}
}
Expand Down
49 changes: 33 additions & 16 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-fil-markets/pieceio"
Expand Down Expand Up @@ -92,6 +93,7 @@ func NewClient(
}

func (c *Client) Run(ctx context.Context) {
c.net.SetDelegate(c)
}

func (c *Client) Stop() {
Expand Down Expand Up @@ -156,7 +158,7 @@ func (c *Client) GetAsk(ctx context.Context, info storagemarket.StorageProviderI
return nil, xerrors.Errorf("failed to open stream to miner: %w", err)
}

request := network.AskRequest{Miner: info.Address}
request := storagemarket.AskRequest{Miner: info.Address}
if err := s.WriteAskRequest(request); err != nil {
return nil, xerrors.Errorf("failed to send ask request: %w", err)
}
Expand Down Expand Up @@ -247,15 +249,6 @@ func (c *Client) ProposeStorageDeal(
return nil, xerrors.Errorf("setting up deal tracking: %w", err)
}

s, err := c.net.NewDealStream(info.PeerID)
if err != nil {
return nil, xerrors.Errorf("connecting to storage provider failed: %w", err)
}
err = c.conns.AddStream(deal.ProposalCid, s)
if err != nil {
return nil, err
}

err = c.statemachines.Send(deal.ProposalCid, storagemarket.ClientEventOpen)
if err != nil {
return nil, xerrors.Errorf("initializing state machine: %w", err)
Expand Down Expand Up @@ -308,6 +301,25 @@ func (c *Client) SubscribeToEvents(subscriber storagemarket.ClientSubscriber) sh
return shared.Unsubscribe(c.pubSub.Subscribe(subscriber))
}

func (c *Client) HandleAskStream(s network.StorageAskStream) {
s.Close()
}

func (c *Client) HandleDealStream(s network.StorageDealStream) {
defer s.Close()
log.Info("Handling storage deal proposal!")

response, err := s.ReadDealResponse()
if err != nil {
log.Errorf("%+v", err)
return
}
err = c.statemachines.Send(response.Response.Proposal, storagemarket.ClientEventReceiveResponse, response)
if err != nil {
log.Errorf("%+v", err)
}
}

func (c *Client) dispatch(eventName fsm.EventName, deal fsm.StateType) {
evt, ok := eventName.(storagemarket.ClientEvent)
if !ok {
Expand Down Expand Up @@ -355,10 +367,15 @@ func (c *clientDealEnvironment) Node() storagemarket.StorageClientNode {
return c.c.node
}

func (c *clientDealEnvironment) DealStream(proposalCid cid.Cid) (network.StorageDealStream, error) {
return c.c.conns.DealStream(proposalCid)
}

func (c *clientDealEnvironment) CloseStream(proposalCid cid.Cid) error {
return c.c.conns.Disconnect(proposalCid)
func (c *clientDealEnvironment) WriteDealProposal(p peer.ID, proposal storagemarket.ProposalRequest) error {
s, err := c.c.net.NewDealStream(p)
if err != nil {
return err
}
err = s.WriteDealProposal(proposal)
closeErr := s.Close()
if closeErr != nil {
log.Warnf("error closing stream: %w", closeErr)
}
return err
}
18 changes: 6 additions & 12 deletions storagemarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var ClientEvents = fsm.Events{
fsm.Event(storagemarket.ClientEventFundingInitiated).
From(storagemarket.StorageDealEnsureClientFunds).To(storagemarket.StorageDealClientFunding).
Action(func(deal *storagemarket.ClientDeal, mcid cid.Cid) error {
deal.AddFundsCid = mcid
deal.AddFundsCid = &mcid
return nil
}),
fsm.Event(storagemarket.ClientEventEnsureFundsFailed).
Expand All @@ -34,17 +34,11 @@ var ClientEvents = fsm.Events{
return nil
}),
fsm.Event(storagemarket.ClientEventDealProposed).
From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealValidating),
fsm.Event(storagemarket.ClientEventDealStreamLookupErrored).
FromAny().To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("miner connection error: %w", err).Error()
return nil
}),
fsm.Event(storagemarket.ClientEventReadResponseFailed).
From(storagemarket.StorageDealValidating).To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("error reading Response message: %w", err).Error()
From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealWaitingForResponse),
fsm.Event(storagemarket.ClientEventReceiveResponse).
From(storagemarket.StorageDealWaitingForResponse).To(storagemarket.StorageDealValidating).
Action(func(deal *storagemarket.ClientDeal, response storagemarket.SignedResponse) error {
deal.LastResponse = &response
return nil
}),
fsm.Event(storagemarket.ClientEventResponseVerificationFailed).
Expand Down
Loading

0 comments on commit 8b0fbca

Please sign in to comment.