From c7a30c024996633279232fed8bc5e60e15c1d390 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 15 Nov 2024 15:05:34 -0800 Subject: [PATCH] Fixed deadlock when handling multiple stream entries errors --- server/leader_controller.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/server/leader_controller.go b/server/leader_controller.go index dab298d3..b5b9739c 100644 --- a/server/leader_controller.go +++ b/server/leader_controller.go @@ -886,10 +886,10 @@ func (lc *leaderController) handleWriteStream(stream proto.OxiaClient_WriteStrea req, err := stream.Recv() if err != nil { - closeCh <- err + sendNonBlocking(closeCh, err) return } else if req == nil { - closeCh <- errors.New("stream closed") + sendNonBlocking(closeCh, errors.New("stream closed")) return } @@ -908,7 +908,7 @@ func (lc *leaderController) handleWalSynced(stream proto.OxiaClient_WriteStreamS offset int64, timestamp uint64, err error, timer metrics.Timer) { if err != nil { timer.Done() - closeCh <- err + sendNonBlocking(closeCh, err) return } @@ -917,13 +917,13 @@ func (lc *leaderController) handleWalSynced(stream proto.OxiaClient_WriteStreamS }, func(response *proto.WriteResponse, err error) { if err != nil { timer.Done() - closeCh <- err + sendNonBlocking(closeCh, err) return } if err = stream.Send(response); err != nil { timer.Done() - closeCh <- err + sendNonBlocking(closeCh, err) return } timer.Done() @@ -1136,3 +1136,10 @@ func checkStatusIsLeader(actual proto.ServingStatus) error { } return nil } + +func sendNonBlocking(ch chan error, err error) { + select { + case ch <- err: + default: + } +}