From 4b38ef713c8e5f67ff6328daad2f451737ee9238 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 6 Apr 2021 11:11:08 +0200 Subject: [PATCH 1/4] feat: channel monitor watches for errors instead of measuring data rate --- channelmonitor/channelmonitor.go | 405 +++++----------- channelmonitor/channelmonitor_test.go | 648 +++++++++----------------- channels/channels.go | 6 + channels/channels_fsm.go | 5 + events.go | 7 + go.mod | 2 +- go.sum | 4 +- impl/events.go | 5 + impl/impl.go | 13 +- impl/integration_test.go | 8 +- network/interface.go | 5 + network/libp2p_impl.go | 12 + testutil/testnet.go | 4 + transport.go | 4 + transport/graphsync/graphsync.go | 26 +- transport/graphsync/graphsync_test.go | 53 ++- 16 files changed, 472 insertions(+), 735 deletions(-) diff --git a/channelmonitor/channelmonitor.go b/channelmonitor/channelmonitor.go index f79fb2b4..cbba90f8 100644 --- a/channelmonitor/channelmonitor.go +++ b/channelmonitor/channelmonitor.go @@ -7,6 +7,7 @@ import ( "time" logging "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" datatransfer "github.com/filecoin-project/go-data-transfer" @@ -19,10 +20,12 @@ type monitorAPI interface { SubscribeToEvents(subscriber datatransfer.Subscriber) datatransfer.Unsubscribe RestartDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error CloseDataTransferChannelWithError(ctx context.Context, chid datatransfer.ChannelID, cherr error) error + ConnectTo(context.Context, peer.ID) error + PeerID() peer.ID } -// Monitor watches the data-rate for data transfer channels, and restarts -// a channel if the data-rate falls too low or if there are timeouts / errors +// Monitor watches the events for data transfer channels, and restarts +// a channel if there are timeouts / errors type Monitor struct { ctx context.Context stop context.CancelFunc @@ -30,26 +33,19 @@ type Monitor struct { cfg *Config lk sync.RWMutex - channels map[datatransfer.ChannelID]monitoredChan + channels map[datatransfer.ChannelID]*monitoredChannel } type Config struct { - // Indicates whether push channel monitoring is enabled - MonitorPushChannels bool - // Indicates whether pull channel monitoring is enabled - MonitorPullChannels bool // Max time to wait for other side to accept open channel request before attempting restart AcceptTimeout time.Duration - // Interval between checks of transfer rate - Interval time.Duration - // Min bytes that must be sent / received in interval - MinBytesTransferred uint64 - // Number of times to check transfer rate per interval - ChecksPerInterval uint32 // Backoff after restarting RestartBackoff time.Duration // Number of times to try to restart before failing MaxConsecutiveRestarts uint32 + // Max time to wait for the peer to acknowledge a restart request. + // Note: Does not include the time taken to reconnect to the peer. 
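+	// If no acknowledgement arrives within this window, the restart attempt
+	// fails: the channel is closed with an error and the monitor shuts down.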
+ RestartAckTimeout time.Duration // Max time to wait for the responder to send a Complete message once all // data has been sent CompleteTimeout time.Duration @@ -63,7 +59,7 @@ func NewMonitor(mgr monitorAPI, cfg *Config) *Monitor { stop: cancel, mgr: mgr, cfg: cfg, - channels: make(map[datatransfer.ChannelID]monitoredChan), + channels: make(map[datatransfer.ChannelID]*monitoredChannel), } } @@ -76,51 +72,32 @@ func checkConfig(cfg *Config) { if cfg.AcceptTimeout <= 0 { panic(fmt.Sprintf(prefix+"AcceptTimeout is %s but must be > 0", cfg.AcceptTimeout)) } - if cfg.Interval <= 0 { - panic(fmt.Sprintf(prefix+"Interval is %s but must be > 0", cfg.Interval)) - } - if cfg.ChecksPerInterval == 0 { - panic(fmt.Sprintf(prefix+"ChecksPerInterval is %d but must be > 0", cfg.ChecksPerInterval)) - } - if cfg.MinBytesTransferred == 0 { - panic(fmt.Sprintf(prefix+"MinBytesTransferred is %d but must be > 0", cfg.MinBytesTransferred)) - } if cfg.MaxConsecutiveRestarts == 0 { panic(fmt.Sprintf(prefix+"MaxConsecutiveRestarts is %d but must be > 0", cfg.MaxConsecutiveRestarts)) } + if cfg.RestartAckTimeout <= 0 { + panic(fmt.Sprintf(prefix+"RestartAckTimeout is %s but must be > 0", cfg.RestartAckTimeout)) + } if cfg.CompleteTimeout <= 0 { panic(fmt.Sprintf(prefix+"CompleteTimeout is %s but must be > 0", cfg.CompleteTimeout)) } } -// This interface just makes it easier to abstract some methods between the -// push and pull monitor implementations -type monitoredChan interface { - Shutdown() bool - checkDataRate() -} - // AddPushChannel adds a push channel to the channel monitor -func (m *Monitor) AddPushChannel(chid datatransfer.ChannelID) monitoredChan { +func (m *Monitor) AddPushChannel(chid datatransfer.ChannelID) *monitoredChannel { return m.addChannel(chid, true) } // AddPullChannel adds a pull channel to the channel monitor -func (m *Monitor) AddPullChannel(chid datatransfer.ChannelID) monitoredChan { +func (m *Monitor) AddPullChannel(chid datatransfer.ChannelID) *monitoredChannel { return m.addChannel(chid, false) } // addChannel adds a channel to the channel monitor -func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) monitoredChan { +func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) *monitoredChannel { if !m.enabled() { return nil } - if isPush && !m.cfg.MonitorPushChannels { - return nil - } - if !isPush && !m.cfg.MonitorPullChannels { - return nil - } m.lk.Lock() defer m.lk.Unlock() @@ -136,19 +113,13 @@ func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) monitored return nil } - // Create the channel monitor - var mpc monitoredChan - if isPush { - mpc = newMonitoredPushChannel(m.ctx, m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown) - } else { - mpc = newMonitoredPullChannel(m.ctx, m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown) - } + mpc := newMonitoredChannel(m.ctx, m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown) m.channels[chid] = mpc return mpc } func (m *Monitor) Shutdown() { - // Causes the run loop to exit + // Cancel the context for the Monitor m.stop() } @@ -176,48 +147,8 @@ func (m *Monitor) enabled() bool { return m.cfg != nil } -func (m *Monitor) Start() { - if !m.enabled() { - return - } - - go m.run() -} - -func (m *Monitor) run() { - defer m.onShutdown() - - // Check data-rate ChecksPerInterval times per interval - tickInterval := m.cfg.Interval / time.Duration(m.cfg.ChecksPerInterval) - ticker := time.NewTicker(tickInterval) - defer ticker.Stop() - - log.Infof("Starting data-transfer channel monitor with "+ - "%d checks 
per %s interval (check interval %s); min bytes per interval: %d, restart backoff: %s; max consecutive restarts: %d", - m.cfg.ChecksPerInterval, m.cfg.Interval, tickInterval, m.cfg.MinBytesTransferred, m.cfg.RestartBackoff, m.cfg.MaxConsecutiveRestarts) - - for { - select { - case <-m.ctx.Done(): - return - case <-ticker.C: - m.checkDataRate() - } - } -} - -// check data rate for all monitored channels -func (m *Monitor) checkDataRate() { - m.lk.RLock() - defer m.lk.RUnlock() - - for _, ch := range m.channels { - ch.checkDataRate() - } -} - -// monitoredChannel keeps track of the data-rate for a channel, and -// restarts the channel if the rate falls below the minimum allowed +// monitoredChannel keeps track of events for a channel, and +// restarts the channel if there are connection issues type monitoredChannel struct { // The parentCtx is used when sending a close message for a channel, so // that operation can continue even after the monitoredChannel is shutdown @@ -229,7 +160,6 @@ type monitoredChannel struct { cfg *Config unsub datatransfer.Unsubscribe onShutdown func(datatransfer.ChannelID) - onDTEvent datatransfer.Subscriber shutdownLk sync.Mutex restartLk sync.RWMutex @@ -243,7 +173,6 @@ func newMonitoredChannel( chid datatransfer.ChannelID, cfg *Config, onShutdown func(datatransfer.ChannelID), - onDTEvent datatransfer.Subscriber, ) *monitoredChannel { ctx, cancel := context.WithCancel(context.Background()) mpc := &monitoredChannel{ @@ -254,16 +183,11 @@ func newMonitoredChannel( chid: chid, cfg: cfg, onShutdown: onShutdown, - onDTEvent: onDTEvent, } mpc.start() return mpc } -// Overridden by sub-classes -func (mc *monitoredChannel) checkDataRate() { -} - // Cancel the context and unsubscribe from events. // Returns true if channel has not already been shutdown. func (mc *monitoredChannel) Shutdown() bool { @@ -291,12 +215,12 @@ func (mc *monitoredChannel) start() { mc.shutdownLk.Lock() defer mc.shutdownLk.Unlock() - log.Debugf("%s: starting channel data-rate monitoring", mc.chid) + log.Debugf("%s: starting data-transfer channel monitoring", mc.chid) // Watch to make sure the responder accepts the channel in time cancelAcceptTimer := mc.watchForResponderAccept() - // Watch for data rate events + // Watch for data-transfer channel events mc.unsub = mc.mgr.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) { if channelState.ChannelID() != mc.chid { return @@ -305,7 +229,7 @@ func (mc *monitoredChannel) start() { // Once the channel completes, shut down the monitor state := channelState.Status() if channels.IsChannelCleaningUp(state) || channels.IsChannelTerminated(state) { - log.Debugf("%s: stopping channel data-rate monitoring (event: %s / state: %s)", + log.Debugf("%s: stopping data-transfer channel monitoring (event: %s / state: %s)", mc.chid, datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()]) go mc.Shutdown() return @@ -320,15 +244,20 @@ func (mc *monitoredChannel) start() { // attempt to restart the channel log.Warnf("%s: data transfer transport send error, restarting data transfer", mc.chid) go mc.restartChannel() + case datatransfer.ReceiveDataError: + // If the transport layer reports an error receiving data over the wire, + // attempt to restart the channel + log.Warnf("%s: data transfer transport receive error, restarting data transfer", mc.chid) + go mc.restartChannel() case datatransfer.FinishTransfer: // The channel initiator has finished sending / receiving all data. 
// Watch to make sure that the responder sends a message to acknowledge // that the transfer is complete go mc.watchForResponderComplete() - default: - // Delegate to the push channel monitor or pull channel monitor to - // handle the event - mc.onDTEvent(event, channelState) + case datatransfer.DataSent, datatransfer.DataReceived: + // Some data was sent / received so reset the consecutive restart + // counter + mc.resetConsecutiveRestarts() } }) } @@ -365,7 +294,8 @@ func (mc *monitoredChannel) watchForResponderComplete() { select { case <-mc.ctx.Done(): - // When the Complete message is received, the channel shuts down + // When the Complete message is received, the channel shuts down and + // its context is cancelled case <-timer.C: // Timer expired before we received a Complete from the responder err := xerrors.Errorf("%s: timed out waiting %s for Complete message from remote peer", @@ -383,6 +313,14 @@ func (mc *monitoredChannel) resetConsecutiveRestarts() { mc.consecutiveRestarts = 0 } +// Used by the tests +func (mc *monitoredChannel) isRestarting() bool { + mc.restartLk.Lock() + defer mc.restartLk.Unlock() + + return !mc.restartedAt.IsZero() +} + func (mc *monitoredChannel) restartChannel() { var restartCount int var restartedAt time.Time @@ -410,39 +348,76 @@ func (mc *monitoredChannel) restartChannel() { // If no data has been transferred since the last transfer, and we've // reached the consecutive restart limit, close the channel and // shutdown the monitor - err := xerrors.Errorf("%s: after %d consecutive restarts failed to reach required data transfer rate", mc.chid, restartCount) + err := xerrors.Errorf("%s: after %d consecutive restarts failed to transfer any data", mc.chid, restartCount) mc.closeChannelAndShutdown(err) return } - // Send a restart message for the channel. + // Send the restart message + err := mc.sendRestartMessage(restartCount) + if err != nil { + // If the restart message could not be sent, close the channel and + // shutdown the monitor + mc.closeChannelAndShutdown(err) + return + } + + // Restart complete, so clear the restart time so that another restart + // can begin + mc.restartLk.Lock() + mc.restartedAt = time.Time{} + mc.restartLk.Unlock() +} + +func (mc *monitoredChannel) sendRestartMessage(restartCount int) error { + // Establish a connection to the peer, in case the connection went down. // Note that at the networking layer there is logic to retry if a network // connection cannot be established, so this may take some time. - log.Infof("%s: sending restart message (%d consecutive restarts)", mc.chid, restartCount) - err := mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid) + p := mc.chid.OtherParty(mc.mgr.PeerID()) + log.Infof("%s: re-establishing connection to %s", mc.chid, p) + start := time.Now() + err := mc.mgr.ConnectTo(mc.ctx, p) if err != nil { - // If it wasn't possible to restart the channel, close the channel - // and shut down the monitor - cherr := xerrors.Errorf("%s: failed to send restart message: %s", mc.chid, err) - mc.closeChannelAndShutdown(cherr) - } else if mc.cfg.RestartBackoff > 0 { - log.Infof("%s: restart message sent successfully, backing off %s before allowing any other restarts", + return xerrors.Errorf("%s: failed to reconnect to peer %s after %s: %w", + mc.chid, p, time.Since(start), err) + } + log.Infof("%s: re-established connection to %s in %s", mc.chid, p, time.Since(start)) + + // Send a restart message for the channel. 
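+	// Subscribe for the restart acknowledgement before sending the request,
+	// so the response cannot be missed if the peer replies immediately.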
+ restartResult := mc.waitForRestartResponse() + log.Infof("%s: sending restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount) + err = mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid) + if err != nil { + return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err) + } + + // The restart message is fire and forget, so we need to watch for a + // restart response to know that the restart message reached the peer. + select { + case <-mc.ctx.Done(): + return nil // channel shutdown so just bail out + case err = <-restartResult: + if err != nil { + return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err) + } + } + log.Infof("%s: received restart response from %s", mc.chid, p) + + // The restart message was sent successfully. + // If a restart backoff is configured, backoff after a restart before + // attempting another. + if mc.cfg.RestartBackoff > 0 { + log.Infof("%s: backing off %s before allowing any other restarts", mc.chid, mc.cfg.RestartBackoff) - // Backoff a little time after a restart before attempting another select { case <-time.After(mc.cfg.RestartBackoff): + log.Infof("%s: restart back-off of %s complete", mc.chid, mc.cfg.RestartBackoff) case <-mc.ctx.Done(): + return nil } - - log.Debugf("%s: restart back-off %s complete", - mc.chid, mc.cfg.RestartBackoff) } - // Restart complete, so clear the restart time so that another restart - // can begin - mc.restartLk.Lock() - mc.restartedAt = time.Time{} - mc.restartLk.Unlock() + return nil } // Shut down the monitor and close the data transfer channel @@ -462,167 +437,45 @@ func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) { } } -// Snapshot of the pending and sent data at a particular point in time. -// The push channel monitor takes regular snapshots and compares them to -// decide if the data rate has fallen too low. 
-type dataRatePoint struct { - pending uint64 - sent uint64 -} - -// Keeps track of the data rate for a push channel -type monitoredPushChannel struct { - *monitoredChannel - - statsLk sync.RWMutex - queued uint64 - sent uint64 - dataRatePoints chan *dataRatePoint -} +// Wait for the peer to send an acknowledgement to the restart request +func (mc *monitoredChannel) waitForRestartResponse() chan error { + restartFired := make(chan struct{}) + restarted := make(chan error) + timer := time.NewTimer(mc.cfg.RestartAckTimeout) -func newMonitoredPushChannel( - parentCtx context.Context, - mgr monitorAPI, - chid datatransfer.ChannelID, - cfg *Config, - onShutdown func(datatransfer.ChannelID), -) *monitoredPushChannel { - mpc := &monitoredPushChannel{ - dataRatePoints: make(chan *dataRatePoint, cfg.ChecksPerInterval), - } - mpc.monitoredChannel = newMonitoredChannel(parentCtx, mgr, chid, cfg, onShutdown, mpc.onDTEvent) - return mpc -} - -// check if the amount of data sent in the interval was too low, and if so -// restart the channel -func (mc *monitoredPushChannel) checkDataRate() { - mc.statsLk.Lock() - defer mc.statsLk.Unlock() - - // Before returning, add the current data rate stats to the queue - defer func() { - var pending uint64 - if mc.queued > mc.sent { // should always be true but just in case - pending = mc.queued - mc.sent - } - mc.dataRatePoints <- &dataRatePoint{ - pending: pending, - sent: mc.sent, + unsub := mc.mgr.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) { + if channelState.ChannelID() != mc.chid { + return } - }() - - // Check that there are enough data points that an interval has elapsed - if len(mc.dataRatePoints) < int(mc.cfg.ChecksPerInterval) { - log.Debugf("%s: not enough data points to check data rate yet (%d / %d)", - mc.chid, len(mc.dataRatePoints), mc.cfg.ChecksPerInterval) - - return - } - // Pop the data point from one interval ago - atIntervalStart := <-mc.dataRatePoints - - // If there was enough pending data to cover the minimum required amount, - // and the amount sent was lower than the minimum required, restart the - // channel - sentInInterval := mc.sent - atIntervalStart.sent - log.Debugf("%s: since last check: sent: %d - %d = %d, pending: %d, required %d", - mc.chid, mc.sent, atIntervalStart.sent, sentInInterval, atIntervalStart.pending, mc.cfg.MinBytesTransferred) - if atIntervalStart.pending > sentInInterval && sentInInterval < mc.cfg.MinBytesTransferred { - log.Warnf("%s: data-rate too low, restarting channel: since last check %s ago: sent: %d, required %d", - mc.chid, mc.cfg.Interval, mc.sent, mc.cfg.MinBytesTransferred) - go mc.restartChannel() - } -} + // The Restart event is fired when we receive an acknowledgement + // from the peer that it has received a restart request + if event.Code == datatransfer.Restart { + close(restartFired) + } + }) -// Update the queued / sent amount each time it changes -func (mc *monitoredPushChannel) onDTEvent(event datatransfer.Event, channelState datatransfer.ChannelState) { - switch event.Code { - case datatransfer.DataQueued: - // Keep track of the amount of data queued - mc.statsLk.Lock() - mc.queued = channelState.Queued() - mc.statsLk.Unlock() - - case datatransfer.DataSent: - // Keep track of the amount of data sent - mc.statsLk.Lock() - mc.sent = channelState.Sent() - mc.statsLk.Unlock() - - // Some data was sent so reset the consecutive restart counter - mc.resetConsecutiveRestarts() - } -} + go func() { + defer unsub() + defer timer.Stop() -// Keeps track of 
the data rate for a pull channel -type monitoredPullChannel struct { - *monitoredChannel + select { - statsLk sync.RWMutex - received uint64 - dataRatePoints chan uint64 -} + // Restart ack received from peer + case <-restartFired: + restarted <- nil -func newMonitoredPullChannel( - parentCtx context.Context, - mgr monitorAPI, - chid datatransfer.ChannelID, - cfg *Config, - onShutdown func(datatransfer.ChannelID), -) *monitoredPullChannel { - mpc := &monitoredPullChannel{ - dataRatePoints: make(chan uint64, cfg.ChecksPerInterval), - } - mpc.monitoredChannel = newMonitoredChannel(parentCtx, mgr, chid, cfg, onShutdown, mpc.onDTEvent) - return mpc -} - -// check if the amount of data received in the interval was too low, and if so -// restart the channel -func (mc *monitoredPullChannel) checkDataRate() { - mc.statsLk.Lock() - defer mc.statsLk.Unlock() + // Channel monitor shutdown, just bail out + case <-mc.ctx.Done(): + restarted <- nil - // Before returning, add the current data rate stats to the queue - defer func() { - mc.dataRatePoints <- mc.received + // Timer expired before receiving a restart ack from peer + case <-timer.C: + p := mc.chid.OtherParty(mc.mgr.PeerID()) + restarted <- xerrors.Errorf("did not receive response to restart request from %s after %s", + p, mc.cfg.RestartAckTimeout) + } }() - // Check that there are enough data points that an interval has elapsed - if len(mc.dataRatePoints) < int(mc.cfg.ChecksPerInterval) { - log.Debugf("%s: not enough data points to check data rate yet (%d / %d)", - mc.chid, len(mc.dataRatePoints), mc.cfg.ChecksPerInterval) - - return - } - - // Pop the data point from one interval ago - atIntervalStart := <-mc.dataRatePoints - - // If the amount received was lower than the minimum required, restart the - // channel - rcvdInInterval := mc.received - atIntervalStart - log.Debugf("%s: since last check: received: %d - %d = %d, required %d", - mc.chid, mc.received, atIntervalStart, rcvdInInterval, mc.cfg.MinBytesTransferred) - if rcvdInInterval < mc.cfg.MinBytesTransferred { - log.Warnf("%s: data-rate too low, restarting channel: since last check received %d but required %d", - mc.chid, rcvdInInterval, mc.cfg.MinBytesTransferred) - go mc.restartChannel() - } -} - -// Update the received amount each time it changes -func (mc *monitoredPullChannel) onDTEvent(event datatransfer.Event, channelState datatransfer.ChannelState) { - switch event.Code { - case datatransfer.DataReceived: - // Keep track of the amount of data received - mc.statsLk.Lock() - mc.received = channelState.Received() - mc.statsLk.Unlock() - - // Some data was received so reset the consecutive restart counter - mc.resetConsecutiveRestarts() - } + return restarted } diff --git a/channelmonitor/channelmonitor_test.go b/channelmonitor/channelmonitor_test.go index 885d8d4c..ce72ee36 100644 --- a/channelmonitor/channelmonitor_test.go +++ b/channelmonitor/channelmonitor_test.go @@ -2,6 +2,7 @@ package channelmonitor import ( "context" + "fmt" "sync" "testing" "time" @@ -21,443 +22,160 @@ var ch1 = datatransfer.ChannelID{ ID: 1, } -func TestPushChannelMonitorAutoRestart(t *testing.T) { +func TestChannelMonitorAutoRestart(t *testing.T) { type testCase struct { - name string - errOnRestart bool - dataQueued uint64 - dataSent uint64 - errorEvent bool + name string + errReconnect bool + errSendRestartMsg bool + timeoutRestartAck bool } testCases := []testCase{{ - name: "attempt restart", - errOnRestart: false, - dataQueued: 10, - dataSent: 5, + name: "attempt restart", }, { - name: "fail 
attempt restart", - errOnRestart: true, - dataQueued: 10, - dataSent: 5, + name: "fail to reconnect to peer", + errReconnect: true, }, { - name: "error event", - errOnRestart: false, - dataQueued: 10, - dataSent: 10, - errorEvent: true, + name: "fail to send restart message", + errSendRestartMsg: true, }, { - name: "error event then fail attempt restart", - errOnRestart: true, - dataQueued: 10, - dataSent: 10, - errorEvent: true, + name: "timeout waiting for restart message ack from peer", + timeoutRestartAck: true, }} - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ch := &mockChannelState{chid: ch1} - mockAPI := newMockMonitorAPI(ch, tc.errOnRestart) - - m := NewMonitor(mockAPI, &Config{ - MonitorPushChannels: true, - AcceptTimeout: time.Hour, - Interval: 10 * time.Millisecond, - ChecksPerInterval: 10, - MinBytesTransferred: 1, - MaxConsecutiveRestarts: 3, - CompleteTimeout: time.Hour, - }) - m.Start() - mch := m.AddPushChannel(ch1).(*monitoredPushChannel) - - // Simulate the responder sending Accept - mockAPI.accept() - - // Simulate data being queued and sent - // If sent - queued > MinBytesTransferred it should cause a restart - mockAPI.dataQueued(tc.dataQueued) - mockAPI.dataSent(tc.dataSent) - - if tc.errorEvent { - // Fire an error event, should cause a restart - mockAPI.sendDataErrorEvent() - } - - if tc.errOnRestart { - // If there is an error attempting to restart, just wait for - // the push channel to be closed - <-mockAPI.closed - return - } - - // Verify that channel is restarted within interval - select { - case <-time.After(100 * time.Millisecond): - require.Fail(t, "failed to restart channel") - case <-mockAPI.restarts: - } - - // Simulate sending the remaining data - delta := tc.dataQueued - tc.dataSent - if delta > 0 { - mockAPI.dataSent(delta) - } - - // Simulate the complete event - mockAPI.completed() - - // Verify that channel has been shutdown - verifyChannelShutdown(t, mch.ctx) - }) - } -} + runTest := func(name string, isPush bool) { + for _, tc := range testCases { + t.Run(name+": "+tc.name, func(t *testing.T) { + ch := &mockChannelState{chid: ch1} + mockAPI := newMockMonitorAPI(ch, tc.errReconnect, tc.errSendRestartMsg) -func TestPullChannelMonitorAutoRestart(t *testing.T) { - type testCase struct { - name string - errOnRestart bool - dataRcvd uint64 - errorEvent bool - } - testCases := []testCase{{ - name: "attempt restart", - errOnRestart: false, - dataRcvd: 10, - }, { - name: "fail attempt restart", - errOnRestart: true, - dataRcvd: 10, - }, { - name: "error event", - errOnRestart: false, - dataRcvd: 10, - errorEvent: true, - }, { - name: "error event then fail attempt restart", - errOnRestart: true, - dataRcvd: 10, - errorEvent: true, - }} + triggerErrorEvent := func() { + if isPush { + mockAPI.sendDataErrorEvent() + } else { + mockAPI.receiveDataErrorEvent() + } + } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ch := &mockChannelState{chid: ch1} - mockAPI := newMockMonitorAPI(ch, tc.errOnRestart) + m := NewMonitor(mockAPI, &Config{ + AcceptTimeout: time.Hour, + MaxConsecutiveRestarts: 3, + RestartAckTimeout: 50 * time.Millisecond, + CompleteTimeout: time.Hour, + }) - m := NewMonitor(mockAPI, &Config{ - MonitorPullChannels: true, - AcceptTimeout: time.Hour, - Interval: 10 * time.Millisecond, - ChecksPerInterval: 10, - MinBytesTransferred: 1, - MaxConsecutiveRestarts: 3, - CompleteTimeout: time.Hour, - }) - m.Start() - mch := m.AddPullChannel(ch1).(*monitoredPullChannel) + var mch *monitoredChannel + if 
isPush { + mch = m.AddPushChannel(ch1) + } else { + mch = m.AddPullChannel(ch1) + } - // Simulate the responder sending Accept - mockAPI.accept() + // Simulate the responder sending Accept + mockAPI.accept() - // Simulate receiving some data - mockAPI.dataReceived(tc.dataRcvd) + if isPush { + // Simulate data being queued and sent + mockAPI.dataQueued(10) + mockAPI.dataSent(5) + } else { + // Simulate data being received + mockAPI.dataReceived(10) + } - if tc.errorEvent { - // Fire an error event, should cause a restart - mockAPI.sendDataErrorEvent() - } + // Simulate error sending / receiving data + triggerErrorEvent() - if tc.errOnRestart { // If there is an error attempting to restart, just wait for - // the pull channel to be closed - <-mockAPI.closed - return - } - - // Verify that channel is restarted within interval - select { - case <-time.After(100 * time.Millisecond): - require.Fail(t, "failed to restart channel") - case <-mockAPI.restarts: - } - - // Simulate sending more data - mockAPI.dataSent(tc.dataRcvd) - - // Simulate the complete event - mockAPI.completed() - - // Verify that channel has been shutdown - verifyChannelShutdown(t, mch.ctx) - }) - } -} - -func TestPushChannelMonitorDataRate(t *testing.T) { - type dataPoint struct { - queued uint64 - sent uint64 - } - type testCase struct { - name string - minBytesSent uint64 - dataPoints []dataPoint - expectRestart bool - } - testCases := []testCase{{ - name: "restart when sent (10) < pending (20)", - minBytesSent: 1, - dataPoints: []dataPoint{{ - queued: 20, - sent: 10, - }}, - expectRestart: true, - }, { - name: "dont restart when sent (20) >= pending (10)", - minBytesSent: 1, - dataPoints: []dataPoint{{ - queued: 20, - sent: 10, - }, { - queued: 20, - sent: 20, - }}, - expectRestart: false, - }, { - name: "restart when sent (5) < pending (10)", - minBytesSent: 10, - dataPoints: []dataPoint{{ - queued: 20, - sent: 10, - }, { - queued: 20, - sent: 15, - }}, - expectRestart: true, - }, { - name: "dont restart when pending is zero", - minBytesSent: 1, - dataPoints: []dataPoint{{ - queued: 20, - sent: 20, - }}, - expectRestart: false, - }, { - name: "dont restart when pending increases but sent also increases within interval", - minBytesSent: 1, - dataPoints: []dataPoint{{ - queued: 10, - sent: 10, - }, { - queued: 20, - sent: 10, - }, { - queued: 20, - sent: 20, - }}, - expectRestart: false, - }, { - name: "restart when pending increases and sent doesn't increase within interval", - minBytesSent: 1, - dataPoints: []dataPoint{{ - queued: 10, - sent: 10, - }, { - queued: 20, - sent: 10, - }, { - queued: 20, - sent: 10, - }, { - queued: 20, - sent: 10, - }, { - queued: 20, - sent: 20, - }}, - expectRestart: true, - }, { - name: "dont restart with typical progression", - minBytesSent: 1, - dataPoints: []dataPoint{{ - queued: 10, - sent: 10, - }, { - queued: 20, - sent: 10, - }, { - queued: 20, - sent: 15, - }, { - queued: 30, - sent: 25, - }, { - queued: 35, - sent: 30, - }, { - queued: 35, - sent: 35, - }}, - expectRestart: false, - }} - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ch := &mockChannelState{chid: ch1} - mockAPI := newMockMonitorAPI(ch, false) - - checksPerInterval := uint32(1) - m := NewMonitor(mockAPI, &Config{ - MonitorPushChannels: true, - AcceptTimeout: time.Hour, - Interval: time.Hour, - ChecksPerInterval: checksPerInterval, - MinBytesTransferred: tc.minBytesSent, - MaxConsecutiveRestarts: 3, - CompleteTimeout: time.Hour, - }) - - // Note: Don't start monitor, we'll call 
checkDataRate() manually - - m.AddPushChannel(ch1) - - totalChecks := checksPerInterval + uint32(len(tc.dataPoints)) - for i := uint32(0); i < totalChecks; i++ { - if i < uint32(len(tc.dataPoints)) { - dp := tc.dataPoints[i] - mockAPI.dataQueued(dp.queued) - mockAPI.dataSent(dp.sent) + // the push channel to be closed + if tc.errReconnect || tc.errSendRestartMsg { + mockAPI.verifyChannelClosed(t, true) + return } - m.checkDataRate() - } - // Check if channel was restarted - select { - case <-time.After(5 * time.Millisecond): - if tc.expectRestart { + // Verify that restart message is sent + select { + case <-time.After(100 * time.Millisecond): require.Fail(t, "failed to restart channel") + case <-mockAPI.restartMessages: } - case <-mockAPI.restarts: - if !tc.expectRestart { - require.Fail(t, "expected no channel restart") - } - } - }) - } -} - -func TestPullChannelMonitorDataRate(t *testing.T) { - type testCase struct { - name string - minBytesTransferred uint64 - dataPoints []uint64 - expectRestart bool - } - testCases := []testCase{{ - name: "restart when received (5) < min required (10)", - minBytesTransferred: 10, - dataPoints: []uint64{10, 15}, - expectRestart: true, - }, { - name: "dont restart when received (5) > min required (1)", - minBytesTransferred: 1, - dataPoints: []uint64{10, 15}, - expectRestart: false, - }, { - name: "dont restart with typical progression", - minBytesTransferred: 1, - dataPoints: []uint64{10, 20, 30, 40}, - expectRestart: false, - }} - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ch := &mockChannelState{chid: ch1} - mockAPI := newMockMonitorAPI(ch, false) - - checksPerInterval := uint32(1) - m := NewMonitor(mockAPI, &Config{ - MonitorPullChannels: true, - AcceptTimeout: time.Hour, - Interval: time.Hour, - ChecksPerInterval: checksPerInterval, - MinBytesTransferred: tc.minBytesTransferred, - MaxConsecutiveRestarts: 3, - CompleteTimeout: time.Hour, - }) - // Note: Don't start monitor, we'll call checkDataRate() manually + // If simulating a restart ack timeout, don't fire the restart + // ack event and expect the channel to be closed with an error + if tc.timeoutRestartAck { + mockAPI.verifyChannelClosed(t, true) + return + } - m.AddPullChannel(ch1) + // Simulate receiving restart message ack from responder + mockAPI.restartEvent() - totalChecks := uint32(len(tc.dataPoints)) - for i := uint32(0); i < totalChecks; i++ { - if i < uint32(len(tc.dataPoints)) { - rcvd := tc.dataPoints[i] - mockAPI.dataReceived(rcvd) + if isPush { + // Simulate sending the remaining data + mockAPI.dataSent(5) + } else { + // Simulate receiving more data + mockAPI.dataReceived(5) } - m.checkDataRate() - } - // Check if channel was restarted - select { - case <-time.After(5 * time.Millisecond): - if tc.expectRestart { - require.Fail(t, "failed to restart channel") - } - case <-mockAPI.restarts: - if !tc.expectRestart { - require.Fail(t, "expected no channel restart") - } - } - }) + // Simulate the complete event + mockAPI.completed() + + // Verify that channel has been shutdown + verifyChannelShutdown(t, mch.ctx) + }) + } } + + runTest("push", true) + runTest("pull", false) } func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) { runTest := func(name string, isPush bool) { t.Run(name, func(t *testing.T) { ch := &mockChannelState{chid: ch1} - mockAPI := newMockMonitorAPI(ch, false) + mockAPI := newMockMonitorAPI(ch, false, false) + + triggerErrorEvent := func() { + if isPush { + mockAPI.sendDataErrorEvent() + } else { + 
mockAPI.receiveDataErrorEvent() + } + } maxConsecutiveRestarts := 3 m := NewMonitor(mockAPI, &Config{ - MonitorPushChannels: isPush, - MonitorPullChannels: !isPush, AcceptTimeout: time.Hour, - Interval: time.Hour, - ChecksPerInterval: 1, - MinBytesTransferred: 2, MaxConsecutiveRestarts: uint32(maxConsecutiveRestarts), + RestartAckTimeout: time.Hour, CompleteTimeout: time.Hour, }) - // Note: Don't start monitor, we'll call checkDataRate() manually - - var chanCtx context.Context + var mch *monitoredChannel if isPush { - mch := m.AddPushChannel(ch1).(*monitoredPushChannel) - chanCtx = mch.ctx + mch = m.AddPushChannel(ch1) mockAPI.dataQueued(10) mockAPI.dataSent(5) } else { - mch := m.AddPullChannel(ch1).(*monitoredPullChannel) - chanCtx = mch.ctx + mch = m.AddPullChannel(ch1) mockAPI.dataReceived(5) } - // Check once to add a data point to the queue. - // Subsequent checks will compare against the previous data point. - m.checkDataRate() - - // Each check should trigger a restart up to the maximum number of restarts + // Each error should trigger a restart up to the maximum number of restarts triggerMaxRestarts := func() { for i := 0; i < maxConsecutiveRestarts; i++ { - m.checkDataRate() + triggerErrorEvent() + + err := mockAPI.awaitRestartSent() + require.NoError(t, err) + + // Simulate receiving restart ack from peer + mockAPI.restartEvent() - err := mockAPI.awaitRestart() + err = awaitRestartComplete(mch) require.NoError(t, err) } } @@ -476,10 +194,10 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) { // Reached max restarts, so now there should not be another restart // attempt. // Instead the channel should be closed and the monitor shut down. - m.checkDataRate() - err := mockAPI.awaitRestart() + triggerErrorEvent() + err := mockAPI.awaitRestartSent() require.Error(t, err) // require error because expecting no restart - verifyChannelShutdown(t, chanCtx) + verifyChannelShutdown(t, mch.ctx) }) } @@ -489,6 +207,16 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) { runTest("pull", false) } +func awaitRestartComplete(mch *monitoredChannel) error { + for i := 0; i < 10; i++ { + if !mch.isRestarting() { + return nil + } + time.Sleep(time.Millisecond) + } + return xerrors.Errorf("restart did not complete after 10ms") +} + func TestChannelMonitorTimeouts(t *testing.T) { type testCase struct { name string @@ -516,49 +244,30 @@ func TestChannelMonitorTimeouts(t *testing.T) { for _, tc := range testCases { t.Run(name+": "+tc.name, func(t *testing.T) { ch := &mockChannelState{chid: ch1} - mockAPI := newMockMonitorAPI(ch, false) + mockAPI := newMockMonitorAPI(ch, false, false) verifyClosedAndShutdown := func(chCtx context.Context, timeout time.Duration) { - // Verify channel has been closed - select { - case <-time.After(timeout): - require.Fail(t, "failed to close channel within "+timeout.String()) - case <-mockAPI.closed: - } + mockAPI.verifyChannelClosed(t, true) // Verify that channel has been shutdown verifyChannelShutdown(t, chCtx) } - verifyNotClosed := func(timeout time.Duration) { - // Verify channel has not been closed - select { - case <-time.After(timeout): - case <-mockAPI.closed: - require.Fail(t, "expected channel not to have been closed") - } - } - acceptTimeout := 10 * time.Millisecond completeTimeout := 10 * time.Millisecond m := NewMonitor(mockAPI, &Config{ - MonitorPushChannels: isPush, - MonitorPullChannels: !isPush, AcceptTimeout: acceptTimeout, - Interval: time.Hour, - ChecksPerInterval: 1, - MinBytesTransferred: 1, MaxConsecutiveRestarts: 1, + 
RestartAckTimeout: time.Hour, CompleteTimeout: completeTimeout, }) - m.Start() var chCtx context.Context if isPush { - mch := m.AddPushChannel(ch1).(*monitoredPushChannel) + mch := m.AddPushChannel(ch1) chCtx = mch.ctx } else { - mch := m.AddPullChannel(ch1).(*monitoredPullChannel) + mch := m.AddPullChannel(ch1) chCtx = mch.ctx } @@ -575,7 +284,7 @@ func TestChannelMonitorTimeouts(t *testing.T) { // If we're not expecting the test to have a timeout waiting for // the accept event, verify that channel was not closed - verifyNotClosed(2 * acceptTimeout) + mockAPI.verifyChannelNotClosed(t, 2*acceptTimeout) // Fire the FinishTransfer event mockAPI.finishTransfer() @@ -594,7 +303,7 @@ func TestChannelMonitorTimeouts(t *testing.T) { // If we're not expecting the test to have a timeout waiting for // the accept event, verify that channel was not closed - verifyNotClosed(2 * completeTimeout) + mockAPI.verifyChannelNotClosed(t, 2*completeTimeout) }) } } @@ -614,27 +323,38 @@ func verifyChannelShutdown(t *testing.T, shutdownCtx context.Context) { } type mockMonitorAPI struct { - ch *mockChannelState - restartErrors chan error - restarts chan struct{} - closed chan struct{} + ch *mockChannelState + connectErrors chan error + restartErrors chan error + restartMessages chan struct{} + closeErr chan error - lk sync.Mutex - subscriber datatransfer.Subscriber + lk sync.Mutex + subscribers map[int]datatransfer.Subscriber } -func newMockMonitorAPI(ch *mockChannelState, errOnRestart bool) *mockMonitorAPI { +func newMockMonitorAPI(ch *mockChannelState, errOnReconnect, errOnRestart bool) *mockMonitorAPI { m := &mockMonitorAPI{ - ch: ch, - restarts: make(chan struct{}, 1), - closed: make(chan struct{}), - restartErrors: make(chan error, 1), + ch: ch, + restartMessages: make(chan struct{}, 1), + closeErr: make(chan error, 1), + connectErrors: make(chan error, 1), + restartErrors: make(chan error, 1), + subscribers: make(map[int]datatransfer.Subscriber), + } + + var connectErr error + if errOnReconnect { + connectErr = xerrors.Errorf("connect err") } + m.connectErrors <- connectErr + var restartErr error if errOnRestart { restartErr = xerrors.Errorf("restart err") } m.restartErrors <- restartErr + return m } @@ -642,23 +362,39 @@ func (m *mockMonitorAPI) SubscribeToEvents(subscriber datatransfer.Subscriber) d m.lk.Lock() defer m.lk.Unlock() - m.subscriber = subscriber + idx := len(m.subscribers) + m.subscribers[idx] = subscriber return func() { m.lk.Lock() defer m.lk.Unlock() - m.subscriber = nil + delete(m.subscribers, idx) + } +} + +func (m *mockMonitorAPI) fireEvent(e datatransfer.Event, state datatransfer.ChannelState) { + for _, subscriber := range m.subscribers { + subscriber(e, state) } } -func (m *mockMonitorAPI) callSubscriber(e datatransfer.Event, state datatransfer.ChannelState) { - m.subscriber(e, state) +func (m *mockMonitorAPI) ConnectTo(ctx context.Context, id peer.ID) error { + select { + case err := <-m.connectErrors: + return err + default: + return nil + } +} + +func (m *mockMonitorAPI) PeerID() peer.ID { + return "p" } func (m *mockMonitorAPI) RestartDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error { defer func() { - m.restarts <- struct{}{} + m.restartMessages <- struct{}{} }() select { @@ -669,50 +405,82 @@ func (m *mockMonitorAPI) RestartDataTransferChannel(ctx context.Context, chid da } } -func (m *mockMonitorAPI) awaitRestart() error { +func (m *mockMonitorAPI) awaitRestartSent() error { select { case <-time.After(10 * time.Millisecond): return 
xerrors.Errorf("failed to restart channel") - case <-m.restarts: + case <-m.restartMessages: return nil } } func (m *mockMonitorAPI) CloseDataTransferChannelWithError(ctx context.Context, chid datatransfer.ChannelID, cherr error) error { - close(m.closed) + m.closeErr <- cherr return nil } +func (m *mockMonitorAPI) verifyChannelClosed(t *testing.T, expectErr bool) { + // Verify channel has been closed + select { + case <-time.After(100 * time.Millisecond): + require.Fail(t, "failed to close channel within 100ms") + case err := <-m.closeErr: + if expectErr && err == nil { + require.Fail(t, "expected error on close") + } + if !expectErr && err != nil { + require.Fail(t, fmt.Sprintf("got error on close: %s", err)) + } + } +} + +func (m *mockMonitorAPI) verifyChannelNotClosed(t *testing.T, timeout time.Duration) { + // Verify channel has not been closed + select { + case <-time.After(timeout): + case <-m.closeErr: + require.Fail(t, "expected channel not to have been closed") + } +} + func (m *mockMonitorAPI) accept() { - m.callSubscriber(datatransfer.Event{Code: datatransfer.Accept}, m.ch) + m.fireEvent(datatransfer.Event{Code: datatransfer.Accept}, m.ch) } func (m *mockMonitorAPI) dataQueued(n uint64) { m.ch.queued = n - m.callSubscriber(datatransfer.Event{Code: datatransfer.DataQueued}, m.ch) + m.fireEvent(datatransfer.Event{Code: datatransfer.DataQueued}, m.ch) } func (m *mockMonitorAPI) dataSent(n uint64) { m.ch.sent = n - m.callSubscriber(datatransfer.Event{Code: datatransfer.DataSent}, m.ch) + m.fireEvent(datatransfer.Event{Code: datatransfer.DataSent}, m.ch) } func (m *mockMonitorAPI) dataReceived(n uint64) { m.ch.received = n - m.callSubscriber(datatransfer.Event{Code: datatransfer.DataReceived}, m.ch) + m.fireEvent(datatransfer.Event{Code: datatransfer.DataReceived}, m.ch) } func (m *mockMonitorAPI) finishTransfer() { - m.callSubscriber(datatransfer.Event{Code: datatransfer.FinishTransfer}, m.ch) + m.fireEvent(datatransfer.Event{Code: datatransfer.FinishTransfer}, m.ch) } func (m *mockMonitorAPI) completed() { m.ch.complete = true - m.callSubscriber(datatransfer.Event{Code: datatransfer.Complete}, m.ch) + m.fireEvent(datatransfer.Event{Code: datatransfer.Complete}, m.ch) } func (m *mockMonitorAPI) sendDataErrorEvent() { - m.callSubscriber(datatransfer.Event{Code: datatransfer.SendDataError}, m.ch) + m.fireEvent(datatransfer.Event{Code: datatransfer.SendDataError}, m.ch) +} + +func (m *mockMonitorAPI) receiveDataErrorEvent() { + m.fireEvent(datatransfer.Event{Code: datatransfer.ReceiveDataError}, m.ch) +} + +func (m *mockMonitorAPI) restartEvent() { + m.fireEvent(datatransfer.Event{Code: datatransfer.Restart}, m.ch) } type mockChannelState struct { diff --git a/channels/channels.go b/channels/channels.go index 81049b1b..06bdc223 100644 --- a/channels/channels.go +++ b/channels/channels.go @@ -346,6 +346,12 @@ func (c *Channels) SendDataError(chid datatransfer.ChannelID, err error) error { return c.send(chid, datatransfer.SendDataError, err) } +// ReceiveDataError indicates that the transport layer had an error receiving +// data from the remote peer +func (c *Channels) ReceiveDataError(chid datatransfer.ChannelID, err error) error { + return c.send(chid, datatransfer.ReceiveDataError, err) +} + // HasChannel returns true if the given channel id is being tracked func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) { return c.stateMachines.Has(chid) diff --git a/channels/channels_fsm.go b/channels/channels_fsm.go index 00a52f0f..79711ebc 100644 --- 
a/channels/channels_fsm.go +++ b/channels/channels_fsm.go @@ -83,6 +83,11 @@ var ChannelEvents = fsm.Events{ chst.AddLog("data transfer send error: %s", chst.Message) return nil }), + fsm.Event(datatransfer.ReceiveDataError).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error { + chst.Message = err.Error() + chst.AddLog("data transfer receive error: %s", chst.Message) + return nil + }), fsm.Event(datatransfer.RequestTimedOut).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error { chst.Message = err.Error() chst.AddLog("data transfer request timed out: %s", chst.Message) diff --git a/events.go b/events.go index a71bf5a6..5ec62fd4 100644 --- a/events.go +++ b/events.go @@ -98,6 +98,10 @@ const ( // SendDataError indicates that the transport layer had an error trying // to send data to the remote peer SendDataError + + // ReceiveDataError indicates that the transport layer had an error + // receiving data from the remote peer + ReceiveDataError ) // Events are human readable names for data transfer events @@ -127,6 +131,9 @@ var Events = map[EventCode]string{ DataQueuedProgress: "DataQueuedProgress", DataSentProgress: "DataSentProgress", DataReceivedProgress: "DataReceivedProgress", + RequestTimedOut: "RequestTimedOut", + SendDataError: "SendDataError", + ReceiveDataError: "ReceiveDataError", } // Event is a struct containing information about a data transfer event diff --git a/go.mod b/go.mod index b3a7053b..5f587e0c 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/ipfs/go-cid v0.0.7 github.com/ipfs/go-datastore v0.4.5 github.com/ipfs/go-ds-badger v0.2.3 - github.com/ipfs/go-graphsync v0.6.0 + github.com/ipfs/go-graphsync v0.6.1-0.20210407112122-089b2abad957 github.com/ipfs/go-ipfs-blockstore v1.0.1 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 diff --git a/go.sum b/go.sum index 41e2f323..da32ec7c 100644 --- a/go.sum +++ b/go.sum @@ -213,8 +213,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= -github.com/ipfs/go-graphsync v0.6.0 h1:x6UvDUGA7wjaKNqx5Vbo7FGT8aJ5ryYA0dMQ5jN3dF0= -github.com/ipfs/go-graphsync v0.6.0/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk= +github.com/ipfs/go-graphsync v0.6.1-0.20210407112122-089b2abad957 h1:JMQvhEKMk8kz31F7GcQba4XCwrO35zXad0/pmhyxfwk= +github.com/ipfs/go-graphsync v0.6.1-0.20210407112122-089b2abad957/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw= github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ= diff --git a/impl/events.go b/impl/events.go index ae2b6cd2..ffcd61d2 100644 --- a/impl/events.go +++ b/impl/events.go @@ -227,6 +227,11 @@ func (m *manager) OnSendDataError(chid datatransfer.ChannelID, err error) error return m.channels.SendDataError(chid, err) } +func (m *manager) OnReceiveDataError(chid datatransfer.ChannelID, err error) 
error { + log.Warnf("channel %+v had transport receive error: %s", chid, err) + return m.channels.ReceiveDataError(chid, err) +} + // OnChannelCompleted is called // - by the requester when all data for a transfer has been received // - by the responder when all data for a transfer has been sent diff --git a/impl/impl.go b/impl/impl.go index 8490f11e..733b02f9 100644 --- a/impl/impl.go +++ b/impl/impl.go @@ -116,10 +116,9 @@ func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetw option(m) } - // Start push / pull channel monitor after applying config options as the config + // Create push / pull channel monitor after applying config options as the config // options may apply to the monitor m.channelMonitor = channelmonitor.NewMonitor(m, m.channelMonitorCfg) - m.channelMonitor.Start() return m, nil } @@ -320,6 +319,12 @@ func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfe return nil } +// ConnectTo opens a connection to a peer on the data-transfer protocol, +// retrying if necessary +func (m *manager) ConnectTo(ctx context.Context, p peer.ID) error { + return m.dataTransferNetwork.OpenStreamTo(ctx, p) +} + // close an open channel and fire an error event func (m *manager) CloseDataTransferChannelWithError(ctx context.Context, chid datatransfer.ChannelID, cherr error) error { log.Infof("close channel %s with error %s", chid, cherr) @@ -511,3 +516,7 @@ func (m *manager) channelDataTransferType(channel datatransfer.ChannelState) Cha // we received a push channel return ManagerPeerReceivePush } + +func (m *manager) PeerID() peer.ID { + return m.peerID +} diff --git a/impl/integration_test.go b/impl/integration_test.go index ae717edb..339ffb9e 100644 --- a/impl/integration_test.go +++ b/impl/integration_test.go @@ -495,7 +495,7 @@ func TestManyReceiversAtOnce(t *testing.T) { } } -// disconnectCoordinator is used by TestPushRequestAutoRestart to allow +// disconnectCoordinator is used by TestAutoRestart to allow // test cases to signal when a disconnect should start, and whether // to wait for the disconnect to take effect before continuing type disconnectCoordinator struct { @@ -724,14 +724,10 @@ func TestAutoRestart(t *testing.T) { // Set up restartConf := ChannelRestartConfig(channelmonitor.Config{ - MonitorPushChannels: tc.isPush, - MonitorPullChannels: !tc.isPush, AcceptTimeout: 100 * time.Millisecond, - Interval: 100 * time.Millisecond, - MinBytesTransferred: 1, - ChecksPerInterval: 10, RestartBackoff: 500 * time.Millisecond, MaxConsecutiveRestarts: 5, + RestartAckTimeout: 100 * time.Millisecond, CompleteTimeout: 100 * time.Millisecond, }) initiator, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, initiatorGSTspt, restartConf) diff --git a/network/interface.go b/network/interface.go index d6e2ffc9..c830a397 100644 --- a/network/interface.go +++ b/network/interface.go @@ -26,6 +26,11 @@ type DataTransferNetwork interface { // ConnectTo establishes a connection to the given peer ConnectTo(context.Context, peer.ID) error + // OpenStreamTo establishes a connection to the given peer, retrying if + // necessary, and opens a stream on the data-transfer protocol to verify + // the peer will accept messages on the protocol + OpenStreamTo(ctx context.Context, p peer.ID) error + // ID returns the peer id of this libp2p host ID() peer.ID } diff --git a/network/libp2p_impl.go b/network/libp2p_impl.go index 218dcf1a..68498ab5 100644 --- a/network/libp2p_impl.go +++ b/network/libp2p_impl.go @@ -187,6 +187,18 @@ func (dtnet 
*libp2pDataTransferNetwork) ConnectTo(ctx context.Context, p peer.ID return dtnet.host.Connect(ctx, peer.AddrInfo{ID: p}) } +// OpenStreamTo establishes a connection to the given peer, retrying if +// necessary, and opens a stream on the data-transfer protocol to verify +// the peer will accept messages on the protocol +func (dtnet *libp2pDataTransferNetwork) OpenStreamTo(ctx context.Context, p peer.ID) error { + s, err := dtnet.openStream(ctx, p, dtnet.dtProtocols...) + if err != nil { + return err + } + + return s.Close() +} + // handleNewStream receives a new stream from the network. func (dtnet *libp2pDataTransferNetwork) handleNewStream(s network.Stream) { defer s.Close() // nolint: errcheck,gosec diff --git a/testutil/testnet.go b/testutil/testnet.go index 0548542c..743af114 100644 --- a/testutil/testnet.go +++ b/testutil/testnet.go @@ -45,6 +45,10 @@ func (fn *FakeNetwork) ConnectTo(_ context.Context, _ peer.ID) error { panic("not implemented") } +func (fn *FakeNetwork) OpenStreamTo(ctx context.Context, p peer.ID) error { + panic("implement me") +} + // ID returns a stubbed id for host of this network func (fn *FakeNetwork) ID() peer.ID { return fn.PeerID diff --git a/transport.go b/transport.go index e271e7be..275b7ab5 100644 --- a/transport.go +++ b/transport.go @@ -63,6 +63,10 @@ type EventsHandler interface { // OnSendDataError is called when a network error occurs sending data // at the transport layer OnSendDataError(chid ChannelID, err error) error + + // OnReceiveDataError is called when a network error occurs receiving data + // at the transport layer + OnReceiveDataError(chid ChannelID, err error) error } /* diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index eb3a123e..24a214f4 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -410,6 +410,7 @@ func (t *Transport) SetEventHandler(events datatransfer.EventsHandler) error { t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterRequestUpdatedHook(t.gsRequestUpdatedHook)) t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterRequestorCancelledListener(t.gsRequestorCancelledListener)) t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterNetworkErrorListener(t.gsNetworkSendErrorListener)) + t.unregisterFuncs = append(t.unregisterFuncs, t.gs.RegisterReceiverNetworkErrorListener(t.gsNetworkReceiveErrorListener)) return nil } @@ -809,9 +810,13 @@ func (t *Transport) gsNetworkSendErrorListener(p peer.ID, request graphsync.Requ t.dataLock.Lock() defer t.dataLock.Unlock() + // Fire an error if the graphsync request was made by this node or the remote peer chid, ok := t.graphsyncRequestMap[graphsyncKey{request.ID(), p}] if !ok { - return + chid, ok = t.graphsyncRequestMap[graphsyncKey{request.ID(), t.peerID}] + if !ok { + return + } } err := t.events.OnSendDataError(chid, gserr) @@ -819,3 +824,22 @@ func (t *Transport) gsNetworkSendErrorListener(p peer.ID, request graphsync.Requ log.Errorf("failed to fire transport send error %s: %s", gserr, err) } } + +// Called when there is a graphsync error receiving data +func (t *Transport) gsNetworkReceiveErrorListener(p peer.ID, gserr error) { + t.dataLock.Lock() + defer t.dataLock.Unlock() + + // Fire a receive data error on all ongoing graphsync transfers with that + // peer + for _, chid := range t.graphsyncRequestMap { + if chid.Initiator != p && chid.Responder != p { + continue + } + + err := t.events.OnReceiveDataError(chid, gserr) + if err != nil { + log.Errorf("failed to fire transport receive error 
%s: %s", gserr, err) + } + } +} diff --git a/transport/graphsync/graphsync_test.go b/transport/graphsync/graphsync_test.go index 512ab78c..a5251596 100644 --- a/transport/graphsync/graphsync_test.go +++ b/transport/graphsync/graphsync_test.go @@ -331,7 +331,7 @@ func TestManager(t *testing.T) { require.False(t, events.OnDataQueuedCalled) }, }, - "outgoing data send error will terminate request": { + "outgoing data queued error will terminate request": { events: fakeEvents{ OnDataQueuedError: errors.New("something went wrong"), }, @@ -345,7 +345,7 @@ func TestManager(t *testing.T) { require.Error(t, gsData.outgoingBlockHookActions.TerminationError) }, }, - "outgoing data send error == pause will pause request": { + "outgoing data queued error == pause will pause request": { events: fakeEvents{ OnDataQueuedError: datatransfer.ErrPause, }, @@ -626,7 +626,7 @@ func TestManager(t *testing.T) { assertHasOutgoingMessage(t, gsData.incomingRequestHookActions.SentExtensions, gsData.incoming) }, }, - "recognized incoming request will record network error": { + "recognized incoming request will record network send error": { action: func(gsData *harness) { gsData.incomingRequestHook() gsData.networkErrorListener(errors.New("something went wrong")) @@ -636,6 +636,34 @@ func TestManager(t *testing.T) { require.True(t, events.OnSendDataErrorCalled) }, }, + "recognized outgoing request will record network send error": { + action: func(gsData *harness) { + gsData.outgoingRequestHook() + gsData.networkErrorListener(errors.New("something went wrong")) + }, + check: func(t *testing.T, events *fakeEvents, gsData *harness) { + require.True(t, events.OnSendDataErrorCalled) + }, + }, + "recognized incoming request will record network receive error": { + action: func(gsData *harness) { + gsData.incomingRequestHook() + gsData.receiverNetworkErrorListener(errors.New("something went wrong")) + }, + check: func(t *testing.T, events *fakeEvents, gsData *harness) { + require.Equal(t, 1, events.OnRequestReceivedCallCount) + require.True(t, events.OnReceiveDataErrorCalled) + }, + }, + "recognized outgoing request will record network receive error": { + action: func(gsData *harness) { + gsData.outgoingRequestHook() + gsData.receiverNetworkErrorListener(errors.New("something went wrong")) + }, + check: func(t *testing.T, events *fakeEvents, gsData *harness) { + require.True(t, events.OnReceiveDataErrorCalled) + }, + }, "open channel adds doNotSendCids to the DoNotSend extension": { action: func(gsData *harness) { cids := testutil.GenerateCids(2) @@ -974,10 +1002,12 @@ type fakeEvents struct { OnDataQueuedMessage datatransfer.Message OnDataQueuedError error - OnRequestTimedOutCalled bool - OnRequestTimedOutChannelId datatransfer.ChannelID - OnSendDataErrorCalled bool - OnSendDataErrorChannelID datatransfer.ChannelID + OnRequestTimedOutCalled bool + OnRequestTimedOutChannelId datatransfer.ChannelID + OnSendDataErrorCalled bool + OnSendDataErrorChannelID datatransfer.ChannelID + OnReceiveDataErrorCalled bool + OnReceiveDataErrorChannelID datatransfer.ChannelID ChannelCompletedSuccess bool RequestReceivedRequest datatransfer.Request @@ -1008,6 +1038,12 @@ func (fe *fakeEvents) OnSendDataError(chid datatransfer.ChannelID, err error) er return nil } +func (fe *fakeEvents) OnReceiveDataError(chid datatransfer.ChannelID, err error) error { + fe.OnReceiveDataErrorCalled = true + fe.OnReceiveDataErrorChannelID = chid + return nil +} + func (fe *fakeEvents) OnChannelOpened(chid datatransfer.ChannelID) error { fe.ChannelOpenedChannelID 
= chid return fe.OnChannelOpenedError @@ -1099,6 +1135,9 @@ func (ha *harness) requestorCancelledListener() { func (ha *harness) networkErrorListener(err error) { ha.fgs.NetworkErrorListener(ha.other, ha.request, err) } +func (ha *harness) receiverNetworkErrorListener(err error) { + ha.fgs.ReceiverNetworkErrorListener(ha.other, err) +} type dtConfig struct { dtExtensionMissing bool From 10de9cbaaeeb15f91b44491c5f206fb890babb11 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 9 Apr 2021 13:23:00 +0200 Subject: [PATCH 2/4] refactor: better method name --- impl/impl.go | 2 +- network/interface.go | 4 ++-- network/libp2p_impl.go | 8 ++++++-- testutil/testnet.go | 2 +- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/impl/impl.go b/impl/impl.go index 733b02f9..b0210f04 100644 --- a/impl/impl.go +++ b/impl/impl.go @@ -322,7 +322,7 @@ func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfe // ConnectTo opens a connection to a peer on the data-transfer protocol, // retrying if necessary func (m *manager) ConnectTo(ctx context.Context, p peer.ID) error { - return m.dataTransferNetwork.OpenStreamTo(ctx, p) + return m.dataTransferNetwork.ConnectWithRetry(ctx, p) } // close an open channel and fire an error event diff --git a/network/interface.go b/network/interface.go index c830a397..97f0a241 100644 --- a/network/interface.go +++ b/network/interface.go @@ -26,10 +26,10 @@ type DataTransferNetwork interface { // ConnectTo establishes a connection to the given peer ConnectTo(context.Context, peer.ID) error - // OpenStreamTo establishes a connection to the given peer, retrying if + // ConnectWithRetry establishes a connection to the given peer, retrying if // necessary, and opens a stream on the data-transfer protocol to verify // the peer will accept messages on the protocol - OpenStreamTo(ctx context.Context, p peer.ID) error + ConnectWithRetry(ctx context.Context, p peer.ID) error // ID returns the peer id of this libp2p host ID() peer.ID diff --git a/network/libp2p_impl.go b/network/libp2p_impl.go index 68498ab5..c9b13b2b 100644 --- a/network/libp2p_impl.go +++ b/network/libp2p_impl.go @@ -187,15 +187,19 @@ func (dtnet *libp2pDataTransferNetwork) ConnectTo(ctx context.Context, p peer.ID return dtnet.host.Connect(ctx, peer.AddrInfo{ID: p}) } -// OpenStreamTo establishes a connection to the given peer, retrying if +// ConnectWithRetry establishes a connection to the given peer, retrying if // necessary, and opens a stream on the data-transfer protocol to verify // the peer will accept messages on the protocol -func (dtnet *libp2pDataTransferNetwork) OpenStreamTo(ctx context.Context, p peer.ID) error { +func (dtnet *libp2pDataTransferNetwork) ConnectWithRetry(ctx context.Context, p peer.ID) error { + // Open a stream over the data-transfer protocol, to make sure that the + // peer is listening on the protocol s, err := dtnet.openStream(ctx, p, dtnet.dtProtocols...) 
if err != nil { return err } + // We don't actually use the stream, we just open it to verify it's + // possible to connect over the data-transfer protocol, so we close it here return s.Close() } diff --git a/testutil/testnet.go b/testutil/testnet.go index 743af114..1d1360b1 100644 --- a/testutil/testnet.go +++ b/testutil/testnet.go @@ -45,7 +45,7 @@ func (fn *FakeNetwork) ConnectTo(_ context.Context, _ peer.ID) error { panic("not implemented") } -func (fn *FakeNetwork) OpenStreamTo(ctx context.Context, p peer.ID) error { +func (fn *FakeNetwork) ConnectWithRetry(ctx context.Context, p peer.ID) error { panic("implement me") } From 6798477081354df23ae2410c6fb1588f210d64ea Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 16 Apr 2021 15:28:10 +0200 Subject: [PATCH 3/4] feat: add debounce to channel monitor --- channelmonitor/channelmonitor.go | 124 ++++++++++++++++++-------- channelmonitor/channelmonitor_test.go | 121 ++++++++++++++++--------- go.mod | 3 +- go.sum | 6 +- 4 files changed, 174 insertions(+), 80 deletions(-) diff --git a/channelmonitor/channelmonitor.go b/channelmonitor/channelmonitor.go index cbba90f8..e58a9892 100644 --- a/channelmonitor/channelmonitor.go +++ b/channelmonitor/channelmonitor.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/bep/debounce" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" @@ -39,6 +40,8 @@ type Monitor struct { type Config struct { // Max time to wait for other side to accept open channel request before attempting restart AcceptTimeout time.Duration + // Debounce when restart is triggered by multiple errors + RestartDebounce time.Duration // Backoff after restarting RestartBackoff time.Duration // Number of times to try to restart before failing @@ -152,18 +155,20 @@ func (m *Monitor) enabled() bool { type monitoredChannel struct { // The parentCtx is used when sending a close message for a channel, so // that operation can continue even after the monitoredChannel is shutdown - parentCtx context.Context - ctx context.Context - cancel context.CancelFunc - mgr monitorAPI - chid datatransfer.ChannelID - cfg *Config - unsub datatransfer.Unsubscribe - onShutdown func(datatransfer.ChannelID) - shutdownLk sync.Mutex + parentCtx context.Context + ctx context.Context + cancel context.CancelFunc + mgr monitorAPI + chid datatransfer.ChannelID + cfg *Config + unsub datatransfer.Unsubscribe + restartChannelDebounced func() + onShutdown func(datatransfer.ChannelID) + shutdownLk sync.Mutex restartLk sync.RWMutex restartedAt time.Time + restartQueued bool consecutiveRestarts int } @@ -184,6 +189,8 @@ func newMonitoredChannel( cfg: cfg, onShutdown: onShutdown, } + debouncer := debounce.New(cfg.RestartDebounce) + mpc.restartChannelDebounced = func() { debouncer(mpc.restartChannel) } mpc.start() return mpc } @@ -243,12 +250,12 @@ func (mc *monitoredChannel) start() { // If the transport layer reports an error sending data over the wire, // attempt to restart the channel log.Warnf("%s: data transfer transport send error, restarting data transfer", mc.chid) - go mc.restartChannel() + go mc.restartChannelDebounced() case datatransfer.ReceiveDataError: // If the transport layer reports an error receiving data over the wire, // attempt to restart the channel log.Warnf("%s: data transfer transport receive error, restarting data transfer", mc.chid) - go mc.restartChannel() + go mc.restartChannelDebounced() case datatransfer.FinishTransfer: // The channel initiator has finished sending / 
receiving all data. // Watch to make sure that the responder sends a message to acknowledge @@ -297,7 +304,7 @@ func (mc *monitoredChannel) watchForResponderComplete() { // When the Complete message is received, the channel shuts down and // its context is cancelled case <-timer.C: - // Timer expired before we received a Complete from the responder + // Timer expired before we received a Complete message from the responder err := xerrors.Errorf("%s: timed out waiting %s for Complete message from remote peer", mc.chid, mc.cfg.AcceptTimeout) mc.closeChannelAndShutdown(err) @@ -321,52 +328,94 @@ func (mc *monitoredChannel) isRestarting() bool { return !mc.restartedAt.IsZero() } +// Send a restart message for the channel func (mc *monitoredChannel) restartChannel() { - var restartCount int var restartedAt time.Time mc.restartLk.Lock() { - // If the channel is not already being restarted, record the restart - // time and increment the consecutive restart count restartedAt = mc.restartedAt if mc.restartedAt.IsZero() { + // If there is not already a restart in progress, we'll restart now mc.restartedAt = time.Now() - mc.consecutiveRestarts++ - restartCount = mc.consecutiveRestarts + } else { + // There is already a restart in progress, so queue up a restart + // for after the current one has completed + mc.restartQueued = true } } mc.restartLk.Unlock() // Check if channel is already being restarted if !restartedAt.IsZero() { - log.Debugf("%s: restart called but already restarting channel (for %s so far; restart backoff is %s)", + log.Infof("%s: restart called but already restarting channel, "+ + "waiting to restart again (since %s; restart backoff is %s)", mc.chid, time.Since(restartedAt), mc.cfg.RestartBackoff) return } + for { + // Send the restart message + err := mc.doRestartChannel() + if err != nil { + // If there was an error restarting, close the channel and shutdown + // the monitor + mc.closeChannelAndShutdown(err) + return + } + + // Restart has completed, check if there's another restart queued up + restartAgain := false + mc.restartLk.Lock() + { + if mc.restartQueued { + // There's another restart queued up, so reset the restart time + // to now + mc.restartedAt = time.Now() + restartAgain = true + mc.restartQueued = false + } else { + // No other restarts queued up, so clear the restart time + mc.restartedAt = time.Time{} + } + } + mc.restartLk.Unlock() + + if !restartAgain { + // No restart queued, we're done + return + } + + // There was a restart queued, restart again + log.Infof("%s: restart was queued - restarting again", mc.chid) + } +} + +func (mc *monitoredChannel) doRestartChannel() error { + // Keep track of the number of consecutive restarts with no data + // transferred + mc.restartLk.Lock() + mc.consecutiveRestarts++ + restartCount := mc.consecutiveRestarts + mc.restartLk.Unlock() + if uint32(restartCount) > mc.cfg.MaxConsecutiveRestarts { - // If no data has been transferred since the last transfer, and we've - // reached the consecutive restart limit, close the channel and - // shutdown the monitor - err := xerrors.Errorf("%s: after %d consecutive restarts failed to transfer any data", mc.chid, restartCount) - mc.closeChannelAndShutdown(err) - return + // If no data has been transferred since the last restart, and we've + // reached the consecutive restart limit, return an error + return xerrors.Errorf("%s: after %d consecutive restarts failed to transfer any data", mc.chid, restartCount) } // Send the restart message + log.Infof("%s: restarting (%d consecutive 
restarts)", mc.chid, restartCount) err := mc.sendRestartMessage(restartCount) if err != nil { - // If the restart message could not be sent, close the channel and - // shutdown the monitor - mc.closeChannelAndShutdown(err) - return + log.Warnf("%s: restart failed, trying again: %s", mc.chid, err) + // If the restart message could not be sent, or there was a timeout + // waiting for the restart to be acknowledged, try again + return mc.doRestartChannel() } + log.Infof("%s: restart completed successfully", mc.chid) - // Restart complete, so clear the restart time so that another restart - // can begin - mc.restartLk.Lock() - mc.restartedAt = time.Time{} - mc.restartLk.Unlock() + return nil } func (mc *monitoredChannel) sendRestartMessage(restartCount int) error { @@ -383,7 +432,7 @@ func (mc *monitoredChannel) sendRestartMessage(restartCount int) error { } log.Infof("%s: re-established connection to %s in %s", mc.chid, p, time.Since(start)) - // Send a restart message for the channel. + // Send a restart message for the channel restartResult := mc.waitForRestartResponse() log.Infof("%s: sending restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount) err = mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid) @@ -430,7 +479,7 @@ func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) { } // Close the data transfer channel and fire an error - log.Errorf("closing data-transfer channel: %s", cherr) + log.Errorf("%s: closing data-transfer channel: %s", mc.chid, cherr) err := mc.mgr.CloseDataTransferChannelWithError(mc.parentCtx, mc.chid, cherr) if err != nil { log.Errorf("error closing data-transfer channel %s: %s", mc.chid, err) @@ -440,7 +489,7 @@ func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) { // Wait for the peer to send an acknowledgement to the restart request func (mc *monitoredChannel) waitForRestartResponse() chan error { restartFired := make(chan struct{}) - restarted := make(chan error) + restarted := make(chan error, 3) timer := time.NewTimer(mc.cfg.RestartAckTimeout) unsub := mc.mgr.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) { @@ -472,8 +521,9 @@ func (mc *monitoredChannel) waitForRestartResponse() chan error { // Timer expired before receiving a restart ack from peer case <-timer.C: p := mc.chid.OtherParty(mc.mgr.PeerID()) - restarted <- xerrors.Errorf("did not receive response to restart request from %s after %s", + err := xerrors.Errorf("did not receive response to restart request from %s after %s", p, mc.cfg.RestartAckTimeout) + restarted <- err } }() diff --git a/channelmonitor/channelmonitor_test.go b/channelmonitor/channelmonitor_test.go index ce72ee36..13785ca9 100644 --- a/channelmonitor/channelmonitor_test.go +++ b/channelmonitor/channelmonitor_test.go @@ -59,7 +59,7 @@ func TestChannelMonitorAutoRestart(t *testing.T) { m := NewMonitor(mockAPI, &Config{ AcceptTimeout: time.Hour, MaxConsecutiveRestarts: 3, - RestartAckTimeout: 50 * time.Millisecond, + RestartAckTimeout: 20 * time.Millisecond, CompleteTimeout: time.Hour, }) @@ -93,11 +93,8 @@ func TestChannelMonitorAutoRestart(t *testing.T) { } // Verify that restart message is sent - select { - case <-time.After(100 * time.Millisecond): - require.Fail(t, "failed to restart channel") - case <-mockAPI.restartMessages: - } + err := mockAPI.awaitRestartSent() + require.NoError(t, err) // If simulating a restart ack timeout, don't fire the restart // ack event and expect the channel to be closed with an error @@ -217,6 +214,63 @@ func 
awaitRestartComplete(mch *monitoredChannel) error { return xerrors.Errorf("restart did not complete after 10ms") } +func TestChannelMonitorQueuedRestart(t *testing.T) { + runTest := func(name string, isPush bool) { + t.Run(name, func(t *testing.T) { + ch := &mockChannelState{chid: ch1} + mockAPI := newMockMonitorAPI(ch, false, false) + + triggerErrorEvent := func() { + if isPush { + mockAPI.sendDataErrorEvent() + } else { + mockAPI.receiveDataErrorEvent() + } + } + + m := NewMonitor(mockAPI, &Config{ + AcceptTimeout: time.Hour, + RestartDebounce: 10 * time.Millisecond, + MaxConsecutiveRestarts: 3, + RestartAckTimeout: time.Hour, + CompleteTimeout: time.Hour, + }) + + if isPush { + m.AddPushChannel(ch1) + + mockAPI.dataQueued(10) + mockAPI.dataSent(5) + } else { + m.AddPullChannel(ch1) + + mockAPI.dataReceived(5) + } + + // Trigger an error event, should cause a restart + triggerErrorEvent() + // Wait for restart to occur + err := mockAPI.awaitRestartSent() + require.NoError(t, err) + + // Trigger another error event before the restart has completed + triggerErrorEvent() + + // Simulate receiving restart ack from peer (for first restart) + mockAPI.restartEvent() + + // A second restart should be sent because of the second error + err = mockAPI.awaitRestartSent() + require.NoError(t, err) + }) + } + + // test push channel + runTest("push", true) + // test pull channel + runTest("pull", false) +} + func TestChannelMonitorTimeouts(t *testing.T) { type testCase struct { name string @@ -324,8 +378,8 @@ func verifyChannelShutdown(t *testing.T, shutdownCtx context.Context) { type mockMonitorAPI struct { ch *mockChannelState - connectErrors chan error - restartErrors chan error + connectErrors bool + restartErrors bool restartMessages chan struct{} closeErr chan error @@ -334,28 +388,14 @@ type mockMonitorAPI struct { } func newMockMonitorAPI(ch *mockChannelState, errOnReconnect, errOnRestart bool) *mockMonitorAPI { - m := &mockMonitorAPI{ + return &mockMonitorAPI{ ch: ch, - restartMessages: make(chan struct{}, 1), + connectErrors: errOnReconnect, + restartErrors: errOnRestart, + restartMessages: make(chan struct{}, 100), closeErr: make(chan error, 1), - connectErrors: make(chan error, 1), - restartErrors: make(chan error, 1), subscribers: make(map[int]datatransfer.Subscriber), } - - var connectErr error - if errOnReconnect { - connectErr = xerrors.Errorf("connect err") - } - m.connectErrors <- connectErr - - var restartErr error - if errOnRestart { - restartErr = xerrors.Errorf("restart err") - } - m.restartErrors <- restartErr - - return m } func (m *mockMonitorAPI) SubscribeToEvents(subscriber datatransfer.Subscriber) datatransfer.Unsubscribe { @@ -374,18 +414,19 @@ func (m *mockMonitorAPI) SubscribeToEvents(subscriber datatransfer.Subscriber) d } func (m *mockMonitorAPI) fireEvent(e datatransfer.Event, state datatransfer.ChannelState) { + m.lk.Lock() + defer m.lk.Unlock() + for _, subscriber := range m.subscribers { subscriber(e, state) } } func (m *mockMonitorAPI) ConnectTo(ctx context.Context, id peer.ID) error { - select { - case err := <-m.connectErrors: - return err - default: - return nil + if m.connectErrors { + return xerrors.Errorf("connect err") } + return nil } func (m *mockMonitorAPI) PeerID() peer.ID { @@ -397,18 +438,17 @@ func (m *mockMonitorAPI) RestartDataTransferChannel(ctx context.Context, chid da m.restartMessages <- struct{}{} }() - select { - case err := <-m.restartErrors: - return err - default: - return nil + if m.restartErrors { + return xerrors.Errorf("restart err") } 
+ return nil } func (m *mockMonitorAPI) awaitRestartSent() error { + timeout := 100 * time.Millisecond select { - case <-time.After(10 * time.Millisecond): - return xerrors.Errorf("failed to restart channel") + case <-time.After(timeout): + return xerrors.Errorf("failed to restart channel after %s", timeout) case <-m.restartMessages: return nil } @@ -421,9 +461,10 @@ func (m *mockMonitorAPI) CloseDataTransferChannelWithError(ctx context.Context, func (m *mockMonitorAPI) verifyChannelClosed(t *testing.T, expectErr bool) { // Verify channel has been closed + closeTimeout := 100 * time.Millisecond select { - case <-time.After(100 * time.Millisecond): - require.Fail(t, "failed to close channel within 100ms") + case <-time.After(closeTimeout): + require.Fail(t, fmt.Sprintf("failed to close channel within %s", closeTimeout)) case err := <-m.closeErr: if expectErr && err == nil { require.Fail(t, "expected error on close") diff --git a/go.mod b/go.mod index 5f587e0c..b0295286 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/filecoin-project/go-data-transfer go 1.14 require ( + github.com/bep/debounce v1.2.0 github.com/filecoin-project/go-ds-versioning v0.1.0 github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1 @@ -12,7 +13,7 @@ require ( github.com/ipfs/go-cid v0.0.7 github.com/ipfs/go-datastore v0.4.5 github.com/ipfs/go-ds-badger v0.2.3 - github.com/ipfs/go-graphsync v0.6.1-0.20210407112122-089b2abad957 + github.com/ipfs/go-graphsync v0.6.1 github.com/ipfs/go-ipfs-blockstore v1.0.1 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 diff --git a/go.sum b/go.sum index da32ec7c..4dd52060 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,8 @@ github.com/Stebalien/go-bitfield v0.0.1 h1:X3kbSSPUaJK60wV2hjOPZwmpljr6VGCqdq4cB github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI+XWrX9Wf2XH0s= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/bep/debounce v1.2.0 h1:wXds8Kq8qRfwAOpAxHrJDbCXgC5aHSzgQb/0gKsHQqo= +github.com/bep/debounce v1.2.0/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8= github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= github.com/btcsuite/btcd v0.0.0-20190605094302-a0d1e3e36d50/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= @@ -213,8 +215,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= -github.com/ipfs/go-graphsync v0.6.1-0.20210407112122-089b2abad957 h1:JMQvhEKMk8kz31F7GcQba4XCwrO35zXad0/pmhyxfwk= -github.com/ipfs/go-graphsync v0.6.1-0.20210407112122-089b2abad957/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk= 
+github.com/ipfs/go-graphsync v0.6.1 h1:i9wN7YkBXWwIsUjVQeuaDxFB59yWZrG1xL564Nz7aGE= +github.com/ipfs/go-graphsync v0.6.1/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw= github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ= From 689c5fe9cc3b84d4a8b6bf78904210547584dd86 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 16 Apr 2021 16:23:57 +0200 Subject: [PATCH 4/4] fix: restart channel size --- channelmonitor/channelmonitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/channelmonitor/channelmonitor.go b/channelmonitor/channelmonitor.go index e58a9892..3a2aa9c1 100644 --- a/channelmonitor/channelmonitor.go +++ b/channelmonitor/channelmonitor.go @@ -489,7 +489,7 @@ func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) { // Wait for the peer to send an acknowledgement to the restart request func (mc *monitoredChannel) waitForRestartResponse() chan error { restartFired := make(chan struct{}) - restarted := make(chan error, 3) + restarted := make(chan error, 1) timer := time.NewTimer(mc.cfg.RestartAckTimeout) unsub := mc.mgr.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
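Note (not part of the patch): the debounced-restart behaviour added in patch 3 relies on github.com/bep/debounce, the dependency introduced in go.mod above. The standalone sketch below is illustrative only; the durations, the restart counter, and the main function are made up, but the debounce.New call and its func(f func()) return value are the same API used in newMonitoredChannel. It shows how a burst of transport errors collapses into a single restart once the RestartDebounce window passes with no further errors.

package main

import (
	"fmt"
	"sync/atomic"
	"time"

	"github.com/bep/debounce"
)

func main() {
	// Debounce window, playing the role of Config.RestartDebounce
	// (the 100ms value here is illustrative only).
	debounced := debounce.New(100 * time.Millisecond)

	var restarts int32
	restart := func() { atomic.AddInt32(&restarts, 1) }

	// Simulate a burst of transport errors: each event re-arms the
	// debouncer instead of triggering an immediate restart.
	for i := 0; i < 5; i++ {
		debounced(restart)
		time.Sleep(10 * time.Millisecond)
	}

	// Once the window elapses with no further errors, the wrapped
	// function runs exactly once.
	time.Sleep(200 * time.Millisecond)
	fmt.Println("restarts:", atomic.LoadInt32(&restarts)) // restarts: 1
}

In the patch itself the debounced call wraps mc.restartChannel, so repeated SendDataError / ReceiveDataError events within RestartDebounce collapse into one restart attempt, while errors that arrive during an in-flight restart are recorded via restartQueued and replayed once the current restart completes.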