Skip to content

Commit

Permalink
Merge pull request #9 from lxzan/dev
Browse files Browse the repository at this point in the history
v1.4.8 release
  • Loading branch information
lxzan authored Apr 28, 2023
2 parents 3a204b8 + e03ec83 commit b6b3b44
Show file tree
Hide file tree
Showing 15 changed files with 134 additions and 83 deletions.
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Event interface {
#### Examples

- [chat room](examples/chatroom/main.go)
- [echo](examples/wss-server/server.go)
- [echo](examples/echo/main.go)

#### Quick Start

Expand All @@ -92,12 +92,12 @@ func main() {
})

http.HandleFunc("/connect", func(writer http.ResponseWriter, request *http.Request) {
socket, err := upgrader.Accept(writer, request)
socket, err := upgrader.Upgrade(writer, request)
if err != nil {
log.Printf("Accept: " + err.Error())
return
}
go socket.Listen()
socket.ReadLoop()
})

if err := http.ListenAndServe(":3000", nil); err != nil {
Expand Down Expand Up @@ -135,7 +135,7 @@ func main() {
log.Printf(err.Error())
return
}
socket.Listen()
socket.ReadLoop()
}

type WebSocket struct {
Expand Down Expand Up @@ -175,11 +175,11 @@ func main() {
app := gin.New()
upgrader := gws.NewUpgrader(new(WebSocket), nil)
app.GET("/connect", func(ctx *gin.Context) {
socket, err := upgrader.Accept(ctx.Writer, ctx.Request)
socket, err := upgrader.Upgrade(ctx.Writer, ctx.Request)
if err != nil {
return
}
go upgrader.Listen(socket)
go upgrader.ReadLoop(socket)
})
if err := app.Run(":8080"); err != nil {
panic(err)
Expand All @@ -190,7 +190,7 @@ func main() {
- HeartBeat

```go
const PingInterval = 5 * time.Second
const PingInterval = 10 * time.Second

type Websocket struct {
gws.BuiltinEventHandler
Expand Down
28 changes: 10 additions & 18 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,25 @@ func newCompressor(level int) *compressor {

// 压缩器
type compressor struct {
buffer *bytes.Buffer
fw *flate.Writer
}

func (c *compressor) Close() {
_bpool.Put(c.buffer)
c.buffer = nil
fw *flate.Writer
}

// Compress 压缩
func (c *compressor) Compress(content *bytes.Buffer) (*bytes.Buffer, error) {
c.buffer = _bpool.Get(content.Len() / 3)
c.fw.Reset(c.buffer)
if err := internal.WriteN(c.fw, content.Bytes(), content.Len()); err != nil {
return nil, err
func (c *compressor) Compress(content []byte, buf *bytes.Buffer) error {
c.fw.Reset(buf)
if err := internal.WriteN(c.fw, content, len(content)); err != nil {
return err
}
if err := c.fw.Flush(); err != nil {
return nil, err
return err
}

if n := c.buffer.Len(); n >= 4 {
compressedContent := c.buffer.Bytes()
if n := buf.Len(); n >= 4 {
compressedContent := buf.Bytes()
if tail := compressedContent[n-4:]; binary.BigEndian.Uint32(tail) == math.MaxUint16 {
c.buffer.Truncate(n - 4)
buf.Truncate(n - 4)
}
}
return c.buffer, nil
return nil
}

func newDecompressor() *decompressor { return &decompressor{fr: flate.NewReader(nil)} }
Expand Down
22 changes: 9 additions & 13 deletions compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@ func TestFlate(t *testing.T) {
var dps = newDecompressor()
var n = internal.AlphabetNumeric.Intn(1024)
var rawText = internal.AlphabetNumeric.Generate(n)
var buf = bytes.NewBufferString("")
buf.Write(rawText)
compressedText, err := cps.Compress(buf)
if err != nil {
var compressedBuf = bytes.NewBufferString("")
if err := cps.Compress(rawText, compressedBuf); err != nil {
as.NoError(err)
return
}

buf.Reset()
buf.Write(compressedText.Bytes())
var buf = bytes.NewBufferString("")
buf.Write(compressedBuf.Bytes())
plainText, err := dps.Decompress(buf)
if err != nil {
as.NoError(err)
Expand All @@ -41,18 +39,16 @@ func TestFlate(t *testing.T) {
var dps = newDecompressor()
var n = internal.AlphabetNumeric.Intn(1024)
var rawText = internal.AlphabetNumeric.Generate(n)
var buf = bytes.NewBufferString("")
buf.Write(rawText)
compressedText, err := cps.Compress(buf)
if err != nil {
var compressedBuf = bytes.NewBufferString("")
if err := cps.Compress(rawText, compressedBuf); err != nil {
as.NoError(err)
return
}

buf.Reset()
buf.Write(compressedText.Bytes())
var buf = bytes.NewBufferString("")
buf.Write(compressedBuf.Bytes())
buf.WriteString("1234")
_, err = dps.Decompress(buf)
_, err := dps.Decompress(buf)
as.Error(err)
})
}
11 changes: 9 additions & 2 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,19 @@ func serveWebSocket(isServer bool, config *Config, session SessionStorage, netCo
return c
}

// Listen listening to websocket messages through a dead loop
// 监听websocket消息
// Listen 监听websocket消息
// Deprecated: Listen will be deprecated in future versions, please use ReadLoop instead.
func (c *Conn) Listen() {
c.ReadLoop()
}

// ReadLoop start a read message loop
// 启动一个读消息的死循环
func (c *Conn) ReadLoop() {
defer c.conn.Close()

c.handler.OnOpen(c)

for {
if err := c.readMessage(); err != nil {
c.emitError(err)
Expand Down
2 changes: 1 addition & 1 deletion examples/autobahn/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func testCase(id int) {
log.Println(err.Error())
return
}
go socket.Listen()
go socket.ReadLoop()
<-handler.onexit
}

Expand Down
4 changes: 2 additions & 2 deletions examples/autobahn/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ func main() {
})

http.HandleFunc("/connect", func(writer http.ResponseWriter, request *http.Request) {
socket, err := upgrader.Accept(writer, request)
socket, err := upgrader.Upgrade(writer, request)
if err != nil {
return
}
go socket.Listen()
socket.ReadLoop()
})

_ = http.ListenAndServe(":3000", nil)
Expand Down
2 changes: 1 addition & 1 deletion examples/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func main() {
log.Printf(err.Error())
return
}
go socket.Listen()
go socket.ReadLoop()

for {
var text = ""
Expand Down
27 changes: 27 additions & 0 deletions examples/echo/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"github.com/lxzan/gws"
"log"
)

func main() {
var app = gws.NewServer(new(Handler), &gws.ServerOption{
CompressEnabled: true,
CheckUtf8Enabled: true,
})
log.Fatalf("%v", app.Run(":3000"))
}

type Handler struct {
gws.BuiltinEventHandler
}

func (c *Handler) OnPing(socket *gws.Conn, payload []byte) {
socket.WritePong(payload)
}

func (c *Handler) OnMessage(socket *gws.Conn, message *gws.Message) {
defer message.Close()
_ = socket.WriteMessage(message.Opcode, message.Bytes())
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
4 changes: 3 additions & 1 deletion protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"io"
)

const frameHeaderSize = 14

type Opcode uint8

const (
Expand Down Expand Up @@ -62,7 +64,7 @@ func (b BuiltinEventHandler) OnPong(socket *Conn, payload []byte) {}

func (b BuiltinEventHandler) OnMessage(socket *Conn, message *Message) {}

type frameHeader [internal.FrameHeaderSize]byte
type frameHeader [frameHeaderSize]byte

func (c *frameHeader) GetFIN() bool {
return ((*c)[0] >> 7) == 1
Expand Down
25 changes: 21 additions & 4 deletions updrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,19 @@ func (c *Upgrader) connectHandshake(r *http.Request, responseHeader http.Header,
}

// Accept http upgrade to websocket protocol
// Deprecated: Accept will be deprecated in future versions, please use Upgrade instead.
func (c *Upgrader) Accept(w http.ResponseWriter, r *http.Request) (*Conn, error) {
return c.Upgrade(w, r)
}

// Upgrade http upgrade to websocket protocol
func (c *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request) (*Conn, error) {
netConn, br, err := c.hijack(w)
if err != nil {
return nil, err
}

socket, err := c.doAccept(r, netConn, br)
socket, err := c.doUpgrade(r, netConn, br)
if err != nil {
_ = netConn.Close()
return nil, err
Expand All @@ -93,7 +99,7 @@ func (c *Upgrader) hijack(w http.ResponseWriter) (net.Conn, *bufio.Reader, error
return netConn, brw.Reader, nil
}

func (c *Upgrader) doAccept(r *http.Request, netConn net.Conn, br *bufio.Reader) (*Conn, error) {
func (c *Upgrader) doUpgrade(r *http.Request, netConn net.Conn, br *bufio.Reader) (*Conn, error) {
var session = new(sliceMap)
var header = c.option.ResponseHeader.Clone()
if !c.option.CheckOrigin(r, session) {
Expand Down Expand Up @@ -138,6 +144,10 @@ func (c *Upgrader) doAccept(r *http.Request, netConn net.Conn, br *bufio.Reader)
type Server struct {
upgrader *Upgrader

// OnConnect 建立连接事件, 用于处理限流, 熔断和安全问题; 返回错误将会断开连接.
// Creates connection events for current limit, fuse and security issues; returning an error will disconnect.
OnConnect func(conn net.Conn) error

// OnError 接收握手过程中产生的错误回调
// Receive error callbacks generated during the handshake
OnError func(conn net.Conn, err error)
Expand All @@ -147,6 +157,7 @@ type Server struct {
// create a websocket server
func NewServer(eventHandler Event, option *ServerOption) *Server {
var c = &Server{upgrader: NewUpgrader(eventHandler, option)}
c.OnConnect = func(conn net.Conn) error { return nil }
c.OnError = func(conn net.Conn, err error) {}
return c
}
Expand Down Expand Up @@ -236,6 +247,12 @@ func (c *Server) serve(listener net.Listener) error {
}

go func() {
if err := c.OnConnect(conn); err != nil {
_ = conn.Close()
c.OnError(conn, err)
return
}

br := bufio.NewReaderSize(conn, c.upgrader.option.ReadBufferSize)
r, err := c.parseRequest(conn, br)
if err != nil {
Expand All @@ -244,13 +261,13 @@ func (c *Server) serve(listener net.Listener) error {
return
}

socket, err := c.upgrader.doAccept(r, conn, br)
socket, err := c.upgrader.doUpgrade(r, conn, br)
if err != nil {
_ = conn.Close()
c.OnError(conn, err)
return
}
socket.Listen()
socket.ReadLoop()
}()
}
}
Loading

0 comments on commit b6b3b44

Please sign in to comment.