Skip to content

Commit

Permalink
Add exvar to expose debug variables
Browse files Browse the repository at this point in the history
  • Loading branch information
buger committed Jul 7, 2021
1 parent 148846b commit 5d718e9
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 11 deletions.
1 change: 1 addition & 0 deletions gor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package main

import (
_ "expvar"
"flag"
"log"
"net/http"
Expand Down
13 changes: 13 additions & 0 deletions tcp/tcp_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ func NewBufferPool(max int, ttl int) *bufPool {
select {
case pool.buffers <- c:
default:
stats.Add("active_buffer_count", -1)
c.b = nil
c.gc = true
released++
}
} else {
stats.Add("active_buffer_count", -1)
// Else GC
c.b = nil
c.gc = true
Expand All @@ -58,6 +60,9 @@ func NewBufferPool(max int, ttl int) *bufPool {
}
}

bufPoolCount.Set(int64(len(pool.buffers)))
releasedCount.Set(int64(released))

time.Sleep(1000 * time.Millisecond)
}
}()
Expand All @@ -71,6 +76,9 @@ func (p *bufPool) Get() *buf {
select {
case c = <-p.buffers:
default:
stats.Add("total_alloc_buffer_count", 1)
stats.Add("active_buffer_count", 1)

c = new(buf)
c.b = make([]byte, 1024)
c.created = now
Expand All @@ -91,6 +99,7 @@ func (p *bufPool) Put(c *buf) {
select {
case p.buffers <- c:
default:
stats.Add("active_buffers", -1)
c.gc = true
c.b = nil
// if pool overloaded, let it go
Expand Down Expand Up @@ -392,6 +401,7 @@ func (parser *MessageParser) addPacket(m *Message, pckt *Packet) bool {
trunc := m.Length + len(pckt.Payload) - int(parser.maxSize)
if trunc > 0 {
m.Truncated = true
stats.Add("message_timeout_count", 1)
pckt.Payload = pckt.Payload[:int(parser.maxSize)-m.Length]
}
if !m.add(pckt) {
Expand Down Expand Up @@ -423,6 +433,8 @@ func (parser *MessageParser) Messages() chan *Message {
}

func (parser *MessageParser) Emit(m *Message) {
stats.Add("message_count", 1)

delete(parser.m, m.packets[0].MessageID())

parser.messages <- m
Expand All @@ -440,6 +452,7 @@ func (parser *MessageParser) timer(now time.Time) {
for _, m := range parser.m {
if now.Sub(m.End) > parser.messageExpire {
m.TimedOut = true
stats.Add("message_timeout_count", 1)
failMsg++
if parser.End == nil || parser.allowIncompete {
parser.Emit(m)
Expand Down
39 changes: 28 additions & 11 deletions tcp/tcp_packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tcp

import (
"encoding/binary"
"expvar"
"fmt"
"net"
_ "runtime"
Expand Down Expand Up @@ -32,7 +33,19 @@ func copySlice(to []byte, from ...[]byte) ([]byte, int) {

var now time.Time

var stats *expvar.Map
var bufPoolCount *expvar.Int
var releasedCount *expvar.Int

func init() {
bufPoolCount = new(expvar.Int)
releasedCount = new(expvar.Int)

stats = expvar.NewMap("tcp")
stats.Init()
stats.Set("buffer_pool_count", bufPoolCount)
stats.Set("buffer_released", releasedCount)

go func() {
for {
// Accurate enough
Expand All @@ -59,35 +72,37 @@ func NewPacketPool(max int, ttl int) *pktPool {

// Ensure that memory released over time
go func() {
var released int
// GC
var released int
for {
for i := 0; i < 500; i++ {
select {
case c := <-pool.packets:
// GC If buffer is too big and lived for too long
if len(c.buf) < 16384 && now.Sub(c.created) < time.Duration(ttl)*time.Second {
if len(c.buf) < 8192 || now.Sub(c.created) < time.Duration(ttl)*time.Second {
select {
case pool.packets <- c:
// Jump to next item in for loop
continue
default:
c.buf = nil
c.gc = true
released++
}
} else {
// Else GC
c.buf = nil
c.gc = true
released++
}

released++

// Else GC
c.buf = nil
c.gc = true

stats.Add("active_packet_count", -1)
default:
break
}
}

if released > 500 {
debug.FreeOSMemory()
released = 0
debug.FreeOSMemory()
}

time.Sleep(1000 * time.Millisecond)
Expand All @@ -103,6 +118,7 @@ func (p *pktPool) Get() *Packet {
select {
case c = <-p.packets:
default:
stats.Add("active_packet_count", 1)
c = new(Packet)
c.created = now

Expand All @@ -122,6 +138,7 @@ func (p *pktPool) Put(c *Packet) {
select {
case p.packets <- c:
default:
stats.Add("active_packet_count", -1)
c.gc = true
c.buf = nil
// if pool overloaded, let it go
Expand Down

0 comments on commit 5d718e9

Please sign in to comment.