From 65c657a9f065a695afd101d38469926e3c6a9e5e Mon Sep 17 00:00:00 2001 From: Roman Serikov Date: Mon, 15 Mar 2021 18:52:59 +0300 Subject: [PATCH] Removed ws. Added /v1/head endpoint for stats --- cmd/api/handlers/head.go | 76 ++++++++++ cmd/api/handlers/responses.go | 14 ++ cmd/api/main.go | 7 +- cmd/api/ws/channels/channel.go | 39 ----- cmd/api/ws/channels/operations.go | 134 ----------------- cmd/api/ws/channels/option.go | 39 ----- cmd/api/ws/channels/stats.go | 177 ---------------------- cmd/api/ws/client.go | 171 --------------------- cmd/api/ws/datasources/data_source.go | 49 ------ cmd/api/ws/datasources/rabbit.go | 111 -------------- cmd/api/ws/handler.go | 30 ---- cmd/api/ws/hub.go | 209 -------------------------- cmd/api/ws/messages.go | 22 --- cmd/api/ws/options.go | 37 ----- 14 files changed, 91 insertions(+), 1024 deletions(-) create mode 100644 cmd/api/handlers/head.go delete mode 100644 cmd/api/ws/channels/channel.go delete mode 100644 cmd/api/ws/channels/operations.go delete mode 100644 cmd/api/ws/channels/option.go delete mode 100644 cmd/api/ws/channels/stats.go delete mode 100644 cmd/api/ws/client.go delete mode 100644 cmd/api/ws/datasources/data_source.go delete mode 100644 cmd/api/ws/datasources/rabbit.go delete mode 100644 cmd/api/ws/handler.go delete mode 100644 cmd/api/ws/hub.go delete mode 100644 cmd/api/ws/messages.go delete mode 100644 cmd/api/ws/options.go diff --git a/cmd/api/handlers/head.go b/cmd/api/handlers/head.go new file mode 100644 index 000000000..531b52152 --- /dev/null +++ b/cmd/api/handlers/head.go @@ -0,0 +1,76 @@ +package handlers + +import ( + "net/http" + "time" + + "github.com/gin-gonic/gin" +) + +// GetHead godoc +// @Summary Show indexer head +// @Description Get indexer head for each network +// @Tags head +// @ID get-indexer-head +// @Accept json +// @Produce json +// @Success 200 {array} HeadResponse +// @Failure 500 {object} Error +// @Router /v1/head [get] +func (ctx *Context) GetHead(c *gin.Context) { + item, err := ctx.Cache.Fetch("head", time.Second*30, ctx.getHead) + if ctx.handleError(c, err, 0) { + return + } + + c.JSON(http.StatusOK, item.Value()) +} + +func (ctx *Context) getHead() (interface{}, error) { + blocks, err := ctx.Blocks.LastByNetworks() + if err != nil { + return nil, err + } + + var network string + if len(blocks) == 1 { + network = blocks[0].Network + } + callCounts, err := ctx.Storage.GetCallsCountByNetwork(network) + if err != nil { + return nil, err + } + contractStats, err := ctx.Storage.GetContractStatsByNetwork(network) + if err != nil { + return nil, err + } + faCount, err := ctx.Storage.GetFACountByNetwork(network) + if err != nil { + return nil, err + } + body := make([]HeadResponse, len(blocks)) + for i := range blocks { + body[i] = HeadResponse{ + Network: blocks[i].Network, + Level: blocks[i].Level, + Timestamp: blocks[i].Timestamp, + Protocol: blocks[i].Protocol, + } + calls, ok := callCounts[blocks[i].Network] + if ok { + body[i].ContractCalls = calls + } + fa, ok := faCount[blocks[i].Network] + if ok { + body[i].FACount = fa + } + stats, ok := contractStats[blocks[i].Network] + if ok { + body[i].Total = stats.Total + body[i].TotalBalance = stats.Balance + body[i].UniqueContracts = stats.SameCount + } + } + + return body, nil +} diff --git a/cmd/api/handlers/responses.go b/cmd/api/handlers/responses.go index 130e1b4a3..a26d08bca 100644 --- a/cmd/api/handlers/responses.go +++ b/cmd/api/handlers/responses.go @@ -746,3 +746,17 @@ type TZIPResponse struct { tzip.TZIP16 tzip.TZIP20 } + +// HeadResponse - +type HeadResponse struct { + Network string `json:"network"` + Level int64 `json:"level"` + Timestamp time.Time `json:"time"` + Protocol string `json:"protocol"` + Total int64 `json:"total"` + ContractCalls int64 `json:"contract_calls"` + UniqueContracts int64 `json:"unique_contracts"` + TotalBalance int64 `json:"total_balance"` + TotalWithdrawn int64 `json:"total_withdrawn"` + FACount int64 `json:"fa_count"` +} diff --git a/cmd/api/main.go b/cmd/api/main.go index 47677eb21..8a181c108 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -9,7 +9,6 @@ import ( "github.com/baking-bad/bcdhub/cmd/api/handlers" "github.com/baking-bad/bcdhub/cmd/api/seed" "github.com/baking-bad/bcdhub/cmd/api/validations" - "github.com/baking-bad/bcdhub/cmd/api/ws" "github.com/baking-bad/bcdhub/internal/config" "github.com/baking-bad/bcdhub/internal/helpers" "github.com/baking-bad/bcdhub/internal/logger" @@ -21,7 +20,6 @@ import ( type app struct { Router *gin.Engine - Hub *ws.Hub Context *handlers.Context } @@ -53,7 +51,6 @@ func newApp() *app { } api := &app{ - Hub: ws.DefaultHub(ctx), Context: ctx, } @@ -92,8 +89,8 @@ func (api *app) makeRouter() { v1 := r.Group("v1") { v1.GET("swagger.json", api.Context.GetSwaggerDoc) - v1.GET("ws", func(c *gin.Context) { ws.Handler(c, api.Hub) }) + v1.GET("head", api.Context.GetHead) v1.GET("opg/:hash", api.Context.GetOperation) v1.GET("operation/:id/error_location", api.Context.GetOperationErrorLocation) v1.GET("pick_random", api.Context.GetRandomContract) @@ -258,11 +255,9 @@ func (api *app) makeRouter() { func (api *app) Close() { api.Context.Close() - api.Hub.Stop() } func (api *app) Run() { - api.Hub.Run() if err := api.Router.Run(api.Context.Config.API.Bind); err != nil { logger.Error(err) helpers.CatchErrorSentry(err) diff --git a/cmd/api/ws/channels/channel.go b/cmd/api/ws/channels/channel.go deleted file mode 100644 index f9bac1e66..000000000 --- a/cmd/api/ws/channels/channel.go +++ /dev/null @@ -1,39 +0,0 @@ -package channels - -import ( - "github.com/baking-bad/bcdhub/cmd/api/handlers" - "github.com/baking-bad/bcdhub/cmd/api/ws/datasources" -) - -// Channel - -type Channel interface { - GetName() string - Run() - Listen() <-chan Message - Stop() - Init() error -} - -// DefaultChannel - -type DefaultChannel struct { - sources []datasources.DataSource - - ctx *handlers.Context -} - -// NewDefaultChannel - -func NewDefaultChannel(opts ...ChannelOption) *DefaultChannel { - c := &DefaultChannel{ - sources: make([]datasources.DataSource, 0), - } - for _, opt := range opts { - opt(c) - } - return c -} - -// Message - -type Message struct { - ChannelName string `json:"channel_name"` - Body interface{} `json:"body"` -} diff --git a/cmd/api/ws/channels/operations.go b/cmd/api/ws/channels/operations.go deleted file mode 100644 index fe01a190e..000000000 --- a/cmd/api/ws/channels/operations.go +++ /dev/null @@ -1,134 +0,0 @@ -package channels - -import ( - "fmt" - "sync" - - "github.com/baking-bad/bcdhub/cmd/api/ws/datasources" - "github.com/baking-bad/bcdhub/internal/logger" - "github.com/baking-bad/bcdhub/internal/models/operation" - "github.com/baking-bad/bcdhub/internal/mq" - "github.com/pkg/errors" -) - -// OperationsChannel - -type OperationsChannel struct { - *DefaultChannel - Address string - Network string - - messages chan Message - stop chan struct{} - wg sync.WaitGroup - hashes map[string]struct{} -} - -// NewOperationsChannel - -func NewOperationsChannel(address, network string, opts ...ChannelOption) *OperationsChannel { - return &OperationsChannel{ - DefaultChannel: NewDefaultChannel(opts...), - Address: address, - Network: network, - - messages: make(chan Message, 10), - stop: make(chan struct{}), - hashes: make(map[string]struct{}), - } -} - -// GetName - -func (c *OperationsChannel) GetName() string { - return fmt.Sprintf("operations_%s_%s", c.Network, c.Address) -} - -// Run - -func (c *OperationsChannel) Run() { - if len(c.sources) == 0 { - logger.Errorf("[%s] Empty source list", c.GetName()) - return - } - - for i := range c.sources { - c.wg.Add(1) - go c.listen(c.sources[i]) - } -} - -// Listen - -func (c *OperationsChannel) Listen() <-chan Message { - return c.messages -} - -// Stop - -func (c *OperationsChannel) Stop() { - close(c.stop) - c.wg.Wait() - close(c.messages) -} - -// Init - -func (c *OperationsChannel) Init() error { - c.messages <- Message{ - ChannelName: c.GetName(), - Body: "ok", - } - return nil -} - -func (c *OperationsChannel) listen(source datasources.DataSource) { - defer c.wg.Done() - - ch := source.Subscribe() - for { - select { - case <-c.stop: - source.Unsubscribe(ch) - return - case data := <-ch: - if data.Type != datasources.RabbitType || data.Kind != mq.QueueOperations { - continue - } - if err := c.createMessage(data); err != nil { - logger.Error(err) - } - } - } -} - -func (c *OperationsChannel) createMessage(data datasources.Data) error { - op := operation.Operation{ID: string(data.Body.([]byte))} - if err := c.ctx.Storage.GetByID(&op); err != nil { - return errors.Errorf("[OperationsChannel.createMessage] Find operation error: %s", err) - } - if op.Network != c.Network { - return nil - } - if op.Destination != c.Address && op.Source != c.Address { - return nil - } - if _, ok := c.hashes[op.Hash]; ok { - return nil - } - operations, err := c.ctx.Operations.Get( - map[string]interface{}{ - "hash": op.Hash, - }, - 0, - true, - ) - if err != nil && !c.ctx.Storage.IsRecordNotFound(err) { - return err - } - - response, err := c.ctx.PrepareOperations(operations, true) - if err != nil { - return err - } - - c.hashes[op.Hash] = struct{}{} - c.messages <- Message{ - ChannelName: c.GetName(), - Body: response, - } - return nil -} diff --git a/cmd/api/ws/channels/option.go b/cmd/api/ws/channels/option.go deleted file mode 100644 index a998cd096..000000000 --- a/cmd/api/ws/channels/option.go +++ /dev/null @@ -1,39 +0,0 @@ -package channels - -import ( - "github.com/baking-bad/bcdhub/cmd/api/handlers" - "github.com/baking-bad/bcdhub/cmd/api/ws/datasources" - "github.com/baking-bad/bcdhub/internal/logger" - "github.com/pkg/errors" -) - -// ChannelOption - -type ChannelOption func(*DefaultChannel) - -// WithSource - -func WithSource(sources []datasources.DataSource, typ string) ChannelOption { - return func(c *DefaultChannel) { - source, err := getSourceByType(sources, typ) - if err != nil { - logger.Error(err) - return - } - c.sources = append(c.sources, source) - } -} - -// WithContext - -func WithContext(ctx *handlers.Context) ChannelOption { - return func(c *DefaultChannel) { - c.ctx = ctx - } -} - -func getSourceByType(sources []datasources.DataSource, typ string) (datasources.DataSource, error) { - for i := range sources { - if sources[i].GetType() == typ { - return sources[i], nil - } - } - return nil, errors.Errorf("unknown source type: %s", typ) -} diff --git a/cmd/api/ws/channels/stats.go b/cmd/api/ws/channels/stats.go deleted file mode 100644 index 7260e5862..000000000 --- a/cmd/api/ws/channels/stats.go +++ /dev/null @@ -1,177 +0,0 @@ -package channels - -import ( - "sync" - "time" - - "github.com/baking-bad/bcdhub/cmd/api/ws/datasources" - "github.com/baking-bad/bcdhub/internal/logger" - "github.com/baking-bad/bcdhub/internal/models/block" - "github.com/baking-bad/bcdhub/internal/mq" - jsoniter "github.com/json-iterator/go" -) - -var json = jsoniter.ConfigCompatibleWithStandardLibrary - -// StatsChannel - -type StatsChannel struct { - *DefaultChannel - - messages chan Message - stop chan struct{} - wg sync.WaitGroup -} - -// StatsBody - -type StatsBody struct { - Network string `json:"network"` - Level int64 `json:"level"` - Timestamp time.Time `json:"time"` - Protocol string `json:"protocol"` - Total int64 `json:"total"` - ContractCalls int64 `json:"contract_calls"` - UniqueContracts int64 `json:"unique_contracts"` - TotalBalance int64 `json:"total_balance"` - TotalWithdrawn int64 `json:"total_withdrawn"` - FACount int64 `json:"fa_count"` -} - -// NewStatsChannel - -func NewStatsChannel(opts ...ChannelOption) *StatsChannel { - return &StatsChannel{ - DefaultChannel: NewDefaultChannel(opts...), - messages: make(chan Message, 10), - stop: make(chan struct{}), - } -} - -// GetName - -func (c *StatsChannel) GetName() string { - return "stats" -} - -// Run - -func (c *StatsChannel) Run() { - if len(c.sources) == 0 { - logger.Errorf("[%s] Empty source list", c.GetName()) - return - } - - for i := range c.sources { - c.wg.Add(1) - go c.listen(c.sources[i]) - } -} - -// Listen - -func (c *StatsChannel) Listen() <-chan Message { - return c.messages -} - -// Stop - -func (c *StatsChannel) Stop() { - close(c.stop) - c.wg.Wait() - close(c.messages) -} - -// Init - -func (c *StatsChannel) Init() error { - return c.initMessage() -} - -func (c *StatsChannel) listen(source datasources.DataSource) { - defer c.wg.Done() - - ch := source.Subscribe() - for { - select { - case <-c.stop: - source.Unsubscribe(ch) - return - case data := <-ch: - if data.Type != datasources.RabbitType || data.Kind != mq.QueueBlocks { - continue - } - if err := c.createMessage(data.Body.([]byte)); err != nil { - logger.Error(err) - } - } - } -} - -func (c *StatsChannel) initMessage() error { - blocks, err := c.ctx.Blocks.LastByNetworks() - if err != nil { - return err - } - stats, err := c.getStats(blocks) - if err != nil { - return err - } - c.messages <- Message{ - ChannelName: c.GetName(), - Body: stats, - } - return nil -} - -func (c *StatsChannel) createMessage(data []byte) error { - var b block.Block - if err := json.Unmarshal(data, &b); err != nil { - return err - } - stats, err := c.getStats([]block.Block{b}) - if err != nil { - return err - } - c.messages <- Message{ - ChannelName: c.GetName(), - Body: stats, - } - return nil -} - -func (c *StatsChannel) getStats(blocks []block.Block) ([]StatsBody, error) { - var network string - if len(blocks) == 1 { - network = blocks[0].Network - } - callCounts, err := c.ctx.Storage.GetCallsCountByNetwork(network) - if err != nil { - return nil, err - } - contractStats, err := c.ctx.Storage.GetContractStatsByNetwork(network) - if err != nil { - return nil, err - } - faCount, err := c.ctx.Storage.GetFACountByNetwork(network) - if err != nil { - return nil, err - } - body := make([]StatsBody, len(blocks)) - for i := range blocks { - body[i] = StatsBody{ - Network: blocks[i].Network, - Level: blocks[i].Level, - Timestamp: blocks[i].Timestamp, - Protocol: blocks[i].Protocol, - } - calls, ok := callCounts[blocks[i].Network] - if ok { - body[i].ContractCalls = calls - } - fa, ok := faCount[blocks[i].Network] - if ok { - body[i].FACount = fa - } - stats, ok := contractStats[blocks[i].Network] - if ok { - body[i].Total = stats.Total - body[i].TotalBalance = stats.Balance - body[i].UniqueContracts = stats.SameCount - } - } - - return body, nil -} diff --git a/cmd/api/ws/client.go b/cmd/api/ws/client.go deleted file mode 100644 index 9c3739101..000000000 --- a/cmd/api/ws/client.go +++ /dev/null @@ -1,171 +0,0 @@ -package ws - -import ( - "fmt" - "math/rand" - "sync" - "time" - - "github.com/baking-bad/bcdhub/cmd/api/ws/channels" - "github.com/baking-bad/bcdhub/internal/logger" - "github.com/gorilla/websocket" -) - -// ClientHandler - -type ClientHandler func(*Client, []byte) error - -// ClientEvent - -type ClientEvent func([]byte) error - -// Client - nolint -type Client struct { - id int - conn *websocket.Conn - - sender chan channels.Message - stop chan struct{} - - subscriptions sync.Map - - handlers map[string]ClientHandler - sendMux sync.Mutex - - onSubscribe ClientEvent //nolint - onUnsubscribe ClientEvent //nolint - - isClosed bool - - hub *Hub -} - -// NewClient - -func NewClient(conn *websocket.Conn) *Client { - return &Client{ - id: rand.Int(), - conn: conn, - - sender: make(chan channels.Message), - stop: make(chan struct{}), - - handlers: make(map[string]ClientHandler), - } -} - -// GetSubscription - -func (c *Client) GetSubscription(name string) (channels.Channel, bool) { - val, ok := c.subscriptions.Load(name) - if !ok { - return nil, ok - } - channel, ok := val.(channels.Channel) - return channel, ok -} - -// Send - -func (c *Client) Send(msg channels.Message) { - if _, ok := c.subscriptions.Load(msg.ChannelName); ok && !c.isClosed { - c.sender <- msg - } -} - -// Run - -func (c *Client) Run() { - go c.send() - go c.receive() -} - -// Close - -func (c *Client) Close() { - c.isClosed = true - close(c.stop) - close(c.sender) - c.conn.Close() - c.hub.RemoveClient(c) -} - -// AddHandler - -func (c *Client) AddHandler(name string, handler ClientHandler) { - c.handlers[name] = handler -} - -func (c *Client) sendMessage(message interface{}) error { - c.sendMux.Lock() - defer c.sendMux.Unlock() - - if err := c.conn.SetWriteDeadline(time.Now().Add(time.Second * 2)); err != nil { - return err - } - return c.conn.WriteJSON(message) -} - -func (c *Client) sendError(err error) { - msg := StatusMessage{ - Status: ErrorStatus, - Text: err.Error(), - } - if err := c.sendMessage(msg); err != nil { - logger.Error(err) - } -} - -func (c *Client) sendOk(text string) error { - msg := StatusMessage{ - Status: OkStatus, - Text: text, - } - return c.sendMessage(msg) -} - -func (c *Client) receive() { - for { - select { - case <-c.stop: - return - default: - if err := c.conn.SetReadDeadline(time.Now().Add(time.Minute)); err != nil { - logger.Error(err) - continue - } - messageType, data, err := c.conn.ReadMessage() - if err != nil { - c.Close() - continue - } - - if messageType != websocket.TextMessage { - continue - } - - var msg ActionMessage - if err := json.Unmarshal(data, &msg); err != nil { - logger.Error(err) - continue - } - - handler, ok := c.handlers[msg.Action] - if !ok { - c.sendError(fmt.Errorf("Unknown handler action: %s", msg.Action)) - continue - } - if err := handler(c, data); err != nil { - c.sendError(err) - continue - } - } - } -} - -func (c *Client) send() { - for { - select { - case <-c.stop: - return - case msg := <-c.sender: - if msg.ChannelName != "" && msg.Body != nil { - if err := c.sendMessage(msg); err != nil { - c.sendError(err) - } - } - } - } -} diff --git a/cmd/api/ws/datasources/data_source.go b/cmd/api/ws/datasources/data_source.go deleted file mode 100644 index 51b0a44b5..000000000 --- a/cmd/api/ws/datasources/data_source.go +++ /dev/null @@ -1,49 +0,0 @@ -package datasources - -import "sync" - -// Data source types -const ( - RabbitType = "rabbit" -) - -// DataSource - -type DataSource interface { - Run() - Stop() - GetType() string - Subscribe() chan Data - Unsubscribe(chan Data) -} - -// DefaultSource - -type DefaultSource struct { - subscribers sync.Map -} - -// NewDefaultSource - -func NewDefaultSource() *DefaultSource { - return &DefaultSource{} -} - -// Subscribe - -func (s *DefaultSource) Subscribe() chan Data { - ch := make(chan Data) - s.subscribers.Store(ch, struct{}{}) - return ch -} - -// Unsubscribe - -func (s *DefaultSource) Unsubscribe(ch chan Data) { - if _, ok := s.subscribers.Load(ch); ok { - close(ch) - s.subscribers.Delete(ch) - } -} - -// Data - -type Data struct { - Type string - Kind string - Body interface{} -} diff --git a/cmd/api/ws/datasources/rabbit.go b/cmd/api/ws/datasources/rabbit.go deleted file mode 100644 index e9964e48c..000000000 --- a/cmd/api/ws/datasources/rabbit.go +++ /dev/null @@ -1,111 +0,0 @@ -package datasources - -import ( - "sync" - - "github.com/baking-bad/bcdhub/internal/logger" - "github.com/baking-bad/bcdhub/internal/mq" - "github.com/pkg/errors" -) - -var rabbitStopped = errors.Errorf("WS_RABBIT_STOPPED") - -// RabbitMQ - -type RabbitMQ struct { - *DefaultSource - - source mq.Mediator - - stop chan struct{} - wg sync.WaitGroup -} - -// NewRabbitMQ - -func NewRabbitMQ(messageQueue mq.Mediator) (*RabbitMQ, error) { - return &RabbitMQ{ - DefaultSource: NewDefaultSource(), - source: messageQueue, - stop: make(chan struct{}), - }, nil -} - -// Run - -func (c *RabbitMQ) Run() { - if len(c.source.GetQueues()) == 0 { - logger.Warning("Empty rabbit queues") - return - } - - for _, queue := range c.source.GetQueues() { - c.wg.Add(1) - go c.listenChannel(queue) - } -} - -// Stop - -func (c *RabbitMQ) Stop() { - close(c.stop) - c.wg.Wait() - c.subscribers.Range(func(key, val interface{}) bool { - close(key.(chan Data)) - return true - }) - c.source.Close() -} - -// GetType - -func (c *RabbitMQ) GetType() string { - return RabbitType -} - -func (c *RabbitMQ) listenChannel(queue string) { - defer c.wg.Done() - - msgs, err := c.source.Consume(queue) - if err != nil { - logger.Errorf("[%s data source] %s. Stop.", c.GetType(), err.Error()) - return - } - - for { - select { - case <-c.stop: - return - case msg := <-msgs: - if err := c.handler(msg); err != nil { - if errors.Is(err, rabbitStopped) { - return - } - logger.Errorf("[%s data source] %s", c.GetType(), err.Error()) - } - } - } -} - -func (c *RabbitMQ) handler(data mq.Data) error { - switch data.GetKey() { - case mq.QueueOperations, mq.QueueBlocks: - val := Data{ - Type: c.GetType(), - Kind: data.GetKey(), - Body: data.GetBody(), - } - - c.subscribers.Range(func(key, value interface{}) bool { - ch := key.(chan Data) - ch <- val - return true - }) - default: - if data.GetKey() == "" { - logger.Warning("Rabbit MQ server stopped! API need to be restarted. Closing connection...") - return rabbitStopped - } - return errors.Errorf("Unknown data routing key %s", data.GetKey()) - } - - if err := data.Ack(false); err != nil { - return errors.Errorf("Error acknowledging message: %s", err) - } - return nil -} diff --git a/cmd/api/ws/handler.go b/cmd/api/ws/handler.go deleted file mode 100644 index e9500dac8..000000000 --- a/cmd/api/ws/handler.go +++ /dev/null @@ -1,30 +0,0 @@ -package ws - -import ( - "net/http" - - "github.com/baking-bad/bcdhub/internal/logger" - "github.com/gin-gonic/gin" - "github.com/gorilla/websocket" -) - -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - return true - }, -} - -// Handler - -func Handler(c *gin.Context, hub *Hub) { - conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) - if err != nil { - logger.Error(err) - return - } - - client := NewClient(conn) - client.Run() - hub.AddClient(client) -} diff --git a/cmd/api/ws/hub.go b/cmd/api/ws/hub.go deleted file mode 100644 index 7fbcc472c..000000000 --- a/cmd/api/ws/hub.go +++ /dev/null @@ -1,209 +0,0 @@ -package ws - -import ( - "fmt" - "sync" - - "github.com/baking-bad/bcdhub/cmd/api/handlers" - "github.com/baking-bad/bcdhub/cmd/api/ws/channels" - "github.com/baking-bad/bcdhub/cmd/api/ws/datasources" - "github.com/pkg/errors" - "github.com/valyala/fastjson" -) - -// Hub - -type Hub struct { - sources []datasources.DataSource - clients sync.Map - public sync.Map - - ctx *handlers.Context - - stop chan struct{} - wg sync.WaitGroup -} - -// NewHub - -func NewHub(opts ...HubOption) *Hub { - h := &Hub{ - sources: make([]datasources.DataSource, 0), - - stop: make(chan struct{}), - } - - for _, opt := range opts { - opt(h) - } - - return h -} - -// DefaultHub - -func DefaultHub(ctx *handlers.Context) *Hub { - hub := NewHub( - WithRabbitSource(ctx.MQ), - WithContext(ctx), - ) - - hub.AddPublicChannel(channels.NewStatsChannel( - channels.WithSource(hub.sources, datasources.RabbitType), - channels.WithContext(ctx), - )) - return hub -} - -// AddPublicChannel - -func (h *Hub) AddPublicChannel(channel channels.Channel) { - h.public.Store(channel.GetName(), channel) -} - -// AddClient - -func (h *Hub) AddClient(client *Client) { - client.hub = h - client.AddHandler("subscribe", subscribeHandler) - client.AddHandler("unsubscribe", unsubscribeHandler) - client.AddHandler("ping", pingHandler) - h.clients.Store(client.id, client) -} - -// GetPublicChannel - -func (h *Hub) GetPublicChannel(name string) (channels.Channel, bool) { - c, ok := h.public.Load(name) - if !ok { - return nil, ok - } - channel, ok := c.(channels.Channel) - return channel, ok -} - -// RemoveClient - -func (h *Hub) RemoveClient(client *Client) { - if _, ok := h.clients.Load(client.id); ok { - h.clients.Delete(client.id) - } -} - -// Run - -func (h *Hub) Run() { - for i := range h.sources { - h.sources[i].Run() - } - - h.public.Range(func(key, val interface{}) bool { - ch := val.(channels.Channel) - h.runChannel(ch) - return true - }) -} - -func (h *Hub) runChannel(channel channels.Channel) { - h.wg.Add(1) - go h.listenChannel(channel) - channel.Run() -} - -// Stop - -func (h *Hub) Stop() { - defer h.wg.Wait() - - close(h.stop) - - h.clients.Range(func(key, val interface{}) bool { - client := val.(*Client) - client.Close() - return true - }) - - h.public.Range(func(key, val interface{}) bool { - ch := val.(channels.Channel) - ch.Stop() - return true - }) -} - -func (h *Hub) listenChannel(channel channels.Channel) { - defer h.wg.Done() - for { - select { - case <-h.stop: - return - case msg := <-channel.Listen(): - if msg.Body == nil && msg.ChannelName == "" { - return - } - h.clients.Range(func(key, val interface{}) bool { - client := val.(*Client) - client.Send(msg) - return true - }) - } - } -} - -func createDynamicChannels(c *Client, channelName string, data *fastjson.Value) (channels.Channel, error) { - switch channelName { - case "operations": - address := parseString(data, "address") - network := parseString(data, "network") - - operationsChannelName := fmt.Sprintf("%s_%s_%s", channelName, network, address) - if _, ok := c.subscriptions.Load(operationsChannelName); ok { - return nil, nil - } - return channels.NewOperationsChannel(address, network, - channels.WithSource(c.hub.sources, datasources.RabbitType), - channels.WithContext(c.hub.ctx), - ), nil - default: - return nil, errors.Errorf("Unknown channel: %s", channelName) - } -} - -func subscribeHandler(c *Client, data []byte) error { - var p fastjson.Parser - val, err := p.ParseBytes(data) - if err != nil { - return err - } - channelName := parseString(val, "channel") - channel, ok := c.hub.GetPublicChannel(channelName) - if !ok { - channel, err = createDynamicChannels(c, channelName, val) - if err != nil { - return err - } - if channel == nil { - return nil - } - c.hub.runChannel(channel) - } - c.subscriptions.Store(channel.GetName(), channel) - - return channel.Init() -} - -func unsubscribeHandler(c *Client, data []byte) error { - var p fastjson.Parser - val, err := p.ParseBytes(data) - if err != nil { - return err - } - channelName := parseString(val, "channel") - if channel, ok := c.GetSubscription(channelName); ok { - if _, isPublic := c.hub.public.Load(channelName); !isPublic { - channel.Stop() - } - } - c.subscriptions.Delete(channelName) - return c.sendOk(fmt.Sprintf("unsubscribed from %s", channelName)) -} - -func pingHandler(c *Client, data []byte) error { - return c.sendMessage(ActionMessage{ - Action: "pong", - }) -} - -func parseString(val *fastjson.Value, key string) string { - return string(val.GetStringBytes(key)) -} diff --git a/cmd/api/ws/messages.go b/cmd/api/ws/messages.go deleted file mode 100644 index f15a89f0b..000000000 --- a/cmd/api/ws/messages.go +++ /dev/null @@ -1,22 +0,0 @@ -package ws - -import jsoniter "github.com/json-iterator/go" - -var json = jsoniter.ConfigCompatibleWithStandardLibrary - -// Statuses -const ( - ErrorStatus = "error" - OkStatus = "ok" -) - -// StatusMessage - -type StatusMessage struct { - Status string `json:"status"` - Text string `json:"text"` -} - -// ActionMessage - -type ActionMessage struct { - Action string `json:"action"` -} diff --git a/cmd/api/ws/options.go b/cmd/api/ws/options.go deleted file mode 100644 index bd69b8872..000000000 --- a/cmd/api/ws/options.go +++ /dev/null @@ -1,37 +0,0 @@ -package ws - -import ( - "github.com/baking-bad/bcdhub/cmd/api/handlers" - "github.com/baking-bad/bcdhub/cmd/api/ws/datasources" - "github.com/baking-bad/bcdhub/internal/logger" - "github.com/baking-bad/bcdhub/internal/mq" -) - -// HubOption - -type HubOption func(*Hub) - -// WithSource - -func WithSource(source datasources.DataSource) HubOption { - return func(h *Hub) { - h.sources = append(h.sources, source) - } -} - -// WithRabbitSource - -func WithRabbitSource(messageQueue mq.Mediator) HubOption { - return func(h *Hub) { - rmq, err := datasources.NewRabbitMQ(messageQueue) - if err != nil { - logger.Error(err) - return - } - h.sources = append(h.sources, rmq) - } -} - -// WithContext - -func WithContext(ctx *handlers.Context) HubOption { - return func(h *Hub) { - h.ctx = ctx - } -}