Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
lxzan committed Aug 21, 2024
1 parent 6fddae5 commit 40a1499
Show file tree
Hide file tree
Showing 14 changed files with 217 additions and 199 deletions.
19 changes: 12 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ ok github.com/lxzan/gws 17.231s
- [Introduction](#introduction)
- [Why GWS](#why-gws)
- [Benchmark](#benchmark)
- [IOPS (Echo Server)](#iops-echo-server)
- [GoBench](#gobench)
- [IOPS (Echo Server)](#iops-echo-server)
- [GoBench](#gobench)
- [Index](#index)
- [Feature](#feature)
- [Attention](#attention)
Expand All @@ -87,13 +87,14 @@ ok github.com/lxzan/gws 17.231s
- [Quick Start](#quick-start)
- [Best Practice](#best-practice)
- [More Examples](#more-examples)
- [KCP](#kcp)
- [Proxy](#proxy)
- [Broadcast](#broadcast)
- [WriteWithTimeout](#writewithtimeout)
- [Pub / Sub](#pub--sub)
- [KCP](#kcp)
- [Proxy](#proxy)
- [Broadcast](#broadcast)
- [WriteWithTimeout](#writewithtimeout)
- [Pub / Sub](#pub--sub)
- [Autobahn Test](#autobahn-test)
- [Communication](#communication)
- [Buy me a coffee](#buy-me-a-coffee)
- [Acknowledgments](#acknowledgments)

### Feature
Expand Down Expand Up @@ -388,6 +389,10 @@ docker run -it --rm \
<img src="assets/qq.jpg" alt="QQ" width="300" height="300" style="display: inline-block"/>
</div>

### Buy me a coffee

<img src="assets/alipay.jpg" alt="WeChat" width="300" style="display: inline-block;"/>

### Acknowledgments

The following project had particular influence on gws's design.
Expand Down
19 changes: 12 additions & 7 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ ok github.com/lxzan/gws 17.231s
- [介绍](#介绍)
- [为什么选择 GWS](#为什么选择-gws)
- [基准测试](#基准测试)
- [IOPS (Echo Server)](#iops-echo-server)
- [GoBench](#gobench)
- [IOPS (Echo Server)](#iops-echo-server)
- [GoBench](#gobench)
- [Index](#index)
- [特性](#特性)
- [注意](#注意)
Expand All @@ -77,13 +77,14 @@ ok github.com/lxzan/gws 17.231s
- [快速上手](#快速上手)
- [最佳实践](#最佳实践)
- [更多用例](#更多用例)
- [KCP](#kcp)
- [代理](#代理)
- [广播](#广播)
- [写入超时](#写入超时)
- [发布/订阅](#发布订阅)
- [KCP](#kcp)
- [代理](#代理)
- [广播](#广播)
- [写入超时](#写入超时)
- [发布/订阅](#发布订阅)
- [Autobahn 测试](#autobahn-测试)
- [交流](#交流)
- [赞赏](#赞赏)
- [致谢](#致谢)

### 特性
Expand Down Expand Up @@ -375,6 +376,10 @@ docker run -it --rm \
<img src="assets/qq.jpg" alt="QQ" width="300" height="300" style="display: inline-block"/>
</div>

### 赞赏

<img src="assets/alipay.jpg" alt="WeChat" width="300" style="display: inline-block;"/>

### 致谢

- [crossbario/autobahn-testsuite](https://github.com/crossbario/autobahn-testsuite)
Expand Down
Binary file added assets/alipay.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 8 additions & 0 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ func (c *deflater) Compress(src internal.Payload, dst *bytes.Buffer, dict []byte
return nil
}

func compressTo(cpsWriter *flate.Writer, r io.WriterTo, w io.Writer, dict []byte) error {
cpsWriter.ResetDict(w, dict)
if _, err := r.WriteTo(cpsWriter); err != nil {
return err
}
return cpsWriter.Flush()
}

// 滑动窗口
// Sliding window
type slideWindow struct {
Expand Down
111 changes: 20 additions & 91 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (c *Conn) ReadLoop() {
// Infinite loop to read messages, if an error occurs, trigger the error event and exit the loop
for {
if err := c.readMessage(); err != nil {
c.emitError(err)
c.emitError(true, err)
break
}
}
Expand All @@ -125,106 +125,35 @@ func (c *Conn) ReadLoop() {
}
}

// 获取压缩字典
// Get compressed dictionary
func (c *Conn) getCpsDict(isBroadcast bool) []byte {
// 广播模式必须保证每一帧都是相同的内容, 所以不使用上下文接管优化压缩率
// In broadcast mode, each frame must be the same content, so context takeover is not used to optimize compression ratio
if isBroadcast {
return nil
}

// 如果是服务器并且服务器上下文接管启用,返回压缩字典
// If it is a server and server context takeover is enabled, return the compression dictionary
if c.isServer && c.pd.ServerContextTakeover {
return c.cpsWindow.dict
}

// 如果是客户端并且客户端上下文接管启用,返回压缩字典
// If client-side and client context takeover is enabled, return the compression dictionary
if !c.isServer && c.pd.ClientContextTakeover {
return c.cpsWindow.dict
}

return nil
}

// 获取解压字典
// Get decompression dictionary
func (c *Conn) getDpsDict() []byte {
// 如果是服务器并且客户端上下文接管启用,返回解压字典
// If it is a server and client context takeover is enabled, return the decompression dictionary
if c.isServer && c.pd.ClientContextTakeover {
return c.dpsWindow.dict
}

// 如果是客户端并且服务器上下文接管启用,返回解压字典
// If it is a client and server context takeover is enabled, return the decompressed dictionary
if !c.isServer && c.pd.ServerContextTakeover {
return c.dpsWindow.dict
}

return nil
}

// UTF8编码检查
// UTF8 encoding check
func (c *Conn) isTextValid(opcode Opcode, payload []byte) bool {
if c.config.CheckUtf8Enabled {
return internal.CheckEncoding(uint8(opcode), payload)
}
return true
}

// 检查连接是否已关闭
// Checks if the connection is closed
func (c *Conn) isClosed() bool {
return atomic.LoadUint32(&c.closed) == 1
}

// 关闭连接并存储错误信息
// Closes the connection and stores the error information
func (c *Conn) close(reason []byte, err error) {
c.ev.Store(err)
_ = c.doWrite(OpcodeCloseConnection, internal.Bytes(reason))
_ = c.conn.Close()
}

// 处理错误事件
// Handle the error event
func (c *Conn) emitError(err error) {
func (c *Conn) emitError(reading bool, err error) {
if err == nil {
return
}

// 使用原子操作检查并设置连接的关闭状态
// Use atomic operation to check and set the closed state of the connection
if atomic.CompareAndSwapUint32(&c.closed, 0, 1) {
var responseCode = internal.CloseNormalClosure
var responseErr error = internal.CloseNormalClosure

// 根据错误类型设置响应代码和响应错误
// Set response code and response error based on the error type
switch v := err.(type) {
case internal.StatusCode:
responseCode = v
case *internal.Error:
responseCode = v.Code
responseErr = v.Err
default:
responseErr = err
}

var content = responseCode.Bytes()
content = append(content, err.Error()...)

// 如果内容长度超过阈值,截断内容
// If the content length exceeds the threshold, truncate the content
if len(content) > internal.ThresholdV1 {
content = content[:internal.ThresholdV1]
// 待发送的错误码和错误原因
var sendCode, sendErr = internal.CloseGoingAway, error(internal.CloseGoingAway)
if reading {
switch v := err.(type) {
case internal.StatusCode:
sendCode, sendErr = v, v
case *internal.Error:
sendCode, sendErr, err = v.Code, v.Err, v.Err
default:
sendCode, sendErr = internal.CloseNormalClosure, err
}
}

c.close(content, responseErr)
var reason = append(sendCode.Bytes(), sendErr.Error()...)
_ = c.writeClose(err, reason)
}
}

Expand Down Expand Up @@ -257,12 +186,12 @@ func (c *Conn) emitClose(buf *bytes.Buffer) error {
responseCode = internal.StatusCode(realCode)
}
}
if !c.isTextValid(OpcodeCloseConnection, buf.Bytes()) {
if !internal.CheckEncoding(c.config.CheckUtf8Enabled, uint8(OpcodeCloseConnection), buf.Bytes()) {
responseCode = internal.CloseUnsupportedData
}
}
if atomic.CompareAndSwapUint32(&c.closed, 0, 1) {
c.close(responseCode.Bytes(), &CloseError{Code: realCode, Reason: buf.Bytes()})
_ = c.writeClose(&CloseError{Code: realCode, Reason: buf.Bytes()}, responseCode.Bytes())
}
return internal.CloseNormalClosure
}
Expand All @@ -271,23 +200,23 @@ func (c *Conn) emitClose(buf *bytes.Buffer) error {
// Sets the deadline for the connection
func (c *Conn) SetDeadline(t time.Time) error {
err := c.conn.SetDeadline(t)
c.emitError(err)
c.emitError(false, err)
return err
}

// SetReadDeadline 设置读取操作的截止时间
// Sets the deadline for read operations
func (c *Conn) SetReadDeadline(t time.Time) error {
err := c.conn.SetReadDeadline(t)
c.emitError(err)
c.emitError(false, err)
return err
}

// SetWriteDeadline 设置写入操作的截止时间
// Sets the deadline for write operations
func (c *Conn) SetWriteDeadline(t time.Time) error {
err := c.conn.SetWriteDeadline(t)
c.emitError(err)
c.emitError(false, err)
return err
}

Expand Down
2 changes: 1 addition & 1 deletion conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,6 @@ func TestConn_EmitError(t *testing.T) {
server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption)
go client.ReadLoop()
err := errors.New(string(internal.AlphabetNumeric.Generate(500)))
server.emitError(err)
server.emitError(false, err)
wg.Wait()
}
34 changes: 17 additions & 17 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@ package internal
// closeErrorMap 将状态码映射到错误信息
// map status codes to error messages
var closeErrorMap = map[StatusCode]string{
0: "empty code",
CloseNormalClosure: "close normal",
CloseGoingAway: "client going away",
CloseProtocolError: "protocol error",
CloseUnsupported: "unsupported data",
CloseNoStatusReceived: "no status",
CloseAbnormalClosure: "abnormal closure",
CloseUnsupportedData: "invalid payload data",
ClosePolicyViolation: "policy violation",
CloseMessageTooLarge: "message too large",
CloseMissingExtension: "mandatory extension missing",
CloseInternalServerErr: "internal server error",
CloseServiceRestart: "server restarting",
CloseTryAgainLater: "try again later",
CloseTLSHandshake: "TLS handshake error",
0: "empty code",
CloseNormalClosure: "close normal",
CloseGoingAway: "client going away",
CloseProtocolError: "protocol error",
CloseUnsupported: "unsupported data",
CloseNoStatusReceived: "no status",
CloseAbnormalClosure: "abnormal closure",
CloseUnsupportedData: "invalid payload data",
ClosePolicyViolation: "policy violation",
CloseMessageTooLarge: "message too large",
CloseMissingExtension: "mandatory extension missing",
CloseInternalErr: "internal error",
CloseServiceRestart: "server restarting",
CloseTryAgainLater: "try again later",
CloseTLSHandshake: "TLS handshake error",
}

// StatusCode WebSocket错误码
Expand Down Expand Up @@ -55,8 +55,8 @@ const (
// CloseMissingExtension 客户端期望服务器商定一个或多个拓展, 但服务器没有处理, 因此客户端断开连接.
CloseMissingExtension StatusCode = 1010

// CloseInternalServerErr 客户端由于遇到没有预料的情况阻止其完成请求, 因此服务端断开连接.
CloseInternalServerErr StatusCode = 1011
// CloseInternalErr 客户端由于遇到没有预料的情况阻止其完成请求, 因此服务端断开连接.
CloseInternalErr StatusCode = 1011

// CloseServiceRestart 服务器由于重启而断开连接. [Ref]
CloseServiceRestart StatusCode = 1012
Expand Down
21 changes: 7 additions & 14 deletions internal/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ func WriteN(writer io.Writer, content []byte) error {

// CheckEncoding 检查 payload 的编码是否有效
// checks if the encoding of the payload is valid
func CheckEncoding(opcode uint8, payload []byte) bool {
switch opcode {
case 1, 8:
func CheckEncoding(enabled bool, opcode uint8, payload []byte) bool {
if enabled && (opcode == 1 || opcode == 8) {
return utf8.Valid(payload)
default:
return true
}
return true
}

type Payload interface {
Expand All @@ -39,11 +37,9 @@ type Payload interface {
type Buffers [][]byte

func (b Buffers) CheckEncoding(enabled bool, opcode uint8) bool {
if enabled {
for i, _ := range b {
if !CheckEncoding(opcode, b[i]) {
return false
}
for i, _ := range b {
if !CheckEncoding(enabled, opcode, b[i]) {
return false
}
}
return true
Expand Down Expand Up @@ -73,10 +69,7 @@ func (b Buffers) WriteTo(w io.Writer) (int64, error) {
type Bytes []byte

func (b Bytes) CheckEncoding(enabled bool, opcode uint8) bool {
if enabled {
return CheckEncoding(opcode, b)
}
return true
return CheckEncoding(enabled, opcode, b)
}

func (b Bytes) Len() int {
Expand Down
6 changes: 3 additions & 3 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,13 @@ func (c *Conn) dispatch(msg *Message) error {
// Emit onmessage event
func (c *Conn) emitMessage(msg *Message) (err error) {
if msg.compressed {
msg.Data, err = c.deflater.Decompress(msg.Data, c.getDpsDict())
msg.Data, err = c.deflater.Decompress(msg.Data, c.dpsWindow.dict)
if err != nil {
return internal.NewError(internal.CloseInternalServerErr, err)
return internal.NewError(internal.CloseInternalErr, err)
}
_, _ = c.dpsWindow.Write(msg.Bytes())
}
if !c.isTextValid(msg.Opcode, msg.Bytes()) {
if !internal.CheckEncoding(c.config.CheckUtf8Enabled, uint8(msg.Opcode), msg.Bytes()) {
return internal.NewError(internal.CloseUnsupportedData, ErrTextEncoding)
}
if c.config.ParallelEnabled {
Expand Down
4 changes: 4 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ var (
// Text message encoding error (must be utf8)
ErrTextEncoding = errors.New("invalid text encoding")

// ErrMessageTooLarge 消息体积过大
// message size is too large
ErrMessageTooLarge = errors.New("message size is too large")

// ErrConnClosed 连接已关闭
// Connection closed
ErrConnClosed = net.ErrClosed
Expand Down
Loading

0 comments on commit 40a1499

Please sign in to comment.