Skip to content

Commit

Permalink
Merge pull request #300 from effodio/cleaner-version
Browse files Browse the repository at this point in the history
Handle peer disconnect in blocking calls
  • Loading branch information
alicebob authored Nov 6, 2022
2 parents f3ff91a + faa72bb commit e5e64fb
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 12 deletions.
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGn
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da h1:NimzV1aGyq29m5ukMK0AMWEhFaL/lrEOaephfuoiARg=
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
9 changes: 7 additions & 2 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,19 @@ func blocking(
m.Lock()
defer m.Unlock()
for {
done := cb(c, ctx)
if done {
if c.Closed() {
return
}

if m.Ctx.Err() != nil {
return
}

done := cb(c, ctx)
if done {
return
}

if timedOut {
onTimeout(c)
return
Expand Down
38 changes: 38 additions & 0 deletions redis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package miniredis

import (
"sync"
"testing"
"time"

"github.com/alicebob/miniredis/v2/server"
)

func TestRedis(t *testing.T) {
s, err := Run()
ok(t, err)
defer s.Close()

peer := &server.Peer{}
var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()
blocking(s, peer, time.Second, func(p *server.Peer, cc *connCtx) bool {
err := s.Ctx.Err()
if err != nil {
t.Error("blocking call should not retry command when context has error")
return true
}
return false
}, func(p *server.Peer) {
// expect to time out
})
}()

time.Sleep(time.Millisecond * 250)

s.Close()
wg.Wait()
}
33 changes: 25 additions & 8 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,24 +158,34 @@ func (s *Server) servePeer(c net.Conn) {
peer := &Peer{
w: bufio.NewWriter(c),
}

defer func() {
for _, f := range peer.onDisconnect {
f()
}
}()

for {
args, err := readArray(r)
if err != nil {
return
readCh := make(chan []string)

go func() {
defer close(readCh)

for {
args, err := readArray(r)
if err != nil {
peer.Close()
return
}

readCh <- args
}
}()

for args := range readCh {
s.Dispatch(peer, args)
peer.Flush()

s.mu.Lock()
closed := peer.closed
s.mu.Unlock()
if closed {
if peer.Closed() {
c.Close()
}
}
Expand Down Expand Up @@ -259,6 +269,13 @@ func (c *Peer) Close() {
c.closed = true
}

// Return true if the peer connection closed.
func (c *Peer) Closed() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.closed
}

// Register a function to execute on disconnect. There can be multiple
// functions registered.
func (c *Peer) OnDisconnect(f func()) {
Expand Down

0 comments on commit e5e64fb

Please sign in to comment.