Skip to content

Commit

Permalink
add timeout for sending cancel message to peer when retrieval cancell…
Browse files Browse the repository at this point in the history
…ed (#531)

* fix: add timeout for sending cancel message to peer on close

* fix: remove extraneous import
  • Loading branch information
dirkmc authored Apr 2, 2021
1 parent ccf567f commit ff7bf5a
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 14 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/filecoin-project/go-address v0.0.3
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434
github.com/filecoin-project/go-data-transfer v1.4.2
github.com/filecoin-project/go-data-transfer v1.4.3
github.com/filecoin-project/go-ds-versioning v0.1.0
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434/go
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer v1.0.1/go.mod h1:UxvfUAY9v3ub0a21BSK9u3pB2aq30Y0KMsG+w9/ysyo=
github.com/filecoin-project/go-data-transfer v1.4.2 h1:3+HGuqDaJ3QoXPqYp+kSoaC2QKl9pKgBAqhvc3EVesQ=
github.com/filecoin-project/go-data-transfer v1.4.2/go.mod h1:n8kbDQXWrY1c4UgfMa9KERxNCWbOTDwdNhf2MpN9dpo=
github.com/filecoin-project/go-data-transfer v1.4.3 h1:ECEw69NOfmEZ7XN1NSBvj3KTbbH2mIczQs+Z2w4bD7c=
github.com/filecoin-project/go-data-transfer v1.4.3/go.mod h1:n8kbDQXWrY1c4UgfMa9KERxNCWbOTDwdNhf2MpN9dpo=
github.com/filecoin-project/go-ds-versioning v0.1.0 h1:y/X6UksYTsK8TLCI7rttCKEvl8btmWxyFMEeeWGUxIQ=
github.com/filecoin-project/go-ds-versioning v0.1.0/go.mod h1:mp16rb4i2QPmxBnmanUx8i/XANp+PFCCJWiAb+VW4/s=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
Expand Down
13 changes: 12 additions & 1 deletion retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,18 @@ func (c *clientDealEnvironment) SendDataTransferVoucher(ctx context.Context, cha
}

func (c *clientDealEnvironment) CloseDataTransfer(ctx context.Context, channelID datatransfer.ChannelID) error {
return c.c.dataTransfer.CloseDataTransferChannel(ctx, channelID)
// When we close the data transfer, we also send a cancel message to the peer.
// Make sure we don't wait too long to send the message.
ctx, cancel := context.WithTimeout(ctx, shared.CloseDataTransferTimeout)
defer cancel()

err := c.c.dataTransfer.CloseDataTransferChannel(ctx, channelID)
if shared.IsCtxDone(err) {
log.Warnf("failed to send cancel data transfer channel %s to provider within timeout %s",
channelID, shared.CloseDataTransferTimeout)
return nil
}
return err
}

type clientStoreGetter struct {
Expand Down
4 changes: 2 additions & 2 deletions retrievalmarket/impl/dtutils/dtutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber
if channelState.Status() == datatransfer.Completed {
err := deals.Send(rm.ProviderDealIdentifier{DealID: dealProposal.ID, Receiver: channelState.Recipient()}, rm.ProviderEventComplete)
if err != nil {
log.Errorf("processing dt event: %w", err)
log.Errorf("processing dt event: %s", err)
}
}

Expand All @@ -69,7 +69,7 @@ func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber

err := deals.Send(rm.ProviderDealIdentifier{DealID: dealProposal.ID, Receiver: channelState.Recipient()}, retrievalEvent, params...)
if err != nil {
log.Errorf("processing dt event: %w", err)
log.Errorf("processing dt event: %s", err)
}

}
Expand Down
4 changes: 1 addition & 3 deletions retrievalmarket/impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,9 +508,7 @@ CurrentInterval: %d
t.FailNow()
case clientDealState = <-clientDealStateChan:
}
if testCase.failsUnseal {
assert.Equal(t, retrievalmarket.DealStatusErrored, clientDealState.Status)
} else if testCase.cancelled {
if testCase.failsUnseal || testCase.cancelled {
assert.Equal(t, retrievalmarket.DealStatusCancelled, clientDealState.Status)
} else {
if !testCase.zeroPricePerByte {
Expand Down
14 changes: 13 additions & 1 deletion retrievalmarket/impl/provider_environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/dtutils"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/providerstates"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/requestvalidation"
"github.com/filecoin-project/go-fil-markets/shared"
)

var _ requestvalidation.ValidationEnvironment = new(providerValidationEnvironment)
Expand Down Expand Up @@ -170,7 +171,18 @@ func (pde *providerDealEnvironment) ResumeDataTransfer(ctx context.Context, chid
}

func (pde *providerDealEnvironment) CloseDataTransfer(ctx context.Context, chid datatransfer.ChannelID) error {
return pde.p.dataTransfer.CloseDataTransferChannel(ctx, chid)
// When we close the data transfer, we also send a cancel message to the peer.
// Make sure we don't wait too long to send the message.
ctx, cancel := context.WithTimeout(ctx, shared.CloseDataTransferTimeout)
defer cancel()

err := pde.p.dataTransfer.CloseDataTransferChannel(ctx, chid)
if shared.IsCtxDone(err) {
log.Warnf("failed to send cancel data transfer channel %s to client within timeout %s",
chid, shared.CloseDataTransferTimeout)
return nil
}
return err
}

func (pde *providerDealEnvironment) DeleteStore(storeID multistore.StoreID) error {
Expand Down
16 changes: 16 additions & 0 deletions shared/close.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package shared

import (
"context"
"errors"
"time"
)

// When we close the data transfer, we also send a cancel message to the peer.
// CloseDataTransferTimeout is the amount of time to wait for the close to
// complete before giving up.
const CloseDataTransferTimeout = 30 * time.Second

func IsCtxDone(err error) bool {
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}
8 changes: 4 additions & 4 deletions storagemarket/impl/dtutils/dtutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber
if channelState.Status() == datatransfer.Completed {
err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferCompleted)
if err != nil {
log.Errorf("processing dt event: %w", err)
log.Errorf("processing dt event: %s", err)
}
}

Expand All @@ -63,7 +63,7 @@ func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber
}
}()
if err != nil {
log.Errorf("processing dt event: %w", err)
log.Errorf("processing dt event: %s", err)
}
}
}
Expand All @@ -83,7 +83,7 @@ func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
if channelState.Status() == datatransfer.Completed {
err := deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferComplete)
if err != nil {
log.Errorf("processing dt event: %w", err)
log.Errorf("processing dt event: %s", err)
}
}

Expand All @@ -106,7 +106,7 @@ func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
}
}()
if err != nil {
log.Errorf("processing dt event: %w", err)
log.Errorf("processing dt event: %s", err)
}
}
}
Expand Down

0 comments on commit ff7bf5a

Please sign in to comment.