Channel monitor watches for errors instead of measuring data rate #190

Merged 4 commits on Apr 16, 2021

Changes from 1 commit

124 changes: 87 additions & 37 deletions channelmonitor/channelmonitor.go
@@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/bep/debounce"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"
@@ -39,6 +40,8 @@ type Monitor struct {
type Config struct {
// Max time to wait for other side to accept open channel request before attempting restart
AcceptTimeout time.Duration
+ // Debounce when restart is triggered by multiple errors
+ RestartDebounce time.Duration
// Backoff after restarting
RestartBackoff time.Duration
// Number of times to try to restart before failing
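
Together these settings define the restart policy: how long to wait, how aggressively to coalesce errors, and when to give up. Below is a minimal sketch of populating the config; the module path and every value are illustrative, not defaults from this PR (MaxConsecutiveRestarts and RestartAckTimeout are fields referenced later in this diff):

import (
	"time"

	"github.com/filecoin-project/go-data-transfer/channelmonitor"
)

// Illustrative values, not defaults from this PR.
var cfg = channelmonitor.Config{
	AcceptTimeout:          30 * time.Second, // give up if the open request is never accepted
	RestartDebounce:        10 * time.Second, // coalesce bursts of errors into one restart
	RestartBackoff:         20 * time.Second, // pause after each restart attempt
	RestartAckTimeout:      30 * time.Second, // max wait for the peer to ack a restart
	MaxConsecutiveRestarts: 5,                // after this, close the channel with an error
}
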
@@ -152,18 +155,20 @@ func (m *Monitor) enabled() bool {
type monitoredChannel struct {
// The parentCtx is used when sending a close message for a channel, so
// that operation can continue even after the monitoredChannel is shutdown
- parentCtx context.Context
- ctx context.Context
- cancel context.CancelFunc
- mgr monitorAPI
- chid datatransfer.ChannelID
- cfg *Config
- unsub datatransfer.Unsubscribe
- onShutdown func(datatransfer.ChannelID)
- shutdownLk sync.Mutex
+ parentCtx context.Context
+ ctx context.Context
+ cancel context.CancelFunc
+ mgr monitorAPI
+ chid datatransfer.ChannelID
+ cfg *Config
+ unsub datatransfer.Unsubscribe
+ restartChannelDebounced func()
+ onShutdown func(datatransfer.ChannelID)
+ shutdownLk sync.Mutex

restartLk sync.RWMutex
restartedAt time.Time
+ restartQueued bool
consecutiveRestarts int
}

@@ -184,6 +189,8 @@ func newMonitoredChannel(
cfg: cfg,
onShutdown: onShutdown,
}
+ debouncer := debounce.New(cfg.RestartDebounce)
+ mpc.restartChannelDebounced = func() { debouncer(mpc.restartChannel) }
mpc.start()
return mpc
}
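
The debounce wrapper is what turns a burst of transport errors into a single restart: every call inside the RestartDebounce window resets the timer, and the wrapped function runs once after the burst goes quiet. A self-contained sketch of the same mechanism with the bep/debounce package (timings illustrative):

package main

import (
	"fmt"
	"sync/atomic"
	"time"

	"github.com/bep/debounce"
)

func main() {
	var restarts int32
	restart := func() { atomic.AddInt32(&restarts, 1) }

	// Same shape as restartChannelDebounced above.
	debounced := debounce.New(100 * time.Millisecond)

	// A burst of send/receive errors in quick succession...
	for i := 0; i < 10; i++ {
		debounced(restart)
		time.Sleep(5 * time.Millisecond)
	}

	time.Sleep(200 * time.Millisecond)
	fmt.Println("restarts:", atomic.LoadInt32(&restarts)) // 1: the burst was coalesced
}
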
@@ -243,12 +250,12 @@ func (mc *monitoredChannel) start() {
// If the transport layer reports an error sending data over the wire,
// attempt to restart the channel
log.Warnf("%s: data transfer transport send error, restarting data transfer", mc.chid)
- go mc.restartChannel()
+ go mc.restartChannelDebounced()
case datatransfer.ReceiveDataError:
// If the transport layer reports an error receiving data over the wire,
// attempt to restart the channel
log.Warnf("%s: data transfer transport receive error, restarting data transfer", mc.chid)
- go mc.restartChannel()
+ go mc.restartChannelDebounced()
case datatransfer.FinishTransfer:
// The channel initiator has finished sending / receiving all data.
// Watch to make sure that the responder sends a message to acknowledge
@@ -297,7 +304,7 @@ func (mc *monitoredChannel) watchForResponderComplete() {
// When the Complete message is received, the channel shuts down and
// its context is cancelled
case <-timer.C:
- // Timer expired before we received a Complete from the responder
+ // Timer expired before we received a Complete message from the responder
err := xerrors.Errorf("%s: timed out waiting %s for Complete message from remote peer",
mc.chid, mc.cfg.AcceptTimeout)
mc.closeChannelAndShutdown(err)
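
The surrounding function is a watchdog: it arms a timer once the initiator finishes transferring and relies on the channel context being cancelled (which happens when the Complete message arrives and the channel shuts down) to disarm it. A minimal sketch of that pattern, with illustrative names:

import (
	"context"
	"fmt"
	"time"
)

// watchdog calls onTimeout unless ctx is cancelled within timeout.
// Illustrative reduction of watchForResponderComplete.
func watchdog(ctx context.Context, timeout time.Duration, onTimeout func(error)) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		// Complete arrived and the channel shut down in time; nothing to do.
	case <-timer.C:
		onTimeout(fmt.Errorf("timed out waiting %s for Complete message", timeout))
	}
}
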
@@ -321,52 +328,94 @@ func (mc *monitoredChannel) isRestarting() bool {
return !mc.restartedAt.IsZero()
}

// Send a restart message for the channel
func (mc *monitoredChannel) restartChannel() {
- var restartCount int
var restartedAt time.Time
mc.restartLk.Lock()
{
- // If the channel is not already being restarted, record the restart
- // time and increment the consecutive restart count
restartedAt = mc.restartedAt
if mc.restartedAt.IsZero() {
+ // If there is not already a restart in progress, we'll restart now
mc.restartedAt = time.Now()
- mc.consecutiveRestarts++
- restartCount = mc.consecutiveRestarts
+ } else {
+ // There is already a restart in progress, so queue up a restart
+ // for after the current one has completed
+ mc.restartQueued = true
}
}
mc.restartLk.Unlock()

// Check if channel is already being restarted
if !restartedAt.IsZero() {
- log.Debugf("%s: restart called but already restarting channel (for %s so far; restart backoff is %s)",
+ log.Infof("%s: restart called but already restarting channel, "+
+ "waiting to restart again (since %s; restart backoff is %s)",
mc.chid, time.Since(restartedAt), mc.cfg.RestartBackoff)
return
}

+ for {
+ // Send the restart message
+ err := mc.doRestartChannel()
+ if err != nil {
+ // If there was an error restarting, close the channel and shutdown
+ // the monitor
+ mc.closeChannelAndShutdown(err)
+ return
+ }

+ // Restart has completed, check if there's another restart queued up
+ restartAgain := false
+ mc.restartLk.Lock()
+ {
+ if mc.restartQueued {
+ // There's another restart queued up, so reset the restart time
+ // to now
+ mc.restartedAt = time.Now()
+ restartAgain = true
+ mc.restartQueued = false
+ } else {
+ // No other restarts queued up, so clear the restart time
+ mc.restartedAt = time.Time{}
+ }
+ }
+ mc.restartLk.Unlock()

+ if !restartAgain {
+ // No restart queued, we're done
+ return
+ }

+ // There was a restart queued, restart again
+ log.Infof("%s: restart was queued - restarting again", mc.chid)
+ }
}
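
The restartedAt/restartQueued pair implements a small coalescer: at most one restart runs at a time, and any number of triggers that arrive mid-restart collapse into exactly one follow-up run. The same logic in isolation, as a sketch with illustrative names:

import "sync"

type coalescer struct {
	mu      sync.Mutex
	running bool // mirrors restartedAt being non-zero
	queued  bool // mirrors restartQueued
}

func (c *coalescer) trigger(restart func() error) {
	c.mu.Lock()
	if c.running {
		c.queued = true // fold into the restart already in flight
		c.mu.Unlock()
		return
	}
	c.running = true
	c.mu.Unlock()

	for {
		if restart() != nil {
			return // on failure the channel is closed by the caller
		}
		c.mu.Lock()
		again := c.queued
		c.queued = false
		c.running = again
		c.mu.Unlock()
		if !again {
			return
		}
	}
}
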

+ func (mc *monitoredChannel) doRestartChannel() error {
+ // Keep track of the number of consecutive restarts with no data
+ // transferred
+ mc.restartLk.Lock()
+ mc.consecutiveRestarts++
+ restartCount := mc.consecutiveRestarts
+ mc.restartLk.Unlock()

if uint32(restartCount) > mc.cfg.MaxConsecutiveRestarts {
- // If no data has been transferred since the last transfer, and we've
- // reached the consecutive restart limit, close the channel and
- // shutdown the monitor
- err := xerrors.Errorf("%s: after %d consecutive restarts failed to transfer any data", mc.chid, restartCount)
- mc.closeChannelAndShutdown(err)
- return
+ // If no data has been transferred since the last restart, and we've
+ // reached the consecutive restart limit, return an error
+ return xerrors.Errorf("%s: after %d consecutive restarts failed to transfer any data", mc.chid, restartCount)
}

// Send the restart message
+ log.Infof("%s: restarting (%d consecutive restarts)", mc.chid, restartCount)
err := mc.sendRestartMessage(restartCount)
if err != nil {
- // If the restart message could not be sent, close the channel and
- // shutdown the monitor
- mc.closeChannelAndShutdown(err)
- return
+ log.Warnf("%s: restart failed, trying again: %s", mc.chid, err)
+ // If the restart message could not be sent, or there was a timeout
+ // waiting for the restart to be acknowledged, try again
+ return mc.doRestartChannel()
}
+ log.Infof("%s: restart completed successfully", mc.chid)

// Restart complete, so clear the restart time so that another restart
// can begin
mc.restartLk.Lock()
mc.restartedAt = time.Time{}
mc.restartLk.Unlock()
+ return nil
}
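
Note the self-recursion on failure: each attempt increments consecutiveRestarts, so the recursion is bounded by MaxConsecutiveRestarts (the counter is presumably reset elsewhere once data actually flows; that code is outside this diff). The same control flow written iteratively, as an illustrative sketch:

import "fmt"

// retryWithCap keeps attempting until attempt succeeds or the shared
// counter passes max. Illustrative equivalent of doRestartChannel's
// recursion; in the real code the counter lives on monitoredChannel.
func retryWithCap(counter *int, max uint32, attempt func(count int) error) error {
	for {
		*counter++
		if uint32(*counter) > max {
			return fmt.Errorf("after %d consecutive restarts failed to transfer any data", *counter)
		}
		if err := attempt(*counter); err != nil {
			continue // restart failed or the ack timed out; try again
		}
		return nil
	}
}
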

func (mc *monitoredChannel) sendRestartMessage(restartCount int) error {
@@ -383,7 +432,7 @@ func (mc *monitoredChannel) sendRestartMessage(restartCount int) error {
}
log.Infof("%s: re-established connection to %s in %s", mc.chid, p, time.Since(start))

- // Send a restart message for the channel.
+ // Send a restart message for the channel
restartResult := mc.waitForRestartResponse()
log.Infof("%s: sending restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount)
err = mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid)
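
Note the ordering here: waitForRestartResponse subscribes and hands back a result channel before the restart message goes out, so an acknowledgement that arrives immediately cannot slip past the listener. The pattern in miniature, with illustrative names:

// requestWithAck registers the listener first, then sends, then waits.
func requestWithAck(subscribe func() <-chan error, send func() error) error {
	ack := subscribe() // listener is live before the request leaves
	if err := send(); err != nil {
		return err
	}
	return <-ack // delivered by the listener: success, failure, or timeout
}
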
@@ -430,7 +479,7 @@ func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) {
}

// Close the data transfer channel and fire an error
- log.Errorf("closing data-transfer channel: %s", cherr)
+ log.Errorf("%s: closing data-transfer channel: %s", mc.chid, cherr)
err := mc.mgr.CloseDataTransferChannelWithError(mc.parentCtx, mc.chid, cherr)
if err != nil {
log.Errorf("error closing data-transfer channel %s: %s", mc.chid, err)
@@ -440,7 +489,7 @@ func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) {
// Wait for the peer to send an acknowledgement to the restart request
func (mc *monitoredChannel) waitForRestartResponse() chan error {
restartFired := make(chan struct{})
- restarted := make(chan error)
+ restarted := make(chan error, 3)
timer := time.NewTimer(mc.cfg.RestartAckTimeout)

unsub := mc.mgr.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
@@ -472,8 +521,9 @@ func (mc *monitoredChannel) waitForRestartResponse() chan error {
// Timer expired before receiving a restart ack from peer
case <-timer.C:
p := mc.chid.OtherParty(mc.mgr.PeerID())
- restarted <- xerrors.Errorf("did not receive response to restart request from %s after %s",
+ err := xerrors.Errorf("did not receive response to restart request from %s after %s",
p, mc.cfg.RestartAckTimeout)
+ restarted <- err
}
}()
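
Buffering restarted means the goroutine's send can never block, even if the receiver has stopped listening, so the goroutine cannot leak; why the capacity is 3 rather than 1 is not visible in this diff. The idiom in isolation, as an illustrative sketch:

import (
	"errors"
	"time"
)

// report delivers a result without ever blocking: the buffer guarantees
// the send succeeds even if the caller gave up waiting on the channel.
func report() <-chan error {
	result := make(chan error, 1) // buffered: the sender can never block
	go func() {
		time.Sleep(10 * time.Millisecond) // simulate waiting for an ack
		result <- errors.New("ack timed out")
	}()
	return result
}
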
