Skip to content

Commit

Permalink
Update node bnb-chain#1
Browse files Browse the repository at this point in the history
  • Loading branch information
ntminh611 committed Mar 15, 2023
1 parent 49c7b60 commit 51b568f
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 0 deletions.
35 changes: 35 additions & 0 deletions core/broadcast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package core

type Broadcast struct {
allClient map[*Client]bool
broadcastMessage chan []byte
registerClient chan *Client
unregisterClient chan *Client
}

func NewBroadcast() *Broadcast {
return &Broadcast{
allClient: make(map[*Client]bool),
broadcastMessage: make(chan []byte),
registerClient: make(chan *Client),
unregisterClient: make(chan *Client),
}
}
func (b *Broadcast) Run() {
for {
select {
case newClient := <-b.registerClient:
b.allClient[newClient] = true
case clientData := <-b.unregisterClient:
_, stateClient := b.allClient[clientData]
if stateClient {
delete(b.allClient, clientData)
close(clientData.sendMessage)
}
case messageData := <-b.broadcastMessage:
for clientData := range b.allClient {
clientData.sendMessage <- messageData
}
}
}
}
113 changes: 113 additions & 0 deletions core/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package core

import (
"fmt"
"github.com/gorilla/websocket"
"log"
"net/http"
"time"
)

const (
pongWait = 10 * time.Second
pingPeriod = 5 * time.Second
writeDeadline = 10 * time.Second
)

var upgraderConn = &websocket.Upgrader{}

type Client struct {
clientBroadcast *Broadcast
websocketConn *websocket.Conn
sendMessage chan []byte
}

func (c *Client) readPump() {
defer func() {
c.clientBroadcast.unregisterClient <- c
c.websocketConn.Close()
}()

err := c.websocketConn.SetReadDeadline(time.Now().Add(pongWait))
if err != nil {
return
}

c.websocketConn.SetPongHandler(func(string) error {
fmt.Println("Received Pong")
err = c.websocketConn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})

for {
_, _, err = c.websocketConn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Println("Error", err)
}
log.Println("Error", err)
break
}
}
}

func (c *Client) writePump() {
newTicker := time.NewTicker(pingPeriod)
defer func() {
newTicker.Stop()
c.websocketConn.Close()
}()
for {
select {
case messageData, stateData := <-c.sendMessage:
err := c.websocketConn.SetWriteDeadline(time.Now().Add(pingPeriod))
if err != nil {
return
}
if !stateData {
log.Println("Close the channel")
c.websocketConn.WriteMessage(websocket.CloseMessage, []byte{})
return
}

c.websocketConn.WriteMessage(websocket.BinaryMessage, messageData)

err = c.websocketConn.SetWriteDeadline(time.Time{})
if err != nil {
return
}
case <-newTicker.C:
log.Println("Send Ping")
err := c.websocketConn.SetWriteDeadline(time.Now().Add(writeDeadline))
if err != nil {
return
}

c.websocketConn.WriteMessage(websocket.PingMessage, nil)
err = c.websocketConn.SetWriteDeadline(time.Time{})
if err != nil {
return
}
}
}
}

func serveWs(broadcast *Broadcast, w http.ResponseWriter, r *http.Request) {
websocketConn, err := upgraderConn.Upgrade(w, r, nil)
if err != nil {
return
}

newClient := &Client{
clientBroadcast: broadcast,
websocketConn: websocketConn,
sendMessage: make(chan []byte, 256),
}
newClient.clientBroadcast.registerClient <- newClient
go newClient.writePump()
go newClient.readPump()
}

func messageHandler(broadcast *Broadcast, b []byte) {
broadcast.broadcastMessage <- b
}

0 comments on commit 51b568f

Please sign in to comment.