Skip to content

Commit

Permalink
improve
Browse files Browse the repository at this point in the history
  • Loading branch information
lixizan committed Jan 4, 2023
1 parent b5152b4 commit 9e6f2f7
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 47 deletions.
40 changes: 40 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/lxzan/gws/internal"
"net"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -83,6 +84,45 @@ func (c *Conn) Listen() {
}
}

func (c *Conn) emitError(err error) {
if err == nil {
return
}
if atomic.CompareAndSwapUint32(&c.closed, 0, 1) {
c.handlerError(err, nil)
c.handler.OnError(c, err)
}
}

func (c *Conn) handlerError(err error, buf *internal.Buffer) {
code := CloseNormalClosure
v, ok := err.(CloseCode)
if ok {
closeCode := v.Uint16()
if closeCode < 1000 || (closeCode >= 1016 && closeCode < 3000) {
code = CloseProtocolError
} else {
switch closeCode {
case 1004, 1005, 1006, 1014:
code = CloseProtocolError
default:
code = v
}
}
}
var content = code.Bytes()
if buf != nil {
content = append(content, buf.Bytes()...)
} else {
content = append(content, err.Error()...)
}
if len(content) > internal.Lv1 {
content = content[:internal.Lv1]
}
_ = c.writeMessage(OpcodeCloseConnection, content, true)
_ = c.conn.SetDeadline(time.Now())
}

func (c *Conn) isCanceled() bool {
select {
case <-c.ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion examples/bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewWebSocket() *WebSocket {
}

func (c *WebSocket) OnClose(socket *gws.Conn, message *gws.Message) {
fmt.Printf("onclose: code=%d, payload=%s", message.Code(), string(message.Bytes()))
fmt.Printf("onclose: code=%d, payload=%s\n", message.Code(), string(message.Bytes()))
}

type WebSocket struct{}
Expand Down
7 changes: 2 additions & 5 deletions frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,10 @@ func (c *Message) Bytes() []byte {
}

func payloadValid(opcode Opcode, buf *internal.Buffer) bool {
if buf.Len() == 0 {
if buf.Len() == 0 && !(opcode == OpcodeCloseConnection || opcode == OpcodeText) {
return true
}
if opcode == OpcodeCloseConnection || opcode == OpcodeText {
return utf8.Valid(buf.Bytes())
}
return true
return utf8.Valid(buf.Bytes())
}

func maskXOR(b []byte, key []byte) {
Expand Down
4 changes: 3 additions & 1 deletion reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (c *Conn) readMessage() error {

func (c *Conn) emitMessage(msg *Message, compressed bool) error {
if atomic.LoadUint32(&c.closed) == 1 {
return nil
return CloseNormalClosure
}
if c.isCanceled() {
return CloseServiceRestart
Expand Down Expand Up @@ -222,8 +222,10 @@ func (c *Conn) emitMessage(msg *Message, compressed bool) error {
c.handler.OnMessage(c, msg)
case OpcodeCloseConnection:
if atomic.CompareAndSwapUint32(&c.closed, 0, 1) {
code := msg.Code()
c.handlerError(msg.Code(), msg.buf)
c.handler.OnClose(c, msg)
return code
}
}
return nil
Expand Down
40 changes: 0 additions & 40 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package gws
import (
"github.com/lxzan/gws/internal"
"io"
"sync/atomic"
"time"
)

Expand All @@ -21,45 +20,6 @@ func writeN(writer io.Writer, content []byte, n int) error {
return nil
}

func (c *Conn) emitError(err error) {
if err == nil {
return
}
if atomic.CompareAndSwapUint32(&c.closed, 0, 1) {
c.handlerError(err, nil)
c.handler.OnError(c, err)
}
}

func (c *Conn) handlerError(err error, buf *internal.Buffer) {
code := CloseNormalClosure
v, ok := err.(CloseCode)
if ok {
closeCode := v.Uint16()
if closeCode < 1000 || (closeCode >= 1016 && closeCode < 3000) {
code = CloseProtocolError
} else {
switch closeCode {
case 1004, 1005, 1006, 1014:
code = CloseProtocolError
default:
code = v
}
}
}
var content = code.Bytes()
if buf != nil {
content = append(content, buf.Bytes()...)
} else {
content = append(content, err.Error()...)
}
if len(content) > internal.Lv1 {
content = content[:internal.Lv1]
}
_ = c.writeMessage(OpcodeCloseConnection, content, true)
_ = c.conn.SetDeadline(time.Now())
}

// WriteClose write close frame
// 发送关闭帧
func (c *Conn) WriteClose(code CloseCode, reason []byte) {
Expand Down

0 comments on commit 9e6f2f7

Please sign in to comment.