Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.8.7 #111

Merged
merged 2 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ ok github.com/lxzan/gws 17.231s
- [Introduction](#introduction)
- [Why GWS](#why-gws)
- [Benchmark](#benchmark)
- [IOPS (Echo Server)](#iops-echo-server)
- [GoBench](#gobench)
- [IOPS (Echo Server)](#iops-echo-server)
- [GoBench](#gobench)
- [Index](#index)
- [Feature](#feature)
- [Attention](#attention)
Expand All @@ -87,13 +87,14 @@ ok github.com/lxzan/gws 17.231s
- [Quick Start](#quick-start)
- [Best Practice](#best-practice)
- [More Examples](#more-examples)
- [KCP](#kcp)
- [Proxy](#proxy)
- [Broadcast](#broadcast)
- [WriteWithTimeout](#writewithtimeout)
- [Pub / Sub](#pub--sub)
- [KCP](#kcp)
- [Proxy](#proxy)
- [Broadcast](#broadcast)
- [WriteWithTimeout](#writewithtimeout)
- [Pub / Sub](#pub--sub)
- [Autobahn Test](#autobahn-test)
- [Communication](#communication)
- [Buy me a coffee](#buy-me-a-coffee)
- [Acknowledgments](#acknowledgments)

### Feature
Expand Down Expand Up @@ -388,6 +389,10 @@ docker run -it --rm \
<img src="assets/qq.jpg" alt="QQ" width="300" height="300" style="display: inline-block"/>
</div>

### Buy me a coffee

<img src="assets/alipay.jpg" alt="WeChat" width="300" style="display: inline-block;"/>

### Acknowledgments

The following project had particular influence on gws's design.
Expand Down
19 changes: 12 additions & 7 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ ok github.com/lxzan/gws 17.231s
- [介绍](#介绍)
- [为什么选择 GWS](#为什么选择-gws)
- [基准测试](#基准测试)
- [IOPS (Echo Server)](#iops-echo-server)
- [GoBench](#gobench)
- [IOPS (Echo Server)](#iops-echo-server)
- [GoBench](#gobench)
- [Index](#index)
- [特性](#特性)
- [注意](#注意)
Expand All @@ -77,13 +77,14 @@ ok github.com/lxzan/gws 17.231s
- [快速上手](#快速上手)
- [最佳实践](#最佳实践)
- [更多用例](#更多用例)
- [KCP](#kcp)
- [代理](#代理)
- [广播](#广播)
- [写入超时](#写入超时)
- [发布/订阅](#发布订阅)
- [KCP](#kcp)
- [代理](#代理)
- [广播](#广播)
- [写入超时](#写入超时)
- [发布/订阅](#发布订阅)
- [Autobahn 测试](#autobahn-测试)
- [交流](#交流)
- [赞赏](#赞赏)
- [致谢](#致谢)

### 特性
Expand Down Expand Up @@ -375,6 +376,10 @@ docker run -it --rm \
<img src="assets/qq.jpg" alt="QQ" width="300" height="300" style="display: inline-block"/>
</div>

### 赞赏

<img src="assets/alipay.jpg" alt="WeChat" width="300" style="display: inline-block;"/>

### 致谢

- [crossbario/autobahn-testsuite](https://github.com/crossbario/autobahn-testsuite)
Expand Down
Binary file added assets/alipay.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 8 additions & 0 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ func (c *deflater) Compress(src internal.Payload, dst *bytes.Buffer, dict []byte
return nil
}

func compressTo(cpsWriter *flate.Writer, r io.WriterTo, w io.Writer, dict []byte) error {
cpsWriter.ResetDict(w, dict)
if _, err := r.WriteTo(cpsWriter); err != nil {
return err
}
return cpsWriter.Flush()
}

// 滑动窗口
// Sliding window
type slideWindow struct {
Expand Down
112 changes: 21 additions & 91 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (c *Conn) ReadLoop() {
// Infinite loop to read messages, if an error occurs, trigger the error event and exit the loop
for {
if err := c.readMessage(); err != nil {
c.emitError(err)
c.emitError(true, err)
break
}
}
Expand All @@ -125,106 +125,36 @@ func (c *Conn) ReadLoop() {
}
}

// 获取压缩字典
// Get compressed dictionary
func (c *Conn) getCpsDict(isBroadcast bool) []byte {
// 广播模式必须保证每一帧都是相同的内容, 所以不使用上下文接管优化压缩率
// In broadcast mode, each frame must be the same content, so context takeover is not used to optimize compression ratio
if isBroadcast {
return nil
}

// 如果是服务器并且服务器上下文接管启用,返回压缩字典
// If it is a server and server context takeover is enabled, return the compression dictionary
if c.isServer && c.pd.ServerContextTakeover {
return c.cpsWindow.dict
}

// 如果是客户端并且客户端上下文接管启用,返回压缩字典
// If client-side and client context takeover is enabled, return the compression dictionary
if !c.isServer && c.pd.ClientContextTakeover {
return c.cpsWindow.dict
}

return nil
}

// 获取解压字典
// Get decompression dictionary
func (c *Conn) getDpsDict() []byte {
// 如果是服务器并且客户端上下文接管启用,返回解压字典
// If it is a server and client context takeover is enabled, return the decompression dictionary
if c.isServer && c.pd.ClientContextTakeover {
return c.dpsWindow.dict
}

// 如果是客户端并且服务器上下文接管启用,返回解压字典
// If it is a client and server context takeover is enabled, return the decompressed dictionary
if !c.isServer && c.pd.ServerContextTakeover {
return c.dpsWindow.dict
}

return nil
}

// UTF8编码检查
// UTF8 encoding check
func (c *Conn) isTextValid(opcode Opcode, payload []byte) bool {
if c.config.CheckUtf8Enabled {
return internal.CheckEncoding(uint8(opcode), payload)
}
return true
}

// 检查连接是否已关闭
// Checks if the connection is closed
func (c *Conn) isClosed() bool {
return atomic.LoadUint32(&c.closed) == 1
}

// 关闭连接并存储错误信息
// Closes the connection and stores the error information
func (c *Conn) close(reason []byte, err error) {
c.ev.Store(err)
_ = c.doWrite(OpcodeCloseConnection, internal.Bytes(reason))
_ = c.conn.Close()
}

// 处理错误事件
// Handle the error event
func (c *Conn) emitError(err error) {
func (c *Conn) emitError(reading bool, err error) {
if err == nil {
return
}

// 使用原子操作检查并设置连接的关闭状态
// Use atomic operation to check and set the closed state of the connection
if atomic.CompareAndSwapUint32(&c.closed, 0, 1) {
var responseCode = internal.CloseNormalClosure
var responseErr error = internal.CloseNormalClosure

// 根据错误类型设置响应代码和响应错误
// Set response code and response error based on the error type
switch v := err.(type) {
case internal.StatusCode:
responseCode = v
case *internal.Error:
responseCode = v.Code
responseErr = v.Err
default:
responseErr = err
}

var content = responseCode.Bytes()
content = append(content, err.Error()...)

// 如果内容长度超过阈值,截断内容
// If the content length exceeds the threshold, truncate the content
if len(content) > internal.ThresholdV1 {
content = content[:internal.ThresholdV1]
// 待发送的错误码和错误原因
// Error code to be sent and cause of error
var sendCode, sendErr = internal.CloseGoingAway, error(internal.CloseGoingAway)
if reading {
switch v := err.(type) {
case internal.StatusCode:
sendCode, sendErr = v, v
case *internal.Error:
sendCode, sendErr, err = v.Code, v.Err, v.Err
default:
sendCode, sendErr = internal.CloseNormalClosure, err
}
}

c.close(content, responseErr)
var reason = append(sendCode.Bytes(), sendErr.Error()...)
_ = c.writeClose(err, reason)
}
}

Expand Down Expand Up @@ -257,12 +187,12 @@ func (c *Conn) emitClose(buf *bytes.Buffer) error {
responseCode = internal.StatusCode(realCode)
}
}
if !c.isTextValid(OpcodeCloseConnection, buf.Bytes()) {
if !internal.CheckEncoding(c.config.CheckUtf8Enabled, uint8(OpcodeCloseConnection), buf.Bytes()) {
responseCode = internal.CloseUnsupportedData
}
}
if atomic.CompareAndSwapUint32(&c.closed, 0, 1) {
c.close(responseCode.Bytes(), &CloseError{Code: realCode, Reason: buf.Bytes()})
_ = c.writeClose(&CloseError{Code: realCode, Reason: buf.Bytes()}, responseCode.Bytes())
}
return internal.CloseNormalClosure
}
Expand All @@ -271,23 +201,23 @@ func (c *Conn) emitClose(buf *bytes.Buffer) error {
// Sets the deadline for the connection
func (c *Conn) SetDeadline(t time.Time) error {
err := c.conn.SetDeadline(t)
c.emitError(err)
c.emitError(false, err)
return err
}

// SetReadDeadline 设置读取操作的截止时间
// Sets the deadline for read operations
func (c *Conn) SetReadDeadline(t time.Time) error {
err := c.conn.SetReadDeadline(t)
c.emitError(err)
c.emitError(false, err)
return err
}

// SetWriteDeadline 设置写入操作的截止时间
// Sets the deadline for write operations
func (c *Conn) SetWriteDeadline(t time.Time) error {
err := c.conn.SetWriteDeadline(t)
c.emitError(err)
c.emitError(false, err)
return err
}

Expand Down
2 changes: 1 addition & 1 deletion conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,6 @@ func TestConn_EmitError(t *testing.T) {
server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption)
go client.ReadLoop()
err := errors.New(string(internal.AlphabetNumeric.Generate(500)))
server.emitError(err)
server.emitError(false, err)
wg.Wait()
}
34 changes: 17 additions & 17 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@ package internal
// closeErrorMap 将状态码映射到错误信息
// map status codes to error messages
var closeErrorMap = map[StatusCode]string{
0: "empty code",
CloseNormalClosure: "close normal",
CloseGoingAway: "client going away",
CloseProtocolError: "protocol error",
CloseUnsupported: "unsupported data",
CloseNoStatusReceived: "no status",
CloseAbnormalClosure: "abnormal closure",
CloseUnsupportedData: "invalid payload data",
ClosePolicyViolation: "policy violation",
CloseMessageTooLarge: "message too large",
CloseMissingExtension: "mandatory extension missing",
CloseInternalServerErr: "internal server error",
CloseServiceRestart: "server restarting",
CloseTryAgainLater: "try again later",
CloseTLSHandshake: "TLS handshake error",
0: "empty code",
CloseNormalClosure: "close normal",
CloseGoingAway: "client going away",
CloseProtocolError: "protocol error",
CloseUnsupported: "unsupported data",
CloseNoStatusReceived: "no status",
CloseAbnormalClosure: "abnormal closure",
CloseUnsupportedData: "invalid payload data",
ClosePolicyViolation: "policy violation",
CloseMessageTooLarge: "message too large",
CloseMissingExtension: "mandatory extension missing",
CloseInternalErr: "internal error",
CloseServiceRestart: "server restarting",
CloseTryAgainLater: "try again later",
CloseTLSHandshake: "TLS handshake error",
}

// StatusCode WebSocket错误码
Expand Down Expand Up @@ -55,8 +55,8 @@ const (
// CloseMissingExtension 客户端期望服务器商定一个或多个拓展, 但服务器没有处理, 因此客户端断开连接.
CloseMissingExtension StatusCode = 1010

// CloseInternalServerErr 客户端由于遇到没有预料的情况阻止其完成请求, 因此服务端断开连接.
CloseInternalServerErr StatusCode = 1011
// CloseInternalErr 客户端由于遇到没有预料的情况阻止其完成请求, 因此服务端断开连接.
CloseInternalErr StatusCode = 1011

// CloseServiceRestart 服务器由于重启而断开连接. [Ref]
CloseServiceRestart StatusCode = 1012
Expand Down
21 changes: 7 additions & 14 deletions internal/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ func WriteN(writer io.Writer, content []byte) error {

// CheckEncoding 检查 payload 的编码是否有效
// checks if the encoding of the payload is valid
func CheckEncoding(opcode uint8, payload []byte) bool {
switch opcode {
case 1, 8:
func CheckEncoding(enabled bool, opcode uint8, payload []byte) bool {
if enabled && (opcode == 1 || opcode == 8) {
return utf8.Valid(payload)
default:
return true
}
return true
}

type Payload interface {
Expand All @@ -39,11 +37,9 @@ type Payload interface {
type Buffers [][]byte

func (b Buffers) CheckEncoding(enabled bool, opcode uint8) bool {
if enabled {
for i, _ := range b {
if !CheckEncoding(opcode, b[i]) {
return false
}
for i, _ := range b {
if !CheckEncoding(enabled, opcode, b[i]) {
return false
}
}
return true
Expand Down Expand Up @@ -73,10 +69,7 @@ func (b Buffers) WriteTo(w io.Writer) (int64, error) {
type Bytes []byte

func (b Bytes) CheckEncoding(enabled bool, opcode uint8) bool {
if enabled {
return CheckEncoding(opcode, b)
}
return true
return CheckEncoding(enabled, opcode, b)
}

func (b Bytes) Len() int {
Expand Down
6 changes: 3 additions & 3 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,13 @@ func (c *Conn) dispatch(msg *Message) error {
// Emit onmessage event
func (c *Conn) emitMessage(msg *Message) (err error) {
if msg.compressed {
msg.Data, err = c.deflater.Decompress(msg.Data, c.getDpsDict())
msg.Data, err = c.deflater.Decompress(msg.Data, c.dpsWindow.dict)
if err != nil {
return internal.NewError(internal.CloseInternalServerErr, err)
return internal.NewError(internal.CloseInternalErr, err)
}
_, _ = c.dpsWindow.Write(msg.Bytes())
}
if !c.isTextValid(msg.Opcode, msg.Bytes()) {
if !internal.CheckEncoding(c.config.CheckUtf8Enabled, uint8(msg.Opcode), msg.Bytes()) {
return internal.NewError(internal.CloseUnsupportedData, ErrTextEncoding)
}
if c.config.ParallelEnabled {
Expand Down
4 changes: 4 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ var (
// Text message encoding error (must be utf8)
ErrTextEncoding = errors.New("invalid text encoding")

// ErrMessageTooLarge 消息体积过大
// message is too large
ErrMessageTooLarge = errors.New("message too large")

// ErrConnClosed 连接已关闭
// Connection closed
ErrConnClosed = net.ErrClosed
Expand Down
Loading
Loading