From 8028e79027ae81ec76c226683142570f1b570987 Mon Sep 17 00:00:00 2001 From: Daniel Cadenas Date: Mon, 20 May 2024 13:29:14 -0300 Subject: [PATCH 1/2] Load shedding instead of disconnection under backpressure --- service/domain/relays/relay_connection.go | 18 +++--------------- service/domain/relays/relay_connections.go | 5 ++--- 2 files changed, 5 insertions(+), 18 deletions(-) diff --git a/service/domain/relays/relay_connection.go b/service/domain/relays/relay_connection.go index fc053eb..e7543a5 100644 --- a/service/domain/relays/relay_connection.go +++ b/service/domain/relays/relay_connection.go @@ -70,7 +70,6 @@ type RelayConnection struct { eventsToSendMutex sync.Mutex newEventsCh chan domain.Event rateLimitNoticeBackoffManager *RateLimitNoticeBackoffManager - cancelRun context.CancelFunc cancelBackPressure context.CancelFunc } @@ -91,7 +90,6 @@ func NewRelayConnection( eventsToSend: make(map[domain.EventId]*eventToSend), newEventsCh: make(chan domain.Event), rateLimitNoticeBackoffManager: rateLimitNoticeBackoffManager, - cancelRun: nil, cancelBackPressure: nil, } } @@ -311,8 +309,6 @@ func (r *RelayConnection) triggerSubscriptionUpdate() { func (r *RelayConnection) run(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) - backPressureCtx, cancelFromBackPressure := context.WithCancel(ctx) - r.cancelRun = cancelFromBackPressure defer cancel() defer r.setState(RelayConnectionStateDisconnected) @@ -359,17 +355,9 @@ func (r *RelayConnection) run(ctx context.Context) error { return NewReadMessageError(err) } - select { - case <-backPressureCtx.Done(): - // The backpressure handling code is to avoid overwhelming the queue - // that writes to relay.nos.social so we skip that signal for it or - // the queue will never shrink - if r.Address().HostWithoutPort() == "relay.nos.social" { - continue - } - - return BackPressureError - default: + if r.state == RelayConnectionStateBackPressured { + // Load shedding under backpressure + continue } if err := r.handleMessage(messageBytes); err != nil { diff --git a/service/domain/relays/relay_connections.go b/service/domain/relays/relay_connections.go index 13dea88..84423ad 100644 --- a/service/domain/relays/relay_connections.go +++ b/service/domain/relays/relay_connections.go @@ -76,9 +76,8 @@ func (d *RelayConnections) SendEvent(ctx context.Context, relayAddress domain.Re func (d *RelayConnections) NotifyBackPressure() { for _, connection := range d.connections { - if connection.cancelRun != nil && connection.Address().HostWithoutPort() != "relay.nos.social" { - connection.cancelRun() - connection.cancelRun = nil + if connection.Address().HostWithoutPort() != "relay.nos.social" { + connection.setState(RelayConnectionStateBackPressured) } } } From 1d8df0046a6b18a502e8c4ffc7432a3737e3e012 Mon Sep 17 00:00:00 2001 From: Daniel Cadenas Date: Mon, 20 May 2024 13:47:01 -0300 Subject: [PATCH 2/2] Remove unnecessary continue --- service/domain/relays/relay_connection.go | 1 - 1 file changed, 1 deletion(-) diff --git a/service/domain/relays/relay_connection.go b/service/domain/relays/relay_connection.go index e7543a5..e0a1623 100644 --- a/service/domain/relays/relay_connection.go +++ b/service/domain/relays/relay_connection.go @@ -366,7 +366,6 @@ func (r *RelayConnection) run(ctx context.Context) error { WithError(err). WithField("message", string(messageBytes)). Message("error handling an incoming message") - continue } } }