Skip to content

Commit

Permalink
add: transport ws
Browse files Browse the repository at this point in the history
  • Loading branch information
moocss committed Oct 1, 2023
1 parent 766850d commit f5aec8a
Show file tree
Hide file tree
Showing 14 changed files with 1,583 additions and 0 deletions.
18 changes: 18 additions & 0 deletions plugins/transport/websocket/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Websocket

## 什么是WebSocket?

WebSocket 协议主要为了解决基于 HTTP/1.x 的 Web 应用无法实现服务端向客户端主动推送的问题, 为了兼容现有的设施, WebSocket 协议使用与 HTTP 协议相同的端口, 并使用 HTTP Upgrade 机制来进行 WebSocket 握手, 当握手完成之后, 通信双方便可以按照 WebSocket 协议的方式进行交互

WebSocket 使用 TCP 作为传输层协议, 与 HTTP 类似, WebSocket 也支持在 TCP 上层引入 TLS 层, 以建立加密数据传输通道, 即 WebSocket over TLS, WebSocket 的 URI 与 HTTP URI 的结构类似, 对于使用 80 端口的 WebSocket over TCP, 其 URI 的一般形式为 `ws://host:port/path/query` 对于使用 443 端口的 WebSocket over TLS, 其 URI 的一般形式为 `wss://host:port/path/query`

在 WebSocket 协议中, 帧 (frame) 是通信双方数据传输的基本单元, 与其它网络协议相同, frame 由 Header 和 Payload 两部分构成, frame 有多种类型, frame 的类型由其头部的 Opcode 字段 (将在下面讨论) 来指示, WebSocket 的 frame 可以分为两类, 一类是用于传输控制信息的 frame (如通知对方关闭 WebSocket 连接), 一类是用于传输应用数据的 frame, 使用 WebSocket 协议通信的双方都需要首先进行握手, 只有当握手成功之后才开始使用 frame 传输数据

## 参考资料

* [RFC 6455 - The WebSocket Protocol](https://tools.ietf.org/html/rfc6455)
* [wikipedia - WebSocket](https://en.wikipedia.org/wiki/WebSocket)
* [HTML5 WebSocket](https://www.runoob.com/html/html5-websocket.html)
* [MDN - WebSocket](https://developer.mozilla.org/zh-CN/docs/Web/API/WebSocket)
* [WebSocket 协议解析 [RFC 6455]](https://sunyunqiang.com/blog/websocket_protocol_rfc6455/)
* [WebSocket 教程](https://www.ruanyifeng.com/blog/2017/05/websocket.html)
313 changes: 313 additions & 0 deletions plugins/transport/websocket/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
package websocket

import (
"encoding/json"
"errors"
"net/url"
"time"

"github.com/go-kratos/kratos/v2/encoding"

ws "github.com/gorilla/websocket"

"github.com/tx7do/kratos-transport/broker"
)

type ClientMessageHandler func(MessagePayload) error

type ClientHandlerData struct {
Handler ClientMessageHandler
Binder Binder
}
type ClientMessageHandlerMap map[MessageType]*ClientHandlerData

type Client struct {
conn *ws.Conn

url string
endpoint *url.URL

codec encoding.Codec
messageHandlers ClientMessageHandlerMap

timeout time.Duration

payloadType PayloadType
}

func NewClient(opts ...ClientOption) *Client {
cli := &Client{
url: "",
timeout: 1 * time.Second,
codec: encoding.GetCodec("json"),
messageHandlers: make(ClientMessageHandlerMap),
payloadType: PayloadTypeBinary,
}

cli.init(opts...)

return cli
}

func (c *Client) init(opts ...ClientOption) {
for _, o := range opts {
o(c)
}

c.endpoint, _ = url.Parse(c.url)
}

func (c *Client) Connect() error {
if c.endpoint == nil {
return errors.New("endpoint is nil")
}

LogInfof("connecting to %s", c.endpoint.String())

conn, resp, err := ws.DefaultDialer.Dial(c.endpoint.String(), nil)
if err != nil {
LogErrorf("%s [%v]", err.Error(), resp)
return err
}
c.conn = conn

go c.run()

return nil
}

func (c *Client) Disconnect() {
if c.conn != nil {
if err := c.conn.Close(); err != nil {
LogErrorf("disconnect error: %s", err.Error())
}
c.conn = nil
}
}

func (c *Client) RegisterMessageHandler(messageType MessageType, handler ClientMessageHandler, binder Binder) {
if _, ok := c.messageHandlers[messageType]; ok {
return
}

c.messageHandlers[messageType] = &ClientHandlerData{handler, binder}
}

func RegisterClientMessageHandler[T any](cli *Client, messageType MessageType, handler func(*T) error) {
cli.RegisterMessageHandler(messageType,
func(payload MessagePayload) error {
switch t := payload.(type) {
case *T:
return handler(t)
default:
LogError("invalid payload struct type:", t)
return errors.New("invalid payload struct type")
}
},
func() Any {
var t T
return &t
},
)
}

func (c *Client) DeregisterMessageHandler(messageType MessageType) {
delete(c.messageHandlers, messageType)
}

func (c *Client) marshalMessage(messageType MessageType, message MessagePayload) ([]byte, error) {
var err error
var buff []byte

switch c.payloadType {
case PayloadTypeBinary:
var msg BinaryMessage
msg.Type = messageType
msg.Body, err = broker.Marshal(c.codec, message)
if err != nil {
return nil, err
}
buff, err = msg.Marshal()
if err != nil {
return nil, err
}
break

case PayloadTypeText:
var buf []byte
var msg TextMessage
msg.Type = messageType
buf, err = broker.Marshal(c.codec, message)
msg.Body = string(buf)
if err != nil {
return nil, err
}
buff, err = json.Marshal(msg)
if err != nil {
return nil, err
}
break
}

//LogInfo("marshalMessage:", string(buff))

return buff, nil
}

func (c *Client) SendMessage(messageType MessageType, message interface{}) error {
buff, err := c.marshalMessage(messageType, message)
if err != nil {
LogError("marshal message exception:", err)
return err
}

switch c.payloadType {
case PayloadTypeBinary:
if err = c.sendBinaryMessage(buff); err != nil {
return err
}
break

case PayloadTypeText:
if err = c.sendTextMessage(string(buff)); err != nil {
return err
}
break
}

return nil
}

func (c *Client) sendPingMessage(message string) error {
return c.conn.WriteMessage(ws.PingMessage, []byte(message))
}

func (c *Client) sendPongMessage(message string) error {
return c.conn.WriteMessage(ws.PongMessage, []byte(message))
}

func (c *Client) sendTextMessage(message string) error {
return c.conn.WriteMessage(ws.TextMessage, []byte(message))
}

func (c *Client) sendBinaryMessage(message []byte) error {
return c.conn.WriteMessage(ws.BinaryMessage, message)
}

func (c *Client) run() {
defer c.Disconnect()

for {
messageType, data, err := c.conn.ReadMessage()
if err != nil {
if ws.IsUnexpectedCloseError(err, ws.CloseNormalClosure, ws.CloseGoingAway, ws.CloseAbnormalClosure) {
LogErrorf("read message error: %v", err)
}
return
}

switch messageType {
case ws.CloseMessage:
return

case ws.BinaryMessage:
_ = c.messageHandler(data)
break

case ws.TextMessage:
_ = c.messageHandler(data)
break

case ws.PingMessage:
if err := c.sendPongMessage(""); err != nil {
LogError("write pong message error: ", err)
return
}
break

case ws.PongMessage:
break
}

}
}

func (c *Client) unmarshalMessage(buf []byte) (*ClientHandlerData, MessagePayload, error) {
var handler *ClientHandlerData
var payload MessagePayload

switch c.payloadType {
case PayloadTypeBinary:
var msg BinaryMessage
if err := msg.Unmarshal(buf); err != nil {
LogErrorf("decode message exception: %s", err)
return nil, nil, err
}

var ok bool
handler, ok = c.messageHandlers[msg.Type]
if !ok {
LogError("message handler not found:", msg.Type)
return nil, nil, errors.New("message handler not found")
}

if handler.Binder != nil {
payload = handler.Binder()
} else {
payload = msg.Body
}

if err := broker.Unmarshal(c.codec, msg.Body, &payload); err != nil {
LogErrorf("unmarshal message exception: %s", err)
return nil, nil, err
}
//LogDebug(string(msg.Body))

case PayloadTypeText:
var msg TextMessage
if err := msg.Unmarshal(buf); err != nil {
LogErrorf("decode message exception: %s", err)
return nil, nil, err
}

var ok bool
handler, ok = c.messageHandlers[msg.Type]
if !ok {
LogError("message handler not found:", msg.Type)
return nil, nil, errors.New("message handler not found")
}

if handler.Binder != nil {
payload = handler.Binder()
} else {
payload = msg.Body
}

if err := broker.Unmarshal(c.codec, []byte(msg.Body), &payload); err != nil {
LogErrorf("unmarshal message exception: %s", err)
return nil, nil, err
}
//LogDebug(string(msg.Body))
}

return handler, payload, nil
}

func (c *Client) messageHandler(buf []byte) error {
var err error
var handler *ClientHandlerData
var payload MessagePayload

if handler, payload, err = c.unmarshalMessage(buf); err != nil {
LogErrorf("unmarshal message failed: %s", err)
return err
}
//LogDebug(payload)

if err = handler.Handler(payload); err != nil {
LogErrorf("message handler exception: %s", err)
return err
}

return nil
}
Loading

0 comments on commit f5aec8a

Please sign in to comment.