From 94cd772c0f35607ab7e5a7f9945c6be7434a8208 Mon Sep 17 00:00:00 2001 From: imxyb Date: Wed, 28 Oct 2020 22:46:59 +0800 Subject: [PATCH] nsqd: use buffer pool for serializing message for client --- nsqd/buffer_pool.go | 1 + nsqd/channel.go | 14 +++++--------- nsqd/message.go | 6 +++--- nsqd/protocol_v2.go | 4 +++- nsqd/topic.go | 9 ++------- 5 files changed, 14 insertions(+), 20 deletions(-) diff --git a/nsqd/buffer_pool.go b/nsqd/buffer_pool.go index d47d06bf0..df1a04c2b 100644 --- a/nsqd/buffer_pool.go +++ b/nsqd/buffer_pool.go @@ -18,5 +18,6 @@ func bufferPoolGet() *bytes.Buffer { } func bufferPoolPut(b *bytes.Buffer) { + b.Reset() bp.Put(b) } diff --git a/nsqd/channel.go b/nsqd/channel.go index 606387267..23bf73ff4 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -1,7 +1,6 @@ package nsqd import ( - "bytes" "container/heap" "errors" "math" @@ -11,6 +10,7 @@ import ( "time" "github.com/nsqio/go-diskqueue" + "github.com/nsqio/nsq/internal/lg" "github.com/nsqio/nsq/internal/pqueue" "github.com/nsqio/nsq/internal/quantile" @@ -211,8 +211,6 @@ finish: // flush persists all the messages in internal memory buffers to the backend // it does not drain inflight/deferred because it is only called in Close() func (c *Channel) flush() error { - var msgBuf bytes.Buffer - if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 { c.ctx.nsqd.logf(LOG_INFO, "CHANNEL(%s): flushing %d memory %d in-flight %d deferred messages to backend", c.name, len(c.memoryMsgChan), len(c.inFlightMessages), len(c.deferredMessages)) @@ -221,7 +219,7 @@ func (c *Channel) flush() error { for { select { case msg := <-c.memoryMsgChan: - err := writeMessageToBackend(&msgBuf, msg, c.backend) + err := writeMessageToBackend(msg, c.backend) if err != nil { c.ctx.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err) } @@ -233,7 +231,7 @@ func (c *Channel) flush() error { finish: c.inFlightMutex.Lock() for _, msg := range c.inFlightMessages { - err := writeMessageToBackend(&msgBuf, msg, c.backend) + err := writeMessageToBackend(msg, c.backend) if err != nil { c.ctx.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err) } @@ -243,7 +241,7 @@ finish: c.deferredMutex.Lock() for _, item := range c.deferredMessages { msg := item.Value.(*Message) - err := writeMessageToBackend(&msgBuf, msg, c.backend) + err := writeMessageToBackend(msg, c.backend) if err != nil { c.ctx.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err) } @@ -307,9 +305,7 @@ func (c *Channel) put(m *Message) error { select { case c.memoryMsgChan <- m: default: - b := bufferPoolGet() - err := writeMessageToBackend(b, m, c.backend) - bufferPoolPut(b) + err := writeMessageToBackend(m, c.backend) c.ctx.nsqd.SetHealth(err) if err != nil { c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s", diff --git a/nsqd/message.go b/nsqd/message.go index 77ee4c79d..707725bc9 100644 --- a/nsqd/message.go +++ b/nsqd/message.go @@ -1,7 +1,6 @@ package nsqd import ( - "bytes" "encoding/binary" "fmt" "io" @@ -90,8 +89,9 @@ func decodeMessage(b []byte) (*Message, error) { return &msg, nil } -func writeMessageToBackend(buf *bytes.Buffer, msg *Message, bq BackendQueue) error { - buf.Reset() +func writeMessageToBackend(msg *Message, bq BackendQueue) error { + buf := bufferPoolGet() + defer bufferPoolPut(buf) _, err := msg.WriteTo(buf) if err != nil { return err diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 54cd526a9..0c0593ce7 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -124,7 +124,9 @@ func (p *protocolV2) IOLoop(conn net.Conn) error { func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error { p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): writing msg(%s) to client(%s) - %s", msg.ID, client, msg.Body) - var buf = &bytes.Buffer{} + + buf := bufferPoolGet() + defer bufferPoolPut(buf) _, err := msg.WriteTo(buf) if err != nil { diff --git a/nsqd/topic.go b/nsqd/topic.go index 10fd11119..3ad5d6850 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -1,7 +1,6 @@ package nsqd import ( - "bytes" "errors" "strings" "sync" @@ -221,9 +220,7 @@ func (t *Topic) put(m *Message) error { select { case t.memoryMsgChan <- m: default: - b := bufferPoolGet() - err := writeMessageToBackend(b, m, t.backend) - bufferPoolPut(b) + err := writeMessageToBackend(m, t.backend) t.ctx.nsqd.SetHealth(err) if err != nil { t.ctx.nsqd.logf(LOG_ERROR, @@ -409,8 +406,6 @@ finish: } func (t *Topic) flush() error { - var msgBuf bytes.Buffer - if len(t.memoryMsgChan) > 0 { t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): flushing %d memory messages to backend", @@ -420,7 +415,7 @@ func (t *Topic) flush() error { for { select { case msg := <-t.memoryMsgChan: - err := writeMessageToBackend(&msgBuf, msg, t.backend) + err := writeMessageToBackend(msg, t.backend) if err != nil { t.ctx.nsqd.logf(LOG_ERROR, "ERROR: failed to write message to backend - %s", err)