Skip to content

Commit

Permalink
Refactor Conn completely, remove discard list, improve tests
Browse files Browse the repository at this point in the history
The discard list which was used within Conn has been removed completely,
in favor of simply discarding a single message immediately after a clean
context timeout. This involved rewriting essentially the entire reader.

The tests for Conn have been much improved as well, they are now much
more rigorous.
  • Loading branch information
Brian Picciano committed Dec 13, 2022
1 parent 6b55a59 commit 84dff11
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 160 deletions.
201 changes: 101 additions & 100 deletions conn.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package radix

import (
"container/list"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -119,122 +118,123 @@ func (c *conn) Close() error {
return c.proc.PrefixedClose(c.conn.Close, nil)
}

func (c *conn) reader(ctx context.Context) {
func isRespErr(err error) bool {
return errors.As(err, &resp3.SimpleError{}) || errors.As(err, &resp3.BlobError{})
}

discardList := list.New()
func (c *conn) doDiscard(unmarshalInto interface{}) error {

doneCh := ctx.Done()
for {
select {
case <-doneCh:
return
case mu := <-c.rCh:
c.readSeqL.Lock()

c.readSeqL.Lock()
if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
c.readSeqL.Unlock()
return fmt.Errorf("unsetting read deadline: %w", err)
}

var skip bool
if err := mu.ctx.Err(); err != nil {
mu.errCh <- fmt.Errorf("checking context before read: %w", err)
skip = true
} else if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
mu.errCh <- fmt.Errorf("unsetting read deadline: %w", err)
skip = true
}
// setting currReadSeq lets us discard the next message without any
// possibility of being interrupted by an EncodeDecode routine.
c.currReadSeq = 0

if !skip {
c.currReadSeq = mu.readSeq
}
c.readSeqL.Unlock()

c.readSeqL.Unlock()
err := resp3.Unmarshal(c.br, unmarshalInto, c.rOpts)

if skip {
if mu.marshal != nil {
discardList.PushBack(mu)
}
continue
}
if isRespErr(err) {
err = nil
}

var err error
c.conn.resetBytesRead()
buffered := c.br.Buffered()

// Discard messages queued up on the wire which aren't for this
// EncodeDecode call. Only discard messages if the EncodeDecode
// gives a value to marshal, as that indicates there's a specific
// response message in the queue which corresponds to it. If no
// value to marshal is given then the EncodeDecode will read in
// whatever the next message in the queue is.
if mu.marshal != nil {
for {
el := discardList.Front()
if el == nil {
break
}

discardMU := el.Value.(connMarshalerUnmarshaler)

if err = resp3.Unmarshal(c.br, discardMU.unmarshalInto, c.rOpts); err != nil {
// Ignore RESP errors.
if !errors.As(err, &resp3.SimpleError{}) && !errors.As(err, &resp3.BlobError{}) {
break
}

err = nil
}

discardList.Remove(el)
}
}
return err
}

// if discarding didn't fail then read the actual desired response.
if err == nil {
if err = resp3.Unmarshal(c.br, mu.unmarshalInto, c.rOpts); err == nil && discardList.Len() > 0 {
discardList.Remove(discardList.Front())
}
}
func (c *conn) prepareForRead(mu connMarshalerUnmarshaler) error {

// simplify things for the caller by translating network timeouts
// into the context error, since that's actually what happened.
var canceled bool
if canceled = errors.Is(err, os.ErrDeadlineExceeded); canceled {
if err = mu.ctx.Err(); err == nil {
// there might be some crazy edge case where this can
// happen, I'm not sure... go contexts are not pleasant to
// work with.
err = context.Canceled
}
}
c.readSeqL.Lock()
defer c.readSeqL.Unlock()

if err != nil {
err = fmt.Errorf("unmarshaling message off Conn: %w", err)
}
if err := mu.ctx.Err(); err != nil {
return resp.ErrConnUsable{
Err: fmt.Errorf("checking context before read: %w", err),
}
}

if canceled {
if buffered == c.br.Buffered() && c.conn.totalBytesRead == 0 {
err = resp.ErrConnUsable{Err: err}
if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
return fmt.Errorf("unsetting read deadline: %w", err)
}

if mu.marshal != nil {
discardList.PushBack(mu)
}
c.currReadSeq = mu.readSeq
return nil
}

} else {
// if Unmarshal returned a context error but data was also
// read it means that a message was only partially read off
// the wire. The Conn is unusable at this point, close it
// and bail.
go c.Close()
mu.errCh <- fmt.Errorf("after partial read off Conn: %w", err)
return
}
func (c *conn) doRead(mu connMarshalerUnmarshaler) error {

if err := c.prepareForRead(mu); err != nil {
return err
}

} else if err != nil && !errors.As(err, new(resp.ErrConnUsable)) {
c.conn.resetBytesRead()
buffered := c.br.Buffered()

err := resp3.Unmarshal(c.br, mu.unmarshalInto, c.rOpts)

// simplify things for the caller by translating network timeouts
// into the context error, since that's actually what happened.
var canceled bool
if canceled = errors.Is(err, os.ErrDeadlineExceeded); canceled {

err = mu.ctx.Err()

// there might be some crazy edge case where this can happen, I'm not
// sure... go contexts are not pleasant to work with.
if err == nil {
err = context.Canceled
}

// if Unmarshal returned a context error but data was also read it means
// that a message was only partially read off the wire. The Conn is
// unusable at this point, close it and bail.
if buffered != c.br.Buffered() || c.conn.totalBytesRead > 0 {
return fmt.Errorf("after partial read off Conn: %w", err)
}

return resp.ErrConnUsable{Err: err}
}

return err
}

func (c *conn) reader(ctx context.Context) {

doneCh := ctx.Done()
for {
select {

case <-doneCh:
return

case mu := <-c.rCh:

err := c.doRead(mu)
mu.errCh <- err

if err == nil || isRespErr(err) {
continue

} else if !errors.As(err, new(resp.ErrConnUsable)) {
go c.Close()
mu.errCh <- fmt.Errorf("unexpected error on Conn: %w", err)
return

}
// if the EncodeDecode did not involve a value being marshaled
// onto the wire, then we assume that we are not in a
// command/response mode, but rather are waiting for
// asynchronous writes (ie pubsub mode). In this case we don't
// discard the next command coming down the wire.
} else if mu.marshal != nil {

mu.errCh <- err
if err := c.doDiscard(mu.unmarshalInto); err != nil {
go c.Close()
return
}
}
}
}
}
Expand All @@ -255,7 +255,8 @@ func (c *conn) putErrCh(errCh chan error) {
}
}

func (c *conn) EncodeDecode(ctx context.Context, m, u interface{}) error {
func (c *conn) EncodeDecode(ctx context.Context, m, u interface{}) (err error) {

mu := connMarshalerUnmarshaler{
ctx: ctx,
marshal: m,
Expand Down Expand Up @@ -296,7 +297,7 @@ func (c *conn) EncodeDecode(ctx context.Context, m, u interface{}) error {
case err := <-mu.errCh:
c.putErrCh(mu.errCh)
if err != nil {
err = fmt.Errorf("waiting for response from Conn: %w", ctx.Err())
err = fmt.Errorf("waiting for response from Conn: %w", err)
}
return err
case <-closedCh:
Expand Down
Loading

0 comments on commit 84dff11

Please sign in to comment.