Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't Keep Streams Open #230

Merged
merged 3 commits into from
May 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 27 additions & 25 deletions shared_testutil/test_network_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/storagemarket"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
)

Expand Down Expand Up @@ -505,16 +506,16 @@ func StubbedDealPaymentReader(payment rm.DealPayment) DealPaymentReader {
}

// StorageDealProposalReader is a function to mock reading deal proposals.
type StorageDealProposalReader func() (smnet.Proposal, error)
type StorageDealProposalReader func() (storagemarket.ProposalRequest, error)

// StorageDealResponseReader is a function to mock reading deal responses.
type StorageDealResponseReader func() (smnet.SignedResponse, error)
type StorageDealResponseReader func() (storagemarket.SignedResponse, error)

// StorageDealResponseWriter is a function to mock writing deal responses.
type StorageDealResponseWriter func(smnet.SignedResponse) error
type StorageDealResponseWriter func(storagemarket.SignedResponse) error

// StorageDealProposalWriter is a function to mock writing deal proposals.
type StorageDealProposalWriter func(smnet.Proposal) error
type StorageDealProposalWriter func(storagemarket.ProposalRequest) error

// TestStorageDealStream is a retrieval deal stream with predefined
// stubbed behavior.
Expand Down Expand Up @@ -562,22 +563,22 @@ func NewTestStorageDealStream(params TestStorageDealStreamParams) smnet.StorageD
}

// ReadDealProposal calls the mocked deal proposal reader function.
func (tsds *TestStorageDealStream) ReadDealProposal() (smnet.Proposal, error) {
func (tsds *TestStorageDealStream) ReadDealProposal() (storagemarket.ProposalRequest, error) {
return tsds.proposalReader()
}

// WriteDealProposal calls the mocked deal proposal writer function.
func (tsds *TestStorageDealStream) WriteDealProposal(dealProposal smnet.Proposal) error {
func (tsds *TestStorageDealStream) WriteDealProposal(dealProposal storagemarket.ProposalRequest) error {
return tsds.proposalWriter(dealProposal)
}

// ReadDealResponse calls the mocked deal response reader function.
func (tsds *TestStorageDealStream) ReadDealResponse() (smnet.SignedResponse, error) {
func (tsds *TestStorageDealStream) ReadDealResponse() (storagemarket.SignedResponse, error) {
return tsds.responseReader()
}

// WriteDealResponse calls the mocked deal response writer function.
func (tsds *TestStorageDealStream) WriteDealResponse(dealResponse smnet.SignedResponse) error {
func (tsds *TestStorageDealStream) WriteDealResponse(dealResponse storagemarket.SignedResponse) error {
return tsds.responseWriter(dealResponse)
}

Expand All @@ -588,57 +589,57 @@ func (tsds TestStorageDealStream) RemotePeer() peer.ID { return tsds.p }
func (tsds TestStorageDealStream) Close() error { return nil }

// TrivialStorageDealProposalReader succeeds trivially, returning an empty proposal.
func TrivialStorageDealProposalReader() (smnet.Proposal, error) {
return smnet.Proposal{}, nil
func TrivialStorageDealProposalReader() (storagemarket.ProposalRequest, error) {
return storagemarket.ProposalRequest{}, nil
}

// TrivialStorageDealResponseReader succeeds trivially, returning an empty deal response.
func TrivialStorageDealResponseReader() (smnet.SignedResponse, error) {
return smnet.SignedResponse{}, nil
func TrivialStorageDealResponseReader() (storagemarket.SignedResponse, error) {
return storagemarket.SignedResponse{}, nil
}

// TrivialStorageDealProposalWriter succeeds trivially, returning no error.
func TrivialStorageDealProposalWriter(smnet.Proposal) error {
func TrivialStorageDealProposalWriter(storagemarket.ProposalRequest) error {
return nil
}

// TrivialStorageDealResponseWriter succeeds trivially, returning no error.
func TrivialStorageDealResponseWriter(smnet.SignedResponse) error {
func TrivialStorageDealResponseWriter(storagemarket.SignedResponse) error {
return nil
}

// StubbedStorageProposalReader returns the given proposal when called
func StubbedStorageProposalReader(proposal smnet.Proposal) StorageDealProposalReader {
return func() (smnet.Proposal, error) {
func StubbedStorageProposalReader(proposal storagemarket.ProposalRequest) StorageDealProposalReader {
return func() (storagemarket.ProposalRequest, error) {
return proposal, nil
}
}

// StubbedStorageResponseReader returns the given deal response when called
func StubbedStorageResponseReader(response smnet.SignedResponse) StorageDealResponseReader {
return func() (smnet.SignedResponse, error) {
func StubbedStorageResponseReader(response storagemarket.SignedResponse) StorageDealResponseReader {
return func() (storagemarket.SignedResponse, error) {
return response, nil
}
}

// FailStorageProposalWriter always fails
func FailStorageProposalWriter(smnet.Proposal) error {
func FailStorageProposalWriter(storagemarket.ProposalRequest) error {
return errors.New("write proposal failed")
}

// FailStorageProposalReader always fails
func FailStorageProposalReader() (smnet.Proposal, error) {
return smnet.ProposalUndefined, errors.New("read proposal failed")
func FailStorageProposalReader() (storagemarket.ProposalRequest, error) {
return storagemarket.ProposalRequestUndefined, errors.New("read proposal failed")
}

// FailStorageResponseWriter always fails
func FailStorageResponseWriter(smnet.SignedResponse) error {
func FailStorageResponseWriter(storagemarket.SignedResponse) error {
return errors.New("write proposal failed")
}

// FailStorageResponseReader always fails
func FailStorageResponseReader() (smnet.SignedResponse, error) {
return smnet.SignedResponseUndefined, errors.New("read response failed")
func FailStorageResponseReader() (storagemarket.SignedResponse, error) {
return storagemarket.SignedResponseUndefined, errors.New("read response failed")
}

// TestPeerResolver provides a fake retrievalmarket PeerResolver
Expand All @@ -650,4 +651,5 @@ type TestPeerResolver struct {
func (tpr TestPeerResolver) GetPeers(cid.Cid) ([]rm.RetrievalPeer, error) {
return tpr.Peers, tpr.ResolverError
}
var _ rm.PeerResolver = &TestPeerResolver{}

var _ rm.PeerResolver = &TestPeerResolver{}
21 changes: 10 additions & 11 deletions shared_testutil/test_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
)

// MakeTestSignedVoucher generates a random SignedVoucher that has all non-zero fields
Expand Down Expand Up @@ -213,17 +212,17 @@ func MakeTestSignedStorageAsk() *storagemarket.SignedStorageAsk {

// MakeTestStorageNetworkProposal generates a proposal that can be sent over the
// network to a provider
func MakeTestStorageNetworkProposal() smnet.Proposal {
return smnet.Proposal{
func MakeTestStorageNetworkProposal() storagemarket.ProposalRequest {
return storagemarket.ProposalRequest{
DealProposal: MakeTestClientDealProposal(),
Piece: &storagemarket.DataRef{Root: GenerateCids(1)[0]},
}
}

// MakeTestStorageNetworkResponse generates a response to a proposal sent over
// the network
func MakeTestStorageNetworkResponse() smnet.Response {
return smnet.Response{
func MakeTestStorageNetworkResponse() storagemarket.ProposalResponse {
return storagemarket.ProposalResponse{
State: storagemarket.StorageDealSealing,
Proposal: GenerateCids(1)[0],
PublishMessage: &(GenerateCids(1)[0]),
Expand All @@ -232,23 +231,23 @@ func MakeTestStorageNetworkResponse() smnet.Response {

// MakeTestStorageNetworkSignedResponse generates a response to a proposal sent over
// the network that is signed
func MakeTestStorageNetworkSignedResponse() smnet.SignedResponse {
return smnet.SignedResponse{
func MakeTestStorageNetworkSignedResponse() storagemarket.SignedResponse {
return storagemarket.SignedResponse{
Response: MakeTestStorageNetworkResponse(),
Signature: MakeTestSignature(),
}
}

// MakeTestStorageAskRequest generates a request to get a provider's ask
func MakeTestStorageAskRequest() smnet.AskRequest {
return smnet.AskRequest{
func MakeTestStorageAskRequest() storagemarket.AskRequest {
return storagemarket.AskRequest{
Miner: address.TestAddress2,
}
}

// MakeTestStorageAskResponse generates a response to an ask request
func MakeTestStorageAskResponse() smnet.AskResponse {
return smnet.AskResponse{
func MakeTestStorageAskResponse() storagemarket.AskResponse {
return storagemarket.AskResponse{
Ask: MakeTestSignedStorageAsk(),
}
}
Expand Down
45 changes: 29 additions & 16 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-fil-markets/pieceio"
Expand Down Expand Up @@ -92,6 +93,7 @@ func NewClient(
}

func (c *Client) Run(ctx context.Context) {
_ = c.net.SetDelegate(c)
}

func (c *Client) Stop() {
Expand Down Expand Up @@ -156,7 +158,7 @@ func (c *Client) GetAsk(ctx context.Context, info storagemarket.StorageProviderI
return nil, xerrors.Errorf("failed to open stream to miner: %w", err)
}

request := network.AskRequest{Miner: info.Address}
request := storagemarket.AskRequest{Miner: info.Address}
if err := s.WriteAskRequest(request); err != nil {
return nil, xerrors.Errorf("failed to send ask request: %w", err)
}
Expand Down Expand Up @@ -247,15 +249,6 @@ func (c *Client) ProposeStorageDeal(
return nil, xerrors.Errorf("setting up deal tracking: %w", err)
}

s, err := c.net.NewDealStream(info.PeerID)
if err != nil {
return nil, xerrors.Errorf("connecting to storage provider failed: %w", err)
}
err = c.conns.AddStream(deal.ProposalCid, s)
if err != nil {
return nil, err
}

err = c.statemachines.Send(deal.ProposalCid, storagemarket.ClientEventOpen)
if err != nil {
return nil, xerrors.Errorf("initializing state machine: %w", err)
Expand Down Expand Up @@ -308,6 +301,25 @@ func (c *Client) SubscribeToEvents(subscriber storagemarket.ClientSubscriber) sh
return shared.Unsubscribe(c.pubSub.Subscribe(subscriber))
}

func (c *Client) HandleAskStream(s network.StorageAskStream) {
s.Close()
}

func (c *Client) HandleDealStream(s network.StorageDealStream) {
defer s.Close()
log.Info("Handling storage deal proposal!")

response, err := s.ReadDealResponse()
if err != nil {
log.Errorf("%+v", err)
return
}
err = c.statemachines.Send(response.Response.Proposal, storagemarket.ClientEventReceiveResponse, response)
if err != nil {
log.Errorf("%+v", err)
}
}

func (c *Client) dispatch(eventName fsm.EventName, deal fsm.StateType) {
evt, ok := eventName.(storagemarket.ClientEvent)
if !ok {
Expand Down Expand Up @@ -355,10 +367,11 @@ func (c *clientDealEnvironment) Node() storagemarket.StorageClientNode {
return c.c.node
}

func (c *clientDealEnvironment) DealStream(proposalCid cid.Cid) (network.StorageDealStream, error) {
return c.c.conns.DealStream(proposalCid)
}

func (c *clientDealEnvironment) CloseStream(proposalCid cid.Cid) error {
return c.c.conns.Disconnect(proposalCid)
func (c *clientDealEnvironment) WriteDealProposal(p peer.ID, proposal storagemarket.ProposalRequest) error {
s, err := c.c.net.NewDealStream(p)
if err != nil {
return err
}
err = s.WriteDealProposal(proposal)
return err
}
18 changes: 6 additions & 12 deletions storagemarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var ClientEvents = fsm.Events{
fsm.Event(storagemarket.ClientEventFundingInitiated).
From(storagemarket.StorageDealEnsureClientFunds).To(storagemarket.StorageDealClientFunding).
Action(func(deal *storagemarket.ClientDeal, mcid cid.Cid) error {
deal.AddFundsCid = mcid
deal.AddFundsCid = &mcid
return nil
}),
fsm.Event(storagemarket.ClientEventEnsureFundsFailed).
Expand All @@ -34,17 +34,11 @@ var ClientEvents = fsm.Events{
return nil
}),
fsm.Event(storagemarket.ClientEventDealProposed).
From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealValidating),
fsm.Event(storagemarket.ClientEventDealStreamLookupErrored).
FromAny().To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("miner connection error: %w", err).Error()
return nil
}),
fsm.Event(storagemarket.ClientEventReadResponseFailed).
From(storagemarket.StorageDealValidating).To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("error reading Response message: %w", err).Error()
From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealWaitingForResponse),
fsm.Event(storagemarket.ClientEventReceiveResponse).
From(storagemarket.StorageDealWaitingForResponse).To(storagemarket.StorageDealValidating).
Action(func(deal *storagemarket.ClientDeal, response storagemarket.SignedResponse) error {
deal.LastResponse = &response
return nil
}),
fsm.Event(storagemarket.ClientEventResponseVerificationFailed).
Expand Down
Loading