Skip to content

Commit

Permalink
feat: channel monitor watches for errors instead of measuring data rate
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Apr 8, 2021
1 parent 2ceebfb commit 1d99682
Show file tree
Hide file tree
Showing 16 changed files with 472 additions and 735 deletions.
405 changes: 129 additions & 276 deletions channelmonitor/channelmonitor.go

Large diffs are not rendered by default.

648 changes: 208 additions & 440 deletions channelmonitor/channelmonitor_test.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,12 @@ func (c *Channels) SendDataError(chid datatransfer.ChannelID, err error) error {
return c.send(chid, datatransfer.SendDataError, err)
}

// ReceiveDataError indicates that the transport layer had an error receiving
// data from the remote peer
func (c *Channels) ReceiveDataError(chid datatransfer.ChannelID, err error) error {
return c.send(chid, datatransfer.ReceiveDataError, err)
}

// HasChannel returns true if the given channel id is being tracked
func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) {
return c.stateMachines.Has(chid)
Expand Down
5 changes: 5 additions & 0 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ var ChannelEvents = fsm.Events{
chst.AddLog("data transfer send error: %s", chst.Message)
return nil
}),
fsm.Event(datatransfer.ReceiveDataError).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
chst.Message = err.Error()
chst.AddLog("data transfer receive error: %s", chst.Message)
return nil
}),
fsm.Event(datatransfer.RequestTimedOut).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
chst.Message = err.Error()
chst.AddLog("data transfer request timed out: %s", chst.Message)
Expand Down
7 changes: 7 additions & 0 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ const (
// SendDataError indicates that the transport layer had an error trying
// to send data to the remote peer
SendDataError

// ReceiveDataError indicates that the transport layer had an error
// receiving data from the remote peer
ReceiveDataError
)

// Events are human readable names for data transfer events
Expand Down Expand Up @@ -127,6 +131,9 @@ var Events = map[EventCode]string{
DataQueuedProgress: "DataQueuedProgress",
DataSentProgress: "DataSentProgress",
DataReceivedProgress: "DataReceivedProgress",
RequestTimedOut: "RequestTimedOut",
SendDataError: "SendDataError",
ReceiveDataError: "ReceiveDataError",
}

// Event is a struct containing information about a data transfer event
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.3
github.com/ipfs/go-graphsync v0.6.0
github.com/ipfs/go-graphsync v0.6.1-0.20210407112122-089b2abad957
github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-graphsync v0.6.0 h1:x6UvDUGA7wjaKNqx5Vbo7FGT8aJ5ryYA0dMQ5jN3dF0=
github.com/ipfs/go-graphsync v0.6.0/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-graphsync v0.6.1-0.20210407112122-089b2abad957 h1:JMQvhEKMk8kz31F7GcQba4XCwrO35zXad0/pmhyxfwk=
github.com/ipfs/go-graphsync v0.6.1-0.20210407112122-089b2abad957/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ=
Expand Down
5 changes: 5 additions & 0 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ func (m *manager) OnSendDataError(chid datatransfer.ChannelID, err error) error
return m.channels.SendDataError(chid, err)
}

func (m *manager) OnReceiveDataError(chid datatransfer.ChannelID, err error) error {
log.Warnf("channel %+v had transport receive error: %s", chid, err)
return m.channels.ReceiveDataError(chid, err)
}

// OnChannelCompleted is called
// - by the requester when all data for a transfer has been received
// - by the responder when all data for a transfer has been sent
Expand Down
13 changes: 11 additions & 2 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,9 @@ func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetw
option(m)
}

// Start push / pull channel monitor after applying config options as the config
// Create push / pull channel monitor after applying config options as the config
// options may apply to the monitor
m.channelMonitor = channelmonitor.NewMonitor(m, m.channelMonitorCfg)
m.channelMonitor.Start()

return m, nil
}
Expand Down Expand Up @@ -320,6 +319,12 @@ func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfe
return nil
}

// ConnectTo opens a connection to a peer on the data-transfer protocol,
// retrying if necessary
func (m *manager) ConnectTo(ctx context.Context, p peer.ID) error {
return m.dataTransferNetwork.OpenStreamTo(ctx, p)
}

// close an open channel and fire an error event
func (m *manager) CloseDataTransferChannelWithError(ctx context.Context, chid datatransfer.ChannelID, cherr error) error {
log.Infof("close channel %s with error %s", chid, cherr)
Expand Down Expand Up @@ -511,3 +516,7 @@ func (m *manager) channelDataTransferType(channel datatransfer.ChannelState) Cha
// we received a push channel
return ManagerPeerReceivePush
}

func (m *manager) PeerID() peer.ID {
return m.peerID
}
8 changes: 2 additions & 6 deletions impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func TestManyReceiversAtOnce(t *testing.T) {
}
}

// disconnectCoordinator is used by TestPushRequestAutoRestart to allow
// disconnectCoordinator is used by TestAutoRestart to allow
// test cases to signal when a disconnect should start, and whether
// to wait for the disconnect to take effect before continuing
type disconnectCoordinator struct {
Expand Down Expand Up @@ -724,14 +724,10 @@ func TestAutoRestart(t *testing.T) {

// Set up
restartConf := ChannelRestartConfig(channelmonitor.Config{
MonitorPushChannels: tc.isPush,
MonitorPullChannels: !tc.isPush,
AcceptTimeout: 100 * time.Millisecond,
Interval: 100 * time.Millisecond,
MinBytesTransferred: 1,
ChecksPerInterval: 10,
RestartBackoff: 500 * time.Millisecond,
MaxConsecutiveRestarts: 5,
RestartAckTimeout: 100 * time.Millisecond,
CompleteTimeout: 100 * time.Millisecond,
})
initiator, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, initiatorGSTspt, restartConf)
Expand Down
5 changes: 5 additions & 0 deletions network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ type DataTransferNetwork interface {
// ConnectTo establishes a connection to the given peer
ConnectTo(context.Context, peer.ID) error

// OpenStreamTo establishes a connection to the given peer, retrying if
// necessary, and opens a stream on the data-transfer protocol to verify
// the peer will accept messages on the protocol
OpenStreamTo(ctx context.Context, p peer.ID) error

// ID returns the peer id of this libp2p host
ID() peer.ID
}
Expand Down
12 changes: 12 additions & 0 deletions network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,18 @@ func (dtnet *libp2pDataTransferNetwork) ConnectTo(ctx context.Context, p peer.ID
return dtnet.host.Connect(ctx, peer.AddrInfo{ID: p})
}

// OpenStreamTo establishes a connection to the given peer, retrying if
// necessary, and opens a stream on the data-transfer protocol to verify
// the peer will accept messages on the protocol
func (dtnet *libp2pDataTransferNetwork) OpenStreamTo(ctx context.Context, p peer.ID) error {
s, err := dtnet.openStream(ctx, p, dtnet.dtProtocols...)
if err != nil {
return err
}

return s.Close()
}

// handleNewStream receives a new stream from the network.
func (dtnet *libp2pDataTransferNetwork) handleNewStream(s network.Stream) {
defer s.Close() // nolint: errcheck,gosec
Expand Down
4 changes: 4 additions & 0 deletions testutil/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (fn *FakeNetwork) ConnectTo(_ context.Context, _ peer.ID) error {
panic("not implemented")
}

func (fn *FakeNetwork) OpenStreamTo(ctx context.Context, p peer.ID) error {
panic("implement me")
}

// ID returns a stubbed id for host of this network
func (fn *FakeNetwork) ID() peer.ID {
return fn.PeerID
Expand Down
4 changes: 4 additions & 0 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ type EventsHandler interface {
// OnSendDataError is called when a network error occurs sending data
// at the transport layer
OnSendDataError(chid ChannelID, err error) error

// OnReceiveDataError is called when a network error occurs receiving data
// at the transport layer
OnReceiveDataError(chid ChannelID, err error) error
}

/*
Expand Down
26 changes: 25 additions & 1 deletion transport/graphsync/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ func (t *Transport) SetEventHandler(events datatransfer.EventsHandler) error {
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterRequestUpdatedHook(t.gsRequestUpdatedHook))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterRequestorCancelledListener(t.gsRequestorCancelledListener))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterNetworkErrorListener(t.gsNetworkSendErrorListener))
t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterReceiverNetworkErrorListener(t.gsNetworkReceiveErrorListener))
return nil
}

Expand Down Expand Up @@ -729,13 +730,36 @@ func (t *Transport) gsNetworkSendErrorListener(p peer.ID, request graphsync.Requ
t.dataLock.Lock()
defer t.dataLock.Unlock()

// Fire an error if the graphsync request was made by this node or the remote peer
chid, ok := t.graphsyncRequestMap[graphsyncKey{request.ID(), p}]
if !ok {
return
chid, ok = t.graphsyncRequestMap[graphsyncKey{request.ID(), t.peerID}]
if !ok {
return
}
}

err := t.events.OnSendDataError(chid, gserr)
if err != nil {
log.Errorf("failed to fire transport send error %s: %s", gserr, err)
}
}

// Called when there is a graphsync error receiving data
func (t *Transport) gsNetworkReceiveErrorListener(p peer.ID, gserr error) {
t.dataLock.Lock()
defer t.dataLock.Unlock()

// Fire a receive data error on all ongoing graphsync transfers with that
// peer
for _, chid := range t.graphsyncRequestMap {
if chid.Initiator != p && chid.Responder != p {
continue
}

err := t.events.OnReceiveDataError(chid, gserr)
if err != nil {
log.Errorf("failed to fire transport receive error %s: %s", gserr, err)
}
}
}
53 changes: 46 additions & 7 deletions transport/graphsync/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func TestManager(t *testing.T) {
require.False(t, events.OnDataQueuedCalled)
},
},
"outgoing data send error will terminate request": {
"outgoing data queued error will terminate request": {
events: fakeEvents{
OnDataQueuedError: errors.New("something went wrong"),
},
Expand All @@ -345,7 +345,7 @@ func TestManager(t *testing.T) {
require.Error(t, gsData.outgoingBlockHookActions.TerminationError)
},
},
"outgoing data send error == pause will pause request": {
"outgoing data queued error == pause will pause request": {
events: fakeEvents{
OnDataQueuedError: datatransfer.ErrPause,
},
Expand Down Expand Up @@ -626,7 +626,7 @@ func TestManager(t *testing.T) {
assertHasOutgoingMessage(t, gsData.incomingRequestHookActions.SentExtensions, gsData.incoming)
},
},
"recognized incoming request will record network error": {
"recognized incoming request will record network send error": {
action: func(gsData *harness) {
gsData.incomingRequestHook()
gsData.networkErrorListener(errors.New("something went wrong"))
Expand All @@ -636,6 +636,34 @@ func TestManager(t *testing.T) {
require.True(t, events.OnSendDataErrorCalled)
},
},
"recognized outgoing request will record network send error": {
action: func(gsData *harness) {
gsData.outgoingRequestHook()
gsData.networkErrorListener(errors.New("something went wrong"))
},
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
require.True(t, events.OnSendDataErrorCalled)
},
},
"recognized incoming request will record network receive error": {
action: func(gsData *harness) {
gsData.incomingRequestHook()
gsData.receiverNetworkErrorListener(errors.New("something went wrong"))
},
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
require.Equal(t, 1, events.OnRequestReceivedCallCount)
require.True(t, events.OnReceiveDataErrorCalled)
},
},
"recognized outgoing request will record network receive error": {
action: func(gsData *harness) {
gsData.outgoingRequestHook()
gsData.receiverNetworkErrorListener(errors.New("something went wrong"))
},
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
require.True(t, events.OnReceiveDataErrorCalled)
},
},
"open channel adds doNotSendCids to the DoNotSend extension": {
action: func(gsData *harness) {
cids := testutil.GenerateCids(2)
Expand Down Expand Up @@ -974,10 +1002,12 @@ type fakeEvents struct {
OnDataQueuedMessage datatransfer.Message
OnDataQueuedError error

OnRequestTimedOutCalled bool
OnRequestTimedOutChannelId datatransfer.ChannelID
OnSendDataErrorCalled bool
OnSendDataErrorChannelID datatransfer.ChannelID
OnRequestTimedOutCalled bool
OnRequestTimedOutChannelId datatransfer.ChannelID
OnSendDataErrorCalled bool
OnSendDataErrorChannelID datatransfer.ChannelID
OnReceiveDataErrorCalled bool
OnReceiveDataErrorChannelID datatransfer.ChannelID

ChannelCompletedSuccess bool
RequestReceivedRequest datatransfer.Request
Expand Down Expand Up @@ -1008,6 +1038,12 @@ func (fe *fakeEvents) OnSendDataError(chid datatransfer.ChannelID, err error) er
return nil
}

func (fe *fakeEvents) OnReceiveDataError(chid datatransfer.ChannelID, err error) error {
fe.OnReceiveDataErrorCalled = true
fe.OnReceiveDataErrorChannelID = chid
return nil
}

func (fe *fakeEvents) OnChannelOpened(chid datatransfer.ChannelID) error {
fe.ChannelOpenedChannelID = chid
return fe.OnChannelOpenedError
Expand Down Expand Up @@ -1099,6 +1135,9 @@ func (ha *harness) requestorCancelledListener() {
func (ha *harness) networkErrorListener(err error) {
ha.fgs.NetworkErrorListener(ha.other, ha.request, err)
}
func (ha *harness) receiverNetworkErrorListener(err error) {
ha.fgs.ReceiverNetworkErrorListener(ha.other, err)
}

type dtConfig struct {
dtExtensionMissing bool
Expand Down

0 comments on commit 1d99682

Please sign in to comment.