From 5d718e92eabd45ff9eed3bd1ca3d437abe8ee14f Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Wed, 7 Jul 2021 12:30:13 +0300 Subject: [PATCH] Add exvar to expose debug variables --- gor.go | 1 + tcp/tcp_message.go | 13 +++++++++++++ tcp/tcp_packet.go | 39 ++++++++++++++++++++++++++++----------- 3 files changed, 42 insertions(+), 11 deletions(-) diff --git a/gor.go b/gor.go index c40d6248..d7134e11 100644 --- a/gor.go +++ b/gor.go @@ -3,6 +3,7 @@ package main import ( + _ "expvar" "flag" "log" "net/http" diff --git a/tcp/tcp_message.go b/tcp/tcp_message.go index de7a18ca..70d6e716 100644 --- a/tcp/tcp_message.go +++ b/tcp/tcp_message.go @@ -43,11 +43,13 @@ func NewBufferPool(max int, ttl int) *bufPool { select { case pool.buffers <- c: default: + stats.Add("active_buffer_count", -1) c.b = nil c.gc = true released++ } } else { + stats.Add("active_buffer_count", -1) // Else GC c.b = nil c.gc = true @@ -58,6 +60,9 @@ func NewBufferPool(max int, ttl int) *bufPool { } } + bufPoolCount.Set(int64(len(pool.buffers))) + releasedCount.Set(int64(released)) + time.Sleep(1000 * time.Millisecond) } }() @@ -71,6 +76,9 @@ func (p *bufPool) Get() *buf { select { case c = <-p.buffers: default: + stats.Add("total_alloc_buffer_count", 1) + stats.Add("active_buffer_count", 1) + c = new(buf) c.b = make([]byte, 1024) c.created = now @@ -91,6 +99,7 @@ func (p *bufPool) Put(c *buf) { select { case p.buffers <- c: default: + stats.Add("active_buffers", -1) c.gc = true c.b = nil // if pool overloaded, let it go @@ -392,6 +401,7 @@ func (parser *MessageParser) addPacket(m *Message, pckt *Packet) bool { trunc := m.Length + len(pckt.Payload) - int(parser.maxSize) if trunc > 0 { m.Truncated = true + stats.Add("message_timeout_count", 1) pckt.Payload = pckt.Payload[:int(parser.maxSize)-m.Length] } if !m.add(pckt) { @@ -423,6 +433,8 @@ func (parser *MessageParser) Messages() chan *Message { } func (parser *MessageParser) Emit(m *Message) { + stats.Add("message_count", 1) + delete(parser.m, m.packets[0].MessageID()) parser.messages <- m @@ -440,6 +452,7 @@ func (parser *MessageParser) timer(now time.Time) { for _, m := range parser.m { if now.Sub(m.End) > parser.messageExpire { m.TimedOut = true + stats.Add("message_timeout_count", 1) failMsg++ if parser.End == nil || parser.allowIncompete { parser.Emit(m) diff --git a/tcp/tcp_packet.go b/tcp/tcp_packet.go index d6c3538f..4a203314 100644 --- a/tcp/tcp_packet.go +++ b/tcp/tcp_packet.go @@ -2,6 +2,7 @@ package tcp import ( "encoding/binary" + "expvar" "fmt" "net" _ "runtime" @@ -32,7 +33,19 @@ func copySlice(to []byte, from ...[]byte) ([]byte, int) { var now time.Time +var stats *expvar.Map +var bufPoolCount *expvar.Int +var releasedCount *expvar.Int + func init() { + bufPoolCount = new(expvar.Int) + releasedCount = new(expvar.Int) + + stats = expvar.NewMap("tcp") + stats.Init() + stats.Set("buffer_pool_count", bufPoolCount) + stats.Set("buffer_released", releasedCount) + go func() { for { // Accurate enough @@ -59,35 +72,37 @@ func NewPacketPool(max int, ttl int) *pktPool { // Ensure that memory released over time go func() { - var released int // GC + var released int for { for i := 0; i < 500; i++ { select { case c := <-pool.packets: // GC If buffer is too big and lived for too long - if len(c.buf) < 16384 && now.Sub(c.created) < time.Duration(ttl)*time.Second { + if len(c.buf) < 8192 || now.Sub(c.created) < time.Duration(ttl)*time.Second { select { case pool.packets <- c: + // Jump to next item in for loop + continue default: - c.buf = nil - c.gc = true - released++ } - } else { - // Else GC - c.buf = nil - c.gc = true - released++ } + + released++ + + // Else GC + c.buf = nil + c.gc = true + + stats.Add("active_packet_count", -1) default: break } } if released > 500 { - debug.FreeOSMemory() released = 0 + debug.FreeOSMemory() } time.Sleep(1000 * time.Millisecond) @@ -103,6 +118,7 @@ func (p *pktPool) Get() *Packet { select { case c = <-p.packets: default: + stats.Add("active_packet_count", 1) c = new(Packet) c.created = now @@ -122,6 +138,7 @@ func (p *pktPool) Put(c *Packet) { select { case p.packets <- c: default: + stats.Add("active_packet_count", -1) c.gc = true c.buf = nil // if pool overloaded, let it go