From 1d7faae7539c81dbe73d7220293defaf799ac4d7 Mon Sep 17 00:00:00 2001 From: Colin Zuo Date: Fri, 3 Nov 2023 14:53:33 +0800 Subject: [PATCH 1/2] fix subscription channel set twice for replyTo case --- conn.go | 33 ++++++++++++++++++++++++--------- subscription.go | 5 +++++ 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/conn.go b/conn.go index a1b4bca..9c424ee 100644 --- a/conn.go +++ b/conn.go @@ -367,24 +367,32 @@ func processLoop(c *Conn, writer *frame.Writer) { switch req.Frame.Command { case frame.SUBSCRIBE: - id, _ := req.Frame.Header.Contains(frame.Id) - channels[id] = req.C - // if using a temp queue, map that destination as a known channel // however, don't send the frame, it's most likely an invalid destination // on the broker. if replyTo, ok := req.Frame.Header.Contains(ReplyToHeader); ok { channels[replyTo] = req.C sendFrame = false + } else { + id, _ := req.Frame.Header.Contains(frame.Id) + channels[id] = req.C } case frame.UNSUBSCRIBE: - id, _ := req.Frame.Header.Contains(frame.Id) - // is this trying to be too clever -- add a receipt - // header so that when the server responds with a - // RECEIPT frame, the corresponding channel will be closed - req.Frame.Header.Set(frame.Receipt, id) - + if replyTo, ok := req.Frame.Header.Contains(ReplyToHeader); ok { + ch, ok := channels[replyTo] + if ok { + delete(channels, replyTo) + close(ch) + } + sendFrame = false + } else { + id, _ := req.Frame.Header.Contains(frame.Id) + // is this trying to be too clever -- add a receipt + // header so that when the server responds with a + // RECEIPT frame, the corresponding channel will be closed + req.Frame.Header.Set(frame.Receipt, id) + } } // frame to send, if enabled @@ -645,6 +653,12 @@ func (c *Conn) Subscribe(destination string, ack AckMode, opts ...func(*frame.Fr } } + replyTo, replyToSet := subscribeFrame.Header.Contains(ReplyToHeader) + + if replyToSet { + subscribeFrame.Header.Set(frame.Id, replyTo) + } + // If the option functions have not specified the "id" header entry, // create one. id, ok := subscribeFrame.Header.Contains(frame.Id) @@ -661,6 +675,7 @@ func (c *Conn) Subscribe(destination string, ack AckMode, opts ...func(*frame.Fr closeMutex := &sync.Mutex{} sub := &Subscription{ id: id, + replyToSet: replyToSet, destination: destination, conn: c, ackMode: ack, diff --git a/subscription.go b/subscription.go index 6aeaaf7..53e5667 100644 --- a/subscription.go +++ b/subscription.go @@ -21,6 +21,7 @@ const ( type Subscription struct { C chan *Message id string + replyToSet bool destination string conn *Conn ackMode AckMode @@ -75,6 +76,10 @@ func (s *Subscription) Unsubscribe(opts ...func(*frame.Frame) error) error { } } + if s.replyToSet { + f.Header.Set(ReplyToHeader, s.id) + } + s.conn.sendFrame(f) // UNSUBSCRIBE is a bit weird in that it is tagged with a "receipt" header From 319b37b1fb0b346ad2b7225b48e9f013d78c183f Mon Sep 17 00:00:00 2001 From: Colin Zuo Date: Wed, 8 Nov 2023 17:38:05 +0800 Subject: [PATCH 2/2] for client readTimeout use MAX(, ) --- conn.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/conn.go b/conn.go index 9c424ee..6efd40d 100644 --- a/conn.go +++ b/conn.go @@ -182,6 +182,10 @@ func Connect(conn io.ReadWriteCloser, opts ...func(*Conn) error) (*Conn, error) } } + if readTimeout < options.ReadTimeout { + readTimeout = options.ReadTimeout + } + c.readTimeout = readTimeout c.writeTimeout = writeTimeout