Skip to content

Commit

Permalink
Merge pull request #9 from cilium/shared-reader-error-fix
Browse files Browse the repository at this point in the history
shared client: Do not busy loop on errors
  • Loading branch information
jrajahalme authored Nov 20, 2023
2 parents eaf71f6 + 496cd01 commit 7293451
Showing 1 changed file with 62 additions and 11 deletions.
73 changes: 62 additions & 11 deletions shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"io"
"net"
"sync"
"time"
Expand Down Expand Up @@ -143,21 +144,55 @@ func handler(wg *sync.WaitGroup, client *Client, conn *Conn, requests chan reque

responses := make(chan sharedClientResponse)

// receiverTrigger is used to wake up the receive loop after request(s) have been sent. It
// must be buffered to be able to send a trigger while the receive loop is not yet ready to
// receive the trigger, as we do not want to stall the sender when the receiver is blocking
// on the read operation.
receiverTrigger := make(chan struct{}, 1)
triggerReceiver := func() {
select {
case receiverTrigger <- struct{}{}:
default:
}
}

// Receive loop
wg.Add(1)
go func() {
defer wg.Done()
defer close(responses)

// No point trying to receive until the first request has been successfully sent, so
// wait for a trigger first. receiverTrigger is buffered, so this is safe
// to do, even if the sender sends the trigger before we are ready to receive here.
<-receiverTrigger

for {
// This will block but eventually return an i/o timeout, as we always set
// the timeouts before sending anything
r, err := conn.ReadMsg()
if err != nil {
// handler is not reading on the channel after closing
if errors.Is(err, net.ErrClosed) {
return
}
responses <- sharedClientResponse{nil, 0, err}
} else {
if err == nil {
responses <- sharedClientResponse{r, 0, nil}
continue // receive immediately again
}

// handler is not reading on the channel after closing.
// UDP connections return net.ErrClosed, while TCP/TLS connections are read
// via the io package, which returns io.EOF.
if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
return
}

// send error response to cancel all current requests.
responses <- sharedClientResponse{nil, 0, err}

// wait for a trigger from the handler after any errors. Re-reading in
// this condition could busy loop, e.g., if a read timeout occurred.
// receiverTrigger is buffered so that we catch the trigger that may
// have been sent while we sent the error response above.
_, ok := <-receiverTrigger
if !ok {
return // exit immediately when the trigger channel is closed
}
}
}()
Expand All @@ -169,6 +204,7 @@ func handler(wg *sync.WaitGroup, client *Client, conn *Conn, requests chan reque
waitingResponses := make(map[uint16]waiter)
defer func() {
conn.Close()
close(receiverTrigger)

// Drain responses sent by the receive loop to allow it to exit.
// It may be repeatedly reading after an i/o timeout, for example.
Expand All @@ -185,6 +221,9 @@ func handler(wg *sync.WaitGroup, client *Client, conn *Conn, requests chan reque
select {
case req, ok := <-requests:
if !ok {
// 'requests' is closed when SharedClient is recycled, which happens
// after responses (or errors) have been received and there are no more
// requests to be sent.
return
}
start := time.Now()
Expand All @@ -194,10 +233,15 @@ func handler(wg *sync.WaitGroup, client *Client, conn *Conn, requests chan reque
close(req.ch)
} else {
waitingResponses[req.msg.Id] = waiter{req.ch, start}

// Wake up the receiver that may be waiting to receive again
triggerReceiver()
}

case resp, ok := <-responses:
if !ok {
// 'responses' is closed when the receive loop exits, so we quit as
// nothing can be received any more
return
}
if resp.err != nil {
Expand Down Expand Up @@ -234,12 +278,19 @@ func (c *SharedClient) ExchangeSharedContext(ctx context.Context, m *Msg) (r *Ms
}
c.Unlock()

// This request keeps 'c.requests' open; sending a request may hang indefinitely if
// the handler happens to quit at the same time. Use ctx.Done to avoid this.
timeout := c.Client.writeTimeout()
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
respCh := make(chan sharedClientResponse)
c.requests <- request{
ctx: ctx,
msg: m,
ch: respCh,
select {
case c.requests <- request{ctx: ctx, msg: m, ch: respCh}:
case <-ctx.Done():
return nil, 0, ctx.Err()
}

// Since c.requests is unbuffered, the handler is guaranteed to eventually close 'respCh'
resp := <-respCh
return resp.msg, resp.rtt, resp.err
}
Expand Down

0 comments on commit 7293451

Please sign in to comment.