diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 3bace72c..ee3755c2 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -19,6 +19,6 @@ jobs: - name: Set up Go uses: actions/setup-go@v3 with: - go-version: 1.16 + go-version: 1.18 - name: Test run: go test -v ./... diff --git a/README.md b/README.md index f4cae17d..783422c3 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ [4]: LICENSE -[5]: https://img.shields.io/badge/go-%3E%3D1.16-30dff3?style=flat-square&logo=go +[5]: https://img.shields.io/badge/go-%3E%3D1.18-30dff3?style=flat-square&logo=go [6]: https://github.com/lxzan/gws @@ -34,6 +34,7 @@ - [Upgrade from HTTP](#upgrade-from-http) - [Unix Domain Socket](#unix-domain-socket) - [Broadcast](#broadcast) + - [Write JSON](#write-json) - [Autobahn Test](#autobahn-test) - [Benchmark](#benchmark) - [Communication](#communication) @@ -41,7 +42,7 @@ ### Highlight -- No dependency +- Single dependency - IO multiplexing support, concurrent message processing and asynchronous non-blocking message writing - High IOPS and low latency, low CPU usage - Support fast parsing WebSocket protocol directly from TCP, faster handshake, 30% lower memory usage @@ -207,6 +208,11 @@ func Broadcast(conns []*gws.Conn, opcode gws.Opcode, payload []byte) { } ``` +#### Write JSON +```go +socket.WriteAny(gws.JsonCodec, gws.OpcodeText, data) +``` + ### Autobahn Test ```bash diff --git a/compress.go b/compress.go index 2d2d115b..65b43ddf 100644 --- a/compress.go +++ b/compress.go @@ -2,25 +2,55 @@ package gws import ( "bytes" - "compress/flate" "encoding/binary" + "github.com/klauspost/compress/flate" "github.com/lxzan/gws/internal" "io" "math" + "sync" + "sync/atomic" ) +const numCompressor = 32 + +type compressors struct { + sync.RWMutex + serial uint64 + compressors [12][numCompressor]*compressor +} + +func (c *compressors) Select(level int) *compressor { + var i = level + 2 + var j = atomic.AddUint64(&c.serial, 1) & (numCompressor - 1) + c.RLock() + var cps = c.compressors[i][j] + c.RUnlock() + + if cps == nil { + c.Lock() + cps = newCompressor(level) + c.compressors[i][j] = cps + c.Unlock() + } + return cps +} + func newCompressor(level int) *compressor { fw, _ := flate.NewWriter(nil, level) - return &compressor{fw: fw} + return &compressor{fw: fw, mu: &sync.Mutex{}} } // 压缩器 type compressor struct { + mu *sync.Mutex fw *flate.Writer } // Compress 压缩 func (c *compressor) Compress(content []byte, buf *bytes.Buffer) error { + c.mu.Lock() + defer c.mu.Unlock() + c.fw.Reset(buf) if err := internal.WriteN(c.fw, content, len(content)); err != nil { return err @@ -37,20 +67,64 @@ func (c *compressor) Compress(content []byte, buf *bytes.Buffer) error { return nil } -func newDecompressor() *decompressor { return &decompressor{fr: flate.NewReader(nil)} } +func (c *compressor) CompressAny(codec Codec, v interface{}, buf *bytes.Buffer) error { + c.mu.Lock() + defer c.mu.Unlock() + + c.fw.Reset(buf) + if err := codec.NewEncoder(c.fw).Encode(v); err != nil { + return err + } + if err := c.fw.Flush(); err != nil { + return err + } + if n := buf.Len(); n >= 4 { + compressedContent := buf.Bytes() + if tail := compressedContent[n-4:]; binary.BigEndian.Uint32(tail) == math.MaxUint16 { + buf.Truncate(n - 4) + } + } + return nil +} + +type decompressors struct { + serial uint64 + decompressors [numCompressor]*decompressor +} + +func (c *decompressors) init() *decompressors { + for i, _ := range c.decompressors { + c.decompressors[i] = newDecompressor() + } + return c +} + +func (c *decompressors) Select() *decompressor { + var index = atomic.AddUint64(&c.serial, 1) & (numCompressor - 1) + return c.decompressors[index] +} + +func newDecompressor() *decompressor { + return &decompressor{fr: flate.NewReader(nil), mu: &sync.Mutex{}} +} type decompressor struct { - fr io.ReadCloser + mu *sync.Mutex + fr io.ReadCloser + buffer [internal.Lv3]byte } // Decompress 解压 func (c *decompressor) Decompress(payload *bytes.Buffer) (*bytes.Buffer, error) { + c.mu.Lock() + defer c.mu.Unlock() + _, _ = payload.Write(internal.FlateTail) resetter := c.fr.(flate.Resetter) _ = resetter.Reset(payload, nil) // must return a null pointer var buf = _bpool.Get(3 * payload.Len()) - _, err := io.Copy(buf, c.fr) + _, err := io.CopyBuffer(buf, c.fr, c.buffer[0:]) _bpool.Put(payload) return buf, err } diff --git a/conn.go b/conn.go index 2cdb297c..23b0944d 100644 --- a/conn.go +++ b/conn.go @@ -26,14 +26,10 @@ type Conn struct { config *Config // read buffer rbuf *bufio.Reader - // flate decompressor - decompressor *decompressor // continuation frame continuationFrame continuationFrame // frame header for read fh frameHeader - // flate compressor - compressor *compressor // WebSocket Event Handler handler Event @@ -62,12 +58,6 @@ func serveWebSocket(isServer bool, config *Config, session SessionStorage, netCo readQueue: workerQueue{maxConcurrency: int32(config.ReadAsyncGoLimit), capacity: config.ReadAsyncCap}, writeQueue: workerQueue{maxConcurrency: 1, capacity: config.WriteAsyncCap}, } - - if c.compressEnabled { - c.compressor = newCompressor(config.CompressLevel) - c.decompressor = newDecompressor() - } - return c } diff --git a/examples/chatroom/main.go b/examples/chatroom/main.go index 75226322..6e37af78 100644 --- a/examples/chatroom/main.go +++ b/examples/chatroom/main.go @@ -46,17 +46,17 @@ func main() { _, _ = writer.Write(html) }) - if err := http.ListenAndServe(":3000", nil); err != nil { + if err := http.ListenAndServe(":8000", nil); err != nil { log.Fatalf("%+v", err) } } func NewWebSocket() *WebSocket { - return &WebSocket{sessions: gws.NewConcurrentMap(16)} + return &WebSocket{sessions: gws.NewConcurrentMap[string, *gws.Conn](16)} } type WebSocket struct { - sessions *gws.ConcurrentMap // 使用内置的ConcurrentMap存储连接, 可以减少锁冲突 + sessions *gws.ConcurrentMap[string, *gws.Conn] // 使用内置的ConcurrentMap存储连接, 可以减少锁冲突 } func (c *WebSocket) getName(socket *gws.Conn) string { @@ -69,21 +69,11 @@ func (c *WebSocket) getKey(socket *gws.Conn) string { return name.(string) } -// 根据用户名获取WebSocket连接 -func (c *WebSocket) GetSocket(name string) (*gws.Conn, bool) { - if v0, ok0 := c.sessions.Load(name); ok0 { - if v1, ok1 := v0.(*gws.Conn); ok1 { - return v1, true - } - } - return nil, false -} - // RemoveSocket 移除WebSocket连接 func (c *WebSocket) RemoveSocket(socket *gws.Conn) { name := c.getName(socket) key := c.getKey(socket) - if mSocket, ok := c.GetSocket(name); ok { + if mSocket, ok := c.sessions.Load(name); ok { if mKey := c.getKey(mSocket); mKey == key { c.sessions.Delete(name) } @@ -92,8 +82,7 @@ func (c *WebSocket) RemoveSocket(socket *gws.Conn) { func (c *WebSocket) OnOpen(socket *gws.Conn) { name := c.getName(socket) - if v, ok := c.sessions.Load(name); ok { - var conn = v.(*gws.Conn) + if conn, ok := c.sessions.Load(name); ok { conn.WriteClose(1000, []byte("connection replaced")) } socket.SetDeadline(time.Now().Add(3 * PingInterval)) @@ -134,7 +123,7 @@ func (c *WebSocket) OnMessage(socket *gws.Conn, message *gws.Message) { var input = &Input{} _ = json.Unmarshal(message.Data.Bytes(), input) - if v, ok := c.sessions.Load(input.To); ok { - v.(*gws.Conn).WriteMessage(gws.OpcodeText, message.Data.Bytes()) + if conn, ok := c.sessions.Load(input.To); ok { + conn.WriteMessage(gws.OpcodeText, message.Data.Bytes()) } } diff --git a/go.mod b/go.mod index fc903902..2f80c3a5 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,14 @@ module github.com/lxzan/gws -go 1.16 +go 1.18 -require github.com/stretchr/testify v1.8.1 +require ( + github.com/klauspost/compress v1.16.5 + github.com/stretchr/testify v1.8.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index 2ec90f70..738513ad 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= +github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/init.go b/init.go new file mode 100644 index 00000000..4f356bf9 --- /dev/null +++ b/init.go @@ -0,0 +1,12 @@ +package gws + +import "github.com/lxzan/gws/internal" + +var ( + _bpool = internal.NewBufferPool() + _cps = new(compressors) + _dps = new(decompressors).init() + _padding = frameHeader{} + + JsonCodec = new(jsonCodec) +) diff --git a/reader.go b/reader.go index a0536af7..769286c3 100644 --- a/reader.go +++ b/reader.go @@ -7,8 +7,6 @@ import ( "github.com/lxzan/gws/internal" ) -var _bpool = internal.NewBufferPool() - func (c *Conn) checkMask(enabled bool) error { // RFC6455: All frames sent from client to server have this bit set to 1. if (c.isServer && !enabled) || (!c.isServer && enabled) { @@ -149,7 +147,7 @@ func (c *Conn) readMessage() error { func (c *Conn) emitMessage(msg *Message, compressed bool) error { if compressed { - data, err := c.decompressor.Decompress(msg.Data) + data, err := _dps.Select().Decompress(msg.Data) if err != nil { return internal.NewError(internal.CloseInternalServerErr, err) } diff --git a/session_storage.go b/session_storage.go index 68bce593..da7cc70c 100644 --- a/session_storage.go +++ b/session_storage.go @@ -100,18 +100,18 @@ used to store websocket connections in the IM server 用来存储IM等服务的连接 */ type ( - ConcurrentMap struct { + ConcurrentMap[K comparable, V any] struct { segments uint64 - buckets []*bucket + buckets []*bucket[K, V] } - bucket struct { + bucket[K comparable, V any] struct { sync.RWMutex - m map[interface{}]interface{} + m map[K]V } ) -func NewConcurrentMap(segments uint64) *ConcurrentMap { +func NewConcurrentMap[K comparable, V any](segments uint64) *ConcurrentMap[K, V] { if segments == 0 { segments = 16 } else { @@ -121,14 +121,14 @@ func NewConcurrentMap(segments uint64) *ConcurrentMap { } segments = num } - var cm = &ConcurrentMap{segments: segments, buckets: make([]*bucket, segments, segments)} + var cm = &ConcurrentMap[K, V]{segments: segments, buckets: make([]*bucket[K, V], segments, segments)} for i, _ := range cm.buckets { - cm.buckets[i] = &bucket{m: make(map[interface{}]interface{})} + cm.buckets[i] = &bucket[K, V]{m: make(map[K]V)} } return cm } -func (c *ConcurrentMap) hash(key interface{}) uint64 { +func (c *ConcurrentMap[K, V]) hash(key interface{}) uint64 { switch k := key.(type) { case string: return internal.FNV64(k) @@ -157,13 +157,13 @@ func (c *ConcurrentMap) hash(key interface{}) uint64 { } } -func (c *ConcurrentMap) getBucket(key interface{}) *bucket { +func (c *ConcurrentMap[K, V]) getBucket(key K) *bucket[K, V] { var hashCode = c.hash(key) var index = hashCode & (c.segments - 1) return c.buckets[index] } -func (c *ConcurrentMap) Len() int { +func (c *ConcurrentMap[K, V]) Len() int { var length = 0 for _, b := range c.buckets { b.RLock() @@ -173,7 +173,7 @@ func (c *ConcurrentMap) Len() int { return length } -func (c *ConcurrentMap) Load(key interface{}) (value interface{}, exist bool) { +func (c *ConcurrentMap[K, V]) Load(key K) (value V, exist bool) { var b = c.getBucket(key) b.RLock() value, exist = b.m[key] @@ -181,14 +181,14 @@ func (c *ConcurrentMap) Load(key interface{}) (value interface{}, exist bool) { return } -func (c *ConcurrentMap) Delete(key interface{}) { +func (c *ConcurrentMap[K, V]) Delete(key K) { var b = c.getBucket(key) b.Lock() delete(b.m, key) b.Unlock() } -func (c *ConcurrentMap) Store(key interface{}, value interface{}) { +func (c *ConcurrentMap[K, V]) Store(key K, value V) { var b = c.getBucket(key) b.Lock() b.m[key] = value @@ -197,7 +197,7 @@ func (c *ConcurrentMap) Store(key interface{}, value interface{}) { // Range calls f sequentially for each key and value present in the map. // If f returns false, range stops the iteration. -func (c *ConcurrentMap) Range(f func(key interface{}, value interface{}) bool) { +func (c *ConcurrentMap[K, V]) Range(f func(key K, value V) bool) { for _, b := range c.buckets { b.RLock() for k, v := range b.m { diff --git a/session_storage_test.go b/session_storage_test.go index c801c939..edf6c9ea 100644 --- a/session_storage_test.go +++ b/session_storage_test.go @@ -96,8 +96,8 @@ func TestMap_Range(t *testing.T) { func TestConcurrentMap(t *testing.T) { var as = assert.New(t) - var m1 = make(map[interface{}]interface{}) - var m2 = NewConcurrentMap(5) + var m1 = make(map[string]interface{}) + var m2 = NewConcurrentMap[string, uint32](5) var count = internal.AlphabetNumeric.Intn(1000) for i := 0; i < count; i++ { var key = string(internal.AlphabetNumeric.Generate(10)) @@ -106,7 +106,7 @@ func TestConcurrentMap(t *testing.T) { m2.Store(key, val) } - var keys = make([]interface{}, 0) + var keys = make([]string, 0) for k, _ := range m1 { keys = append(keys, k) } @@ -126,7 +126,7 @@ func TestConcurrentMap(t *testing.T) { func TestConcurrentMap_Range(t *testing.T) { var as = assert.New(t) var m1 = make(map[interface{}]interface{}) - var m2 = NewConcurrentMap(13) + var m2 = NewConcurrentMap[string, uint32](13) var count = 1000 for i := 0; i < count; i++ { var key = string(internal.AlphabetNumeric.Generate(10)) @@ -137,7 +137,7 @@ func TestConcurrentMap_Range(t *testing.T) { { var keys []interface{} - m2.Range(func(key interface{}, value interface{}) bool { + m2.Range(func(key string, value uint32) bool { v, ok := m1[key] as.Equal(true, ok) as.Equal(v, value) @@ -149,7 +149,7 @@ func TestConcurrentMap_Range(t *testing.T) { { var keys []interface{} - m2.Range(func(key interface{}, value interface{}) bool { + m2.Range(func(key string, value uint32) bool { v, ok := m1[key] as.Equal(true, ok) as.Equal(v, value) @@ -161,7 +161,7 @@ func TestConcurrentMap_Range(t *testing.T) { } func TestHash(t *testing.T) { - m := NewConcurrentMap(8) + m := NewConcurrentMap[string, uint32](8) m.hash("1") m.hash(int(1)) @@ -178,6 +178,6 @@ func TestHash(t *testing.T) { assert.Equal(t, uint64(0), m.hash(map[string]string{})) - m = NewConcurrentMap(0) + m = NewConcurrentMap[string, uint32](0) assert.Equal(t, uint64(16), m.segments) } diff --git a/writer.go b/writer.go index 8e402f48..0339f196 100644 --- a/writer.go +++ b/writer.go @@ -1,10 +1,29 @@ package gws import ( + "bytes" + "encoding/json" "errors" "github.com/lxzan/gws/internal" + "io" ) +type ( + Codec interface { + NewEncoder(io.Writer) Encoder + } + + Encoder interface { + Encode(v interface{}) error + } + + jsonCodec struct{} +) + +func (c jsonCodec) NewEncoder(writer io.Writer) Encoder { + return json.NewEncoder(writer) +} + // WriteClose proactively close the connection // code: https://developer.mozilla.org/zh-CN/docs/Web/API/CloseEvent#status_codes // 通过emitError发送关闭帧, 将连接状态置为关闭, 用于服务端主动断开连接 @@ -54,7 +73,7 @@ func (c *Conn) doWrite(opcode Opcode, payload []byte) error { } if c.compressEnabled && opcode.IsDataFrame() && len(payload) >= c.config.CompressThreshold { - return c.writeCompressedContents(opcode, payload) + return c.compressAndWrite(opcode, payload) } var n = len(payload) @@ -77,23 +96,67 @@ func (c *Conn) doWrite(opcode Opcode, payload []byte) error { return err } -func (c *Conn) writeCompressedContents(opcode Opcode, payload []byte) error { - var buf = _bpool.Get(len(payload) / 3) - defer _bpool.Put(buf) +// WriteAsync 异步非阻塞地写入消息 +// Write messages asynchronously and non-blockingly +func (c *Conn) WriteAsync(opcode Opcode, payload []byte) error { + if c.isClosed() { + return internal.ErrConnClosed + } + return c.writeQueue.Push(func() { c.emitError(c.doWrite(opcode, payload)) }) +} - var header = frameHeader{} - buf.Write(header[0:]) - if err := c.compressor.Compress(payload, buf); err != nil { +// WriteAny 以特定编码写入数据 +// 使用此方法时, CheckUtf8Enabled=false且CompressThreshold选项无效 +// Write data in a specific encoding +// When using this method, CheckUtf8Enabled=false and CompressThreshold option is disabled +func (c *Conn) WriteAny(codec Codec, opcode Opcode, v interface{}) error { + if c.isClosed() { + return internal.ErrConnClosed + } + + var buf = _bpool.Get(internal.Lv3) + c.wmu.Lock() + err := c.doWriteAny(opcode, v, codec, buf) + c.wmu.Unlock() + _bpool.Put(buf) + + c.emitError(err) + return err +} + +func (c *Conn) doWriteAny(opcode Opcode, v interface{}, codec Codec, buf *bytes.Buffer) error { + buf.Write(_padding[0:]) + var compress = c.compressEnabled && opcode.IsDataFrame() + var err error + if compress { + err = _cps.Select(c.config.CompressLevel).CompressAny(codec, v, buf) + } else { + err = codec.NewEncoder(buf).Encode(v) + } + if err != nil { return err } + return c.leftTrimAndWrite(opcode, buf, compress) +} + +func (c *Conn) compressAndWrite(opcode Opcode, payload []byte) error { + var buf = _bpool.Get(len(payload) / 2) + defer _bpool.Put(buf) + buf.Write(_padding[0:]) + if err := _cps.Select(c.config.CompressLevel).Compress(payload, buf); err != nil { + return err + } + return c.leftTrimAndWrite(opcode, buf, true) +} +func (c *Conn) leftTrimAndWrite(opcode Opcode, buf *bytes.Buffer, compress bool) error { var contents = buf.Bytes() var payloadSize = buf.Len() - frameHeaderSize if payloadSize > c.config.WriteMaxPayloadSize { return internal.CloseMessageTooLarge } - - headerLength, maskBytes := header.GenerateHeader(c.isServer, true, true, opcode, payloadSize) + var header = frameHeader{} + headerLength, maskBytes := header.GenerateHeader(c.isServer, true, compress, opcode, payloadSize) if !c.isServer { internal.MaskXOR(contents[frameHeaderSize:], maskBytes) } @@ -101,12 +164,3 @@ func (c *Conn) writeCompressedContents(opcode Opcode, payload []byte) error { copy(contents[:headerLength], header[:headerLength]) return internal.WriteN(c.conn, contents, payloadSize+headerLength) } - -// WriteAsync 异步非阻塞地写入消息 -// Write messages asynchronously and non-blockingly -func (c *Conn) WriteAsync(opcode Opcode, payload []byte) error { - if c.isClosed() { - return internal.ErrConnClosed - } - return c.writeQueue.Push(func() { c.emitError(c.doWrite(opcode, payload)) }) -} diff --git a/writer_test.go b/writer_test.go index 4d7d259e..c6f37c8c 100644 --- a/writer_test.go +++ b/writer_test.go @@ -2,8 +2,11 @@ package gws import ( "bytes" + "compress/flate" + "encoding/json" "github.com/lxzan/gws/internal" "github.com/stretchr/testify/assert" + "math" "net" "sync" "testing" @@ -16,7 +19,7 @@ func testWrite(c *Conn, fin bool, opcode Opcode, payload []byte) error { var useCompress = c.compressEnabled && opcode.IsDataFrame() && len(payload) >= c.config.CompressThreshold if useCompress { var buf = bytes.NewBufferString("") - err := c.compressor.Compress(payload, buf) + err := _cps.Select(flate.BestSpeed).Compress(payload, buf) if err != nil { return internal.NewError(internal.CloseInternalServerErr, err) } @@ -131,10 +134,89 @@ func TestConn_WriteClose(t *testing.T) { go server.Listen() go client.Listen() - //var payload = internal.CloseGoingAway.Bytes() - //payload = append(payload, "goodbye"...) server.WriteMessage(OpcodeText, nil) server.WriteMessage(OpcodeText, []byte("hello")) server.WriteMessage(OpcodeCloseConnection, []byte{1}) wg.Wait() } + +func TestConn_WriteAny(t *testing.T) { + type Model struct { + A string `json:"a"` + B string `json:"b"` + } + + t.Run("compress enable", func(t *testing.T) { + var count = 1000 + var wg = &sync.WaitGroup{} + wg.Add(count) + var expectedHash = uint64(0) + var actualHash = uint64(0) + + var serverHandler = new(webSocketMocker) + var clientHandler = new(webSocketMocker) + var serverOption = &ServerOption{CompressEnabled: true} + var clientOption = &ClientOption{CompressEnabled: true} + serverHandler.onMessage = func(socket *Conn, message *Message) { + var m = &Model{} + json.Unmarshal(message.Bytes(), m) + actualHash += internal.FNV64(m.A) & math.MaxUint32 + actualHash += internal.FNV64(m.B) & math.MaxUint32 + wg.Done() + } + + server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption) + go server.ReadLoop() + go client.ReadLoop() + + for i := 0; i < 1000; i++ { + var m = Model{ + A: string(internal.AlphabetNumeric.Generate(1024)), + B: string(internal.AlphabetNumeric.Generate(512)), + } + expectedHash += internal.FNV64(m.A) & math.MaxUint32 + expectedHash += internal.FNV64(m.B) & math.MaxUint32 + client.WriteAny(JsonCodec, OpcodeText, m) + } + + wg.Wait() + assert.Equal(t, expectedHash, actualHash) + }) + + t.Run("compress disable", func(t *testing.T) { + var count = 1000 + var wg = &sync.WaitGroup{} + wg.Add(count) + var expectedHash = uint64(0) + var actualHash = uint64(0) + + var serverHandler = new(webSocketMocker) + var clientHandler = new(webSocketMocker) + var serverOption = &ServerOption{CompressEnabled: false} + var clientOption = &ClientOption{CompressEnabled: false} + serverHandler.onMessage = func(socket *Conn, message *Message) { + var m = &Model{} + json.Unmarshal(message.Bytes(), m) + actualHash += internal.FNV64(m.A) & math.MaxUint32 + actualHash += internal.FNV64(m.B) & math.MaxUint32 + wg.Done() + } + + server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption) + go server.ReadLoop() + go client.ReadLoop() + + for i := 0; i < 1000; i++ { + var m = Model{ + A: string(internal.AlphabetNumeric.Generate(1024)), + B: string(internal.AlphabetNumeric.Generate(512)), + } + expectedHash += internal.FNV64(m.A) & math.MaxUint32 + expectedHash += internal.FNV64(m.B) & math.MaxUint32 + client.WriteAny(JsonCodec, OpcodeText, m) + } + + wg.Wait() + assert.Equal(t, expectedHash, actualHash) + }) +}