From dfb22d9126c4e1c2001b89b689df1f86724e46fd Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 13 Feb 2019 09:47:49 -0800 Subject: [PATCH] Smarter kickFlusher on subs Signed-off-by: Derek Collison --- nats.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/nats.go b/nats.go index 41272488f..06bb0730c 100644 --- a/nats.go +++ b/nats.go @@ -1969,7 +1969,6 @@ func (nc *Conn) readLoop() { nc.processOpErr(err) break } - if err := nc.parse(b[:n]); err != nil { nc.processOpErr(err) break @@ -2775,7 +2774,6 @@ func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg) (*Sub nc.mu.Lock() // ok here, but defer is generally expensive defer nc.mu.Unlock() - defer nc.kickFlusher() // Check for some error conditions. if nc.isClosed() { @@ -2812,10 +2810,15 @@ func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg) (*Sub nc.subsMu.Unlock() // We will send these for all subs when we reconnect - // so that we can suppress here. + // so that we can suppress here if reconnecting. if !nc.isReconnecting() { fmt.Fprintf(nc.bw, subProto, subj, queue, sub.sid) + // Kick flusher if needed. + if len(nc.fch) == 0 { + nc.kickFlusher() + } } + return sub, nil } @@ -3363,7 +3366,6 @@ func (nc *Conn) resendSubscriptions() { if s.delivered < s.max { adjustedMax = s.max - s.delivered } - // adjustedMax could be 0 here if the number of delivered msgs // reached the max, if so unsubscribe. if adjustedMax == 0 {