diff --git a/nats.go b/nats.go index 41272488f..06bb0730c 100644 --- a/nats.go +++ b/nats.go @@ -1969,7 +1969,6 @@ func (nc *Conn) readLoop() { nc.processOpErr(err) break } - if err := nc.parse(b[:n]); err != nil { nc.processOpErr(err) break @@ -2775,7 +2774,6 @@ func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg) (*Sub nc.mu.Lock() // ok here, but defer is generally expensive defer nc.mu.Unlock() - defer nc.kickFlusher() // Check for some error conditions. if nc.isClosed() { @@ -2812,10 +2810,15 @@ func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg) (*Sub nc.subsMu.Unlock() // We will send these for all subs when we reconnect - // so that we can suppress here. + // so that we can suppress here if reconnecting. if !nc.isReconnecting() { fmt.Fprintf(nc.bw, subProto, subj, queue, sub.sid) + // Kick flusher if needed. + if len(nc.fch) == 0 { + nc.kickFlusher() + } } + return sub, nil } @@ -3363,7 +3366,6 @@ func (nc *Conn) resendSubscriptions() { if s.delivered < s.max { adjustedMax = s.max - s.delivered } - // adjustedMax could be 0 here if the number of delivered msgs // reached the max, if so unsubscribe. if adjustedMax == 0 {