diff --git a/p2p/host/eventbus/basic.go b/p2p/host/eventbus/basic.go
index 42365a7916..af3e83a9da 100644
--- a/p2p/host/eventbus/basic.go
+++ b/p2p/host/eventbus/basic.go
@@ -6,10 +6,24 @@ import (
 	"reflect"
 	"sync"
 	"sync/atomic"
+	"time"
 
+	logging "github.com/ipfs/go-log/v2"
 	"github.com/libp2p/go-libp2p/core/event"
 )
 
+// logInterface is the part of the logger the event bus uses. Declaring it as
+// an interface lets tests replace the package logger with a recording one.
+type logInterface interface {
+	Errorf(string, ...interface{})
+}
+
+var log logInterface = logging.Logger("eventbus")
+
+// slowConsumerWarningTimeout is how long an emit may stay blocked on a
+// subscriber's channel before the stall is logged.
+const slowConsumerWarningTimeout = time.Second
+
 // /////////////////////
 // BUS
 
@@ -336,6 +350,12 @@ func (n *wildcardNode) addSink(sink *namedSink) {
 }
 
 func (n *wildcardNode) removeSink(ch chan interface{}) {
+	go func() {
+		// drain the event channel, will return when closed and drained.
+		// this is necessary to unblock publishes to this channel.
+		for range ch {
+		}
+	}()
 	n.nSinks.Add(-1) // ok to do outside the lock
 	n.Lock()
 	for i := 0; i < len(n.sinks); i++ {
@@ -348,6 +368,9 @@ func (n *wildcardNode) removeSink(ch chan interface{}) {
 	n.Unlock()
 }
 
+// wildcardType is the event type named in slow consumer logs for wildcard subscribers.
+var wildcardType = reflect.TypeOf(event.WildcardSubscription)
+
 func (n *wildcardNode) emit(evt interface{}) {
 	if n.nSinks.Load() == 0 {
 		return
@@ -360,7 +383,14 @@ func (n *wildcardNode) emit(evt interface{}) {
 		// record channel full events before blocking
 		sendSubscriberMetrics(n.metricsTracer, sink)
 
-		sink.ch <- evt
+		select {
+		case sink.ch <- evt:
+		default:
+			// Only the read lock is held here, so several emitters can
+			// stall at once. Give each stalled send its own timer instead
+			// of sharing one cached on the node between goroutines.
+			emitAndLogError(nil, wildcardType, evt, sink)
+		}
 	}
 	n.RUnlock()
 }
@@ -379,6 +409,9 @@ type node struct {
 
 	sinks         []*namedSink
 	metricsTracer MetricsTracer
+
+	// slowConsumerTimer is reused by emit for slow consumer warnings.
+	slowConsumerTimer *time.Timer
 }
 
 func newNode(typ reflect.Type, metricsTracer MetricsTracer) *node {
@@ -404,11 +437,44 @@ func (n *node) emit(evt interface{}) {
 		// Sending metrics before sending on channel allows us to
 		// record channel full events before blocking
 		sendSubscriberMetrics(n.metricsTracer, sink)
-		sink.ch <- evt
+		select {
+		case sink.ch <- evt:
+		default:
+			n.slowConsumerTimer = emitAndLogError(n.slowConsumerTimer, n.typ, evt, sink)
+		}
 	}
 	n.lk.Unlock()
 }
 
+// emitAndLogError delivers evt to sink, blocking until the send succeeds. If
+// the send is still blocked after slowConsumerWarningTimeout, it logs an error
+// naming the subscriber and the event type typ, then keeps waiting.
+//
+// timer may be nil, in which case a new one is created; otherwise it is reset
+// and reused. The returned timer is no longer running and its channel is
+// empty, so the caller can pass it to a later call.
+func emitAndLogError(timer *time.Timer, typ reflect.Type, evt interface{}, sink *namedSink) *time.Timer {
+	// Slow consumer. Log a warning if stalled for the timeout
+	if timer == nil {
+		timer = time.NewTimer(slowConsumerWarningTimeout)
+	} else {
+		timer.Reset(slowConsumerWarningTimeout)
+	}
+
+	select {
+	case sink.ch <- evt:
+		if !timer.Stop() {
+			<-timer.C
+		}
+	case <-timer.C:
+		log.Errorf("subscriber named \"%s\" is a slow consumer of %s. This can lead to libp2p stalling and hard to debug issues.", sink.name, typ)
+		// Continue to stall since there's nothing else we can do.
+		sink.ch <- evt
+	}
+
+	return timer
+}
+
 func sendSubscriberMetrics(metricsTracer MetricsTracer, sink *namedSink) {
 	if metricsTracer != nil {
 		metricsTracer.SubscriberQueueLength(sink.name, len(sink.ch)+1)
diff --git a/p2p/host/eventbus/basic_test.go b/p2p/host/eventbus/basic_test.go
index 57362ce9b7..15768cda5c 100644
--- a/p2p/host/eventbus/basic_test.go
+++ b/p2p/host/eventbus/basic_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"reflect"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -13,6 +14,7 @@ import (
 
 	"github.com/libp2p/go-libp2p-testing/race"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
@@ -131,6 +133,91 @@ func TestEmitNoSubNoBlock(t *testing.T) {
 
 	em.Emit(EventA{})
 }
 
+// mockLogger records formatted Errorf messages so tests can inspect them.
+type mockLogger struct {
+	mu   sync.Mutex
+	logs []string
+}
+
+// Errorf formats the message and appends it to the recorded logs.
+func (m *mockLogger) Errorf(format string, args ...interface{}) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.logs = append(m.logs, fmt.Sprintf(format, args...))
+}
+
+// Logs returns a copy of the messages recorded so far.
+func (m *mockLogger) Logs() []string {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	return append([]string(nil), m.logs...)
+}
+
+// Clear discards all recorded messages.
+func (m *mockLogger) Clear() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.logs = nil
+}
+
+// TestEmitLogsErrorOnStall checks that an emit stalled on a full subscriber
+// channel logs the slow consumer error, for typed and wildcard subscriptions.
+func TestEmitLogsErrorOnStall(t *testing.T) {
+	oldLogger := log
+	defer func() {
+		log = oldLogger
+	}()
+	ml := &mockLogger{}
+	log = ml
+
+	bus1 := NewBus()
+	bus2 := NewBus()
+
+	eventSub, err := bus1.Subscribe(new(EventA))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	wildcardSub, err := bus2.Subscribe(event.WildcardSubscription)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	testCases := []event.Subscription{eventSub, wildcardSub}
+	eventBuses := []event.Bus{bus1, bus2}
+
+	for i, sub := range testCases {
+		bus := eventBuses[i]
+		em, err := bus.Emitter(new(EventA))
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer em.Close()
+
+		go func() {
+			for sent := 0; sent < subSettingsDefault.buffer+2; sent++ {
+				em.Emit(EventA{})
+			}
+		}()
+
+		require.EventuallyWithT(t, func(collect *assert.CollectT) {
+			logs := ml.Logs()
+			found := false
+			for _, entry := range logs {
+				if strings.Contains(entry, "slow consumer") {
+					found = true
+					break
+				}
+			}
+			assert.True(collect, found, "expected to find slow consumer log")
+		}, 3*time.Second, 500*time.Millisecond)
+		ml.Clear()
+
+		// Close the subscriber so the worker can finish.
+		sub.Close()
+	}
+}
+
 func TestEmitOnClosed(t *testing.T) {
 	bus := NewBus()