Skip to content

Commit

Permalink
Fix: websocket operation channel
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky authored and m-kus committed Dec 11, 2020
1 parent d17bb57 commit 55f9fad
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 9 deletions.
7 changes: 7 additions & 0 deletions cmd/api/ws/channels/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type OperationsChannel struct {
messages chan Message
stop chan struct{}
wg sync.WaitGroup
hashes map[string]struct{}
}

// NewOperationsChannel -
Expand All @@ -32,6 +33,7 @@ func NewOperationsChannel(address, network string, opts ...ChannelOption) *Opera

messages: make(chan Message, 10),
stop: make(chan struct{}),
hashes: make(map[string]struct{}),
}
}

Expand Down Expand Up @@ -105,6 +107,9 @@ func (c *OperationsChannel) createMessage(data datasources.Data) error {
if op.Destination != c.Address && op.Source != c.Address {
return nil
}
if _, ok := c.hashes[op.Hash]; ok {
return nil
}
operations, err := c.es.GetOperations(
map[string]interface{}{
"hash": op.Hash,
Expand All @@ -120,6 +125,8 @@ func (c *OperationsChannel) createMessage(data datasources.Data) error {
if err != nil {
return err
}

c.hashes[op.Hash] = struct{}{}
c.messages <- Message{
ChannelName: c.GetName(),
Body: response,
Expand Down
12 changes: 5 additions & 7 deletions cmd/api/ws/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/baking-bad/bcdhub/cmd/api/ws/channels"
"github.com/baking-bad/bcdhub/internal/logger"
"github.com/gorilla/websocket"
"github.com/valyala/fastjson"
)

// ClientHandler -
Expand Down Expand Up @@ -115,7 +114,6 @@ func (c *Client) sendOk(text string) error {
}

func (c *Client) receive() {
var p fastjson.Parser
for {
select {
case <-c.stop:
Expand All @@ -135,15 +133,15 @@ func (c *Client) receive() {
continue
}

val, err := p.ParseBytes(data)
if err != nil {
var msg ActionMessage
if err := json.Unmarshal(data, &msg); err != nil {
logger.Error(err)
continue
}
action := string(val.GetStringBytes("action"))
handler, ok := c.handlers[action]

handler, ok := c.handlers[msg.Action]
if !ok {
c.sendError(fmt.Errorf("Unknown handler action: %s", action))
c.sendError(fmt.Errorf("Unknown handler action: %s", msg.Action))
continue
}
if err := handler(c, data); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/api/ws/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ func unsubscribeHandler(c *Client, data []byte) error {
}

func pingHandler(c *Client, data []byte) error {
return c.sendMessage(map[string]interface{}{
"action": "pong",
return c.sendMessage(ActionMessage{
Action: "pong",
})
}

Expand Down
9 changes: 9 additions & 0 deletions cmd/api/ws/messages.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package ws

import jsoniter "github.com/json-iterator/go"

var json = jsoniter.ConfigCompatibleWithStandardLibrary

// Statuses
const (
ErrorStatus = "error"
Expand All @@ -11,3 +15,8 @@ type StatusMessage struct {
Status string `json:"status"`
Text string `json:"text"`
}

// ActionMessage -
type ActionMessage struct {
Action string `json:"action"`
}

0 comments on commit 55f9fad

Please sign in to comment.