Skip to content

Commit

Permalink
Added timeout to reading receipt
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Ridgway committed Nov 29, 2020
1 parent d1f46d5 commit 72d46eb
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 6 deletions.
41 changes: 35 additions & 6 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ const DefaultHeartBeatError = 5 * time.Second
// Default timeout of calling Conn.Send function
const DefaultMsgSendTimeout = 10 * time.Second

// Default timeout of calling Conn.Send function
const DefaultRcvReceiptTimeout = 10 * time.Second

// A Conn is a connection to a STOMP server. Create a Conn using either
// the Dial or Connect function.
type Conn struct {
Expand All @@ -31,6 +34,7 @@ type Conn struct {
readTimeout time.Duration
writeTimeout time.Duration
msgSendTimeout time.Duration
rcvReceiptTimeout time.Duration
hbGracePeriodMultiplier float64
closed bool
closeMutex *sync.Mutex
Expand Down Expand Up @@ -185,6 +189,7 @@ func Connect(conn io.ReadWriteCloser, opts ...func(*Conn) error) (*Conn, error)
}

c.msgSendTimeout = options.MsgSendTimeout
c.rcvReceiptTimeout = options.RcvReceiptTimeout

if options.ResponseHeadersCallback != nil {
options.ResponseHeadersCallback(response.Header)
Expand Down Expand Up @@ -450,13 +455,14 @@ func (c *Conn) Send(destination, contentType string, body []byte, opts ...func(*
C: make(chan *frame.Frame),
}

err := sendDataToWriteChWithTimeout(c.writeCh, request, c.msgSendTimeout)
if err != nil {
return err
sendErr := sendDataToWriteChWithTimeout(c.writeCh, request, c.msgSendTimeout)
if sendErr != nil {
return sendErr
}
response := <-request.C
if response.Command != frame.RECEIPT {
return newError(response)

receiptErr := readReceiptWithTimeout(request, c.rcvReceiptTimeout)
if receiptErr != nil {
return receiptErr
}
} else {
// no receipt required
Expand All @@ -471,6 +477,29 @@ func (c *Conn) Send(destination, contentType string, body []byte, opts ...func(*
return nil
}

func readReceiptWithTimeout(request writeRequest, timeout time.Duration) error {
handle := func(response *frame.Frame) error {
if response.Command != frame.RECEIPT {
return newError(response)
}
return nil
}

if timeout <= 0 {
response := <-request.C
return handle(response)
}

timer := time.NewTimer(timeout)
select {
case <-timer.C:
return ErrMsgReceiptTimeout
case response := <-request.C:
timer.Stop()
return handle(response)
}
}

func sendDataToWriteChWithTimeout(ch chan writeRequest, request writeRequest, timeout time.Duration) error {
if timeout <= 0 {
ch <- request
Expand Down
14 changes: 14 additions & 0 deletions conn_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type connOptions struct {
WriteTimeout time.Duration
HeartBeatError time.Duration
MsgSendTimeout time.Duration
RcvReceiptTimeout time.Duration
HeartBeatGracePeriodMultiplier float64
Login, Passcode string
AcceptVersions []string
Expand All @@ -34,6 +35,7 @@ func newConnOptions(conn *Conn, opts []func(*Conn) error) (*connOptions, error)
HeartBeatGracePeriodMultiplier: 1.0,
HeartBeatError: DefaultHeartBeatError,
MsgSendTimeout: DefaultMsgSendTimeout,
RcvReceiptTimeout: DefaultRcvReceiptTimeout,
}

// This is a slight of hand, attach the options to the Conn long
Expand Down Expand Up @@ -139,6 +141,11 @@ var ConnOpt struct {
// Less than or equal to zero means infinite
MsgSendTimeout func(msgSendTimeout time.Duration) func(*Conn) error

// RcvReceiptTimeout is a connect option that allows the client to specify
// how long to wait for a receipt in the Conn.Send function. This helps
// avoid deadlocks. If this is not specified, the default is 10 seconds.
RcvReceiptTimeout func(rcvReceiptTimeout time.Duration) func(*Conn) error

// HeartBeatGracePeriodMultiplier is used to calculate the effective read heart-beat timeout
// the broker will enforce for each client’s connection. The multiplier is applied to
// the read-timeout interval the client specifies in its CONNECT frame
Expand Down Expand Up @@ -228,6 +235,13 @@ func init() {
}
}

ConnOpt.RcvReceiptTimeout = func(rcvReceiptTimeout time.Duration) func(*Conn) error {
return func(c *Conn) error {
c.options.RcvReceiptTimeout = rcvReceiptTimeout
return nil
}
}

ConnOpt.HeartBeatGracePeriodMultiplier = func(multiplier float64) func(*Conn) error {
return func(c *Conn) error {
c.options.HeartBeatGracePeriodMultiplier = multiplier
Expand Down

0 comments on commit 72d46eb

Please sign in to comment.