Skip to content

Commit

Permalink
Do not use buffer pool for messages
Browse files Browse the repository at this point in the history
This buffers are get re-used and data gets corrupted
  • Loading branch information
buger committed Jul 8, 2021
1 parent 625ed54 commit 61b377d
Showing 1 changed file with 3 additions and 119 deletions.
122 changes: 3 additions & 119 deletions tcp/tcp_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,108 +9,9 @@ import (
"time"
"unsafe"

"github.com/buger/goreplay/simpletime"
"github.com/buger/goreplay/size"
)

var bufferPool = NewBufferPool(1000, 1)

type buf struct {
b []byte
created time.Time
gc bool
}

type bufPool struct {
buffers chan *buf
ttl int
}

func NewBufferPool(max int, ttl int) *bufPool {
pool := &bufPool{
buffers: make(chan *buf, max),
ttl: ttl,
}

// Ensure that memory released over time
go func() {
var released int
// GC
for {
for i := 0; i < 100; i++ {
select {
case c := <-pool.buffers:
if simpletime.Now.Sub(c.created) < time.Duration(ttl)*time.Second {
select {
case pool.buffers <- c:
default:
stats.Add("active_buffer_count", -1)
c.b = nil
c.gc = true
released++
}
} else {
stats.Add("active_buffer_count", -1)
// Else GC
c.b = nil
c.gc = true
released++
}
default:
break
}
}

bufPoolCount.Set(int64(len(pool.buffers)))
releasedCount.Set(int64(released))

time.Sleep(1000 * time.Millisecond)
}
}()

return pool
}

// Borrow a Client from the pool.
func (p *bufPool) Get() *buf {
var c *buf
select {
case c = <-p.buffers:
default:
stats.Add("total_alloc_buffer_count", 1)
stats.Add("active_buffer_count", 1)

c = new(buf)
c.b = make([]byte, 1024)
c.created = simpletime.Now

// Use this technique to find if pool leaks, and objects get GCd
//
// runtime.SetFinalizer(c, func(p *buf) {
// if !p.gc {
// panic("Pool leak")
// }
// })
}
return c
}

// Return returns a Client to the pool.
func (p *bufPool) Put(c *buf) {
select {
case p.buffers <- c:
default:
stats.Add("active_buffers", -1)
c.gc = true
c.b = nil
// if pool overloaded, let it go
}
}

func (p *bufPool) Len() int {
return len(p.buffers)
}

// Stats every message carry its own stats object
type Stats struct {
LostData int
Expand All @@ -130,7 +31,6 @@ type Message struct {
packets []*Packet
parser *MessageParser
feedback interface{}
dataBuf *buf
Stats
}

Expand Down Expand Up @@ -164,8 +64,6 @@ func (m *Message) UUID() []byte {
}

func (m *Message) add(packet *Packet) bool {
// fmt.Println("SEQ:", packet.Seq, " - ", len(packet.Payload))

// Skip duplicates
for _, p := range m.packets {
if p.Seq == packet.Seq {
Expand Down Expand Up @@ -228,20 +126,10 @@ func (m *Message) PacketData() [][]byte {

// Data returns data in this message
func (m *Message) Data() []byte {
m.dataBuf = bufferPool.Get()

// var totalLen int
// for _, p := range m.packets {
// totalLen += len(p.Payload)
// }
// tmp := make([]byte, totalLen)
var n int
if m.dataBuf == nil {
panic("asdsd")
}
m.dataBuf.b, n = copySlice(m.dataBuf.b, m.PacketData()...)
var tmp []byte
tmp, _ = copySlice(tmp, m.PacketData()...)

return m.dataBuf.b[:n]
return tmp
}

// SetProtocolState set feedback/data that can be used later, e.g with End or Start hint
Expand All @@ -264,10 +152,6 @@ func (m *Message) Finalize() {
for _, p := range m.packets {
packetPool.Put(p)
}

if m.dataBuf != nil {
bufferPool.Put(m.dataBuf)
}
}

// Emitter message handler
Expand Down

0 comments on commit 61b377d

Please sign in to comment.