Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add timeout for sending cancel message to peer when retrieval cancelled #531

Merged
merged 2 commits into from
Apr 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/filecoin-project/go-address v0.0.3
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434
github.com/filecoin-project/go-data-transfer v1.4.2
github.com/filecoin-project/go-data-transfer v1.4.3
github.com/filecoin-project/go-ds-versioning v0.1.0
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434/go
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer v1.0.1/go.mod h1:UxvfUAY9v3ub0a21BSK9u3pB2aq30Y0KMsG+w9/ysyo=
github.com/filecoin-project/go-data-transfer v1.4.2 h1:3+HGuqDaJ3QoXPqYp+kSoaC2QKl9pKgBAqhvc3EVesQ=
github.com/filecoin-project/go-data-transfer v1.4.2/go.mod h1:n8kbDQXWrY1c4UgfMa9KERxNCWbOTDwdNhf2MpN9dpo=
github.com/filecoin-project/go-data-transfer v1.4.3 h1:ECEw69NOfmEZ7XN1NSBvj3KTbbH2mIczQs+Z2w4bD7c=
github.com/filecoin-project/go-data-transfer v1.4.3/go.mod h1:n8kbDQXWrY1c4UgfMa9KERxNCWbOTDwdNhf2MpN9dpo=
github.com/filecoin-project/go-ds-versioning v0.1.0 h1:y/X6UksYTsK8TLCI7rttCKEvl8btmWxyFMEeeWGUxIQ=
github.com/filecoin-project/go-ds-versioning v0.1.0/go.mod h1:mp16rb4i2QPmxBnmanUx8i/XANp+PFCCJWiAb+VW4/s=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
Expand Down
13 changes: 12 additions & 1 deletion retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,18 @@ func (c *clientDealEnvironment) SendDataTransferVoucher(ctx context.Context, cha
}

func (c *clientDealEnvironment) CloseDataTransfer(ctx context.Context, channelID datatransfer.ChannelID) error {
return c.c.dataTransfer.CloseDataTransferChannel(ctx, channelID)
// When we close the data transfer, we also send a cancel message to the peer.
// Make sure we don't wait too long to send the message.
ctx, cancel := context.WithTimeout(ctx, shared.CloseDataTransferTimeout)
defer cancel()

err := c.c.dataTransfer.CloseDataTransferChannel(ctx, channelID)
if shared.IsCtxDone(err) {
log.Warnf("failed to send cancel data transfer channel %s to provider within timeout %s",
channelID, shared.CloseDataTransferTimeout)
return nil
}
return err
}

type clientStoreGetter struct {
Expand Down
4 changes: 2 additions & 2 deletions retrievalmarket/impl/dtutils/dtutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber
if channelState.Status() == datatransfer.Completed {
err := deals.Send(rm.ProviderDealIdentifier{DealID: dealProposal.ID, Receiver: channelState.Recipient()}, rm.ProviderEventComplete)
if err != nil {
log.Errorf("processing dt event: %w", err)
log.Errorf("processing dt event: %s", err)
}
}

Expand All @@ -69,7 +69,7 @@ func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber

err := deals.Send(rm.ProviderDealIdentifier{DealID: dealProposal.ID, Receiver: channelState.Recipient()}, retrievalEvent, params...)
if err != nil {
log.Errorf("processing dt event: %w", err)
log.Errorf("processing dt event: %s", err)
}

}
Expand Down
4 changes: 1 addition & 3 deletions retrievalmarket/impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,9 +508,7 @@ CurrentInterval: %d
t.FailNow()
case clientDealState = <-clientDealStateChan:
}
if testCase.failsUnseal {
assert.Equal(t, retrievalmarket.DealStatusErrored, clientDealState.Status)
} else if testCase.cancelled {
if testCase.failsUnseal || testCase.cancelled {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's an error unsealing on the provider, the best we can hope for on the client side is to receive a cancel message.
This test only passed before because the statemachine on the client was already shutdown when it received the Cancel event, so firing the event caused an error.
I fixed that bug so that the statemachine just ignores a Cancel event when the statemachine has already been shutdown

assert.Equal(t, retrievalmarket.DealStatusCancelled, clientDealState.Status)
} else {
if !testCase.zeroPricePerByte {
Expand Down
14 changes: 13 additions & 1 deletion retrievalmarket/impl/provider_environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/dtutils"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/providerstates"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/requestvalidation"
"github.com/filecoin-project/go-fil-markets/shared"
)

var _ requestvalidation.ValidationEnvironment = new(providerValidationEnvironment)
Expand Down Expand Up @@ -170,7 +171,18 @@ func (pde *providerDealEnvironment) ResumeDataTransfer(ctx context.Context, chid
}

func (pde *providerDealEnvironment) CloseDataTransfer(ctx context.Context, chid datatransfer.ChannelID) error {
return pde.p.dataTransfer.CloseDataTransferChannel(ctx, chid)
// When we close the data transfer, we also send a cancel message to the peer.
// Make sure we don't wait too long to send the message.
ctx, cancel := context.WithTimeout(ctx, shared.CloseDataTransferTimeout)
defer cancel()

err := pde.p.dataTransfer.CloseDataTransferChannel(ctx, chid)
if shared.IsCtxDone(err) {
log.Warnf("failed to send cancel data transfer channel %s to client within timeout %s",
chid, shared.CloseDataTransferTimeout)
return nil
}
return err
}

func (pde *providerDealEnvironment) DeleteStore(storeID multistore.StoreID) error {
Expand Down
16 changes: 16 additions & 0 deletions shared/close.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package shared

import (
"context"
"errors"
"time"
)

// When we close the data transfer, we also send a cancel message to the peer.
// CloseDataTransferTimeout is the amount of time to wait for the close to
// complete before giving up.
const CloseDataTransferTimeout = 30 * time.Second

func IsCtxDone(err error) bool {
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}
8 changes: 4 additions & 4 deletions storagemarket/impl/dtutils/dtutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber
if channelState.Status() == datatransfer.Completed {
err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferCompleted)
if err != nil {
log.Errorf("processing dt event: %w", err)
log.Errorf("processing dt event: %s", err)
}
}

Expand All @@ -63,7 +63,7 @@ func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber
}
}()
if err != nil {
log.Errorf("processing dt event: %w", err)
log.Errorf("processing dt event: %s", err)
}
}
}
Expand All @@ -83,7 +83,7 @@ func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
if channelState.Status() == datatransfer.Completed {
err := deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferComplete)
if err != nil {
log.Errorf("processing dt event: %w", err)
log.Errorf("processing dt event: %s", err)
}
}

Expand All @@ -106,7 +106,7 @@ func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
}
}()
if err != nil {
log.Errorf("processing dt event: %w", err)
log.Errorf("processing dt event: %s", err)
}
}
}
Expand Down