diff --git a/service/domain/relays/relay_connection.go b/service/domain/relays/relay_connection.go index fc053eb..e0a1623 100644 --- a/service/domain/relays/relay_connection.go +++ b/service/domain/relays/relay_connection.go @@ -70,7 +70,6 @@ type RelayConnection struct { eventsToSendMutex sync.Mutex newEventsCh chan domain.Event rateLimitNoticeBackoffManager *RateLimitNoticeBackoffManager - cancelRun context.CancelFunc cancelBackPressure context.CancelFunc } @@ -91,7 +90,6 @@ func NewRelayConnection( eventsToSend: make(map[domain.EventId]*eventToSend), newEventsCh: make(chan domain.Event), rateLimitNoticeBackoffManager: rateLimitNoticeBackoffManager, - cancelRun: nil, cancelBackPressure: nil, } } @@ -311,8 +309,6 @@ func (r *RelayConnection) triggerSubscriptionUpdate() { func (r *RelayConnection) run(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) - backPressureCtx, cancelFromBackPressure := context.WithCancel(ctx) - r.cancelRun = cancelFromBackPressure defer cancel() defer r.setState(RelayConnectionStateDisconnected) @@ -359,17 +355,9 @@ func (r *RelayConnection) run(ctx context.Context) error { return NewReadMessageError(err) } - select { - case <-backPressureCtx.Done(): - // The backpressure handling code is to avoid overwhelming the queue - // that writes to relay.nos.social so we skip that signal for it or - // the queue will never shrink - if r.Address().HostWithoutPort() == "relay.nos.social" { - continue - } - - return BackPressureError - default: + if r.state == RelayConnectionStateBackPressured { + // Load shedding under backpressure + continue } if err := r.handleMessage(messageBytes); err != nil { @@ -378,7 +366,6 @@ func (r *RelayConnection) run(ctx context.Context) error { WithError(err). WithField("message", string(messageBytes)). Message("error handling an incoming message") - continue } } } diff --git a/service/domain/relays/relay_connections.go b/service/domain/relays/relay_connections.go index 13dea88..84423ad 100644 --- a/service/domain/relays/relay_connections.go +++ b/service/domain/relays/relay_connections.go @@ -76,9 +76,8 @@ func (d *RelayConnections) SendEvent(ctx context.Context, relayAddress domain.Re func (d *RelayConnections) NotifyBackPressure() { for _, connection := range d.connections { - if connection.cancelRun != nil && connection.Address().HostWithoutPort() != "relay.nos.social" { - connection.cancelRun() - connection.cancelRun = nil + if connection.Address().HostWithoutPort() != "relay.nos.social" { + connection.setState(RelayConnectionStateBackPressured) } } }