Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Add unit test for websocket controller #6762

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ generate-mocks: install-mock-generators
mockery --name 'Storage' --dir=module/executiondatasync/tracker --case=underscore --output="module/executiondatasync/tracker/mock" --outpkg="mocktracker"
mockery --name 'ScriptExecutor' --dir=module/execution --case=underscore --output="module/execution/mock" --outpkg="mock"
mockery --name 'StorageSnapshot' --dir=fvm/storage/snapshot --case=underscore --output="fvm/storage/snapshot/mock" --outpkg="mock"
mockery --name 'DataProvider' --dir=engine/access/rest/websockets/data_provider --case=underscore --output="engine/access/rest/websockets/data_provider/mock" --outpkg="mock"
mockery --name 'Factory' --dir=engine/access/rest/websockets/data_provider --case=underscore --output="engine/access/rest/websockets/data_provider/mock" --outpkg="mock"
mockery --name 'WebsocketConnection' --dir=engine/access/rest/websockets --case=underscore --output="engine/access/rest/websockets/mock" --outpkg="mock"

#temporarily make insecure/ a non-module to allow mockery to create mocks
mv insecure/go.mod insecure/go2.mod
Expand Down
4 changes: 3 additions & 1 deletion engine/access/rest/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
flowhttp "github.com/onflow/flow-go/engine/access/rest/http"
"github.com/onflow/flow-go/engine/access/rest/http/models"
"github.com/onflow/flow-go/engine/access/rest/websockets"
"github.com/onflow/flow-go/engine/access/rest/websockets/data_provider"
legacyws "github.com/onflow/flow-go/engine/access/rest/websockets/legacy"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/state_stream/backend"
Expand Down Expand Up @@ -93,7 +94,8 @@ func (b *RouterBuilder) AddWebsocketsRoute(
streamConfig backend.Config,
maxRequestSize int64,
) *RouterBuilder {
handler := websockets.NewWebSocketHandler(b.logger, config, chain, streamApi, streamConfig, maxRequestSize)
factory := data_provider.NewDataProviderFactory(b.logger, streamApi, streamConfig)
handler := websockets.NewWebSocketHandler(b.logger, config, chain, factory, maxRequestSize)
b.v1SubRouter.
Methods(http.MethodGet).
Path("/ws").
Expand Down
39 changes: 39 additions & 0 deletions engine/access/rest/websockets/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package websockets

import (
"github.com/gorilla/websocket"
)

type WebsocketConnection interface {
illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
ReadJSON(v interface{}) error
WriteJSON(v interface{}) error
Close() error
}

type GorillaWebsocketConnection struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add godoc here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like it's redundant in this case. The intention is pretty clear. What do you think?

conn *websocket.Conn
}

func NewGorillaWebsocketConnection(conn *websocket.Conn) *GorillaWebsocketConnection {
return &GorillaWebsocketConnection{
conn: conn,
}
}

var _ WebsocketConnection = (*GorillaWebsocketConnection)(nil)

func (m *GorillaWebsocketConnection) ReadJSON(v interface{}) error {
return m.conn.ReadJSON(v)
}

func (m *GorillaWebsocketConnection) WriteJSON(v interface{}) error {
return m.conn.WriteJSON(v)
}

func (m *GorillaWebsocketConnection) SetCloseHandler(handler func(code int, text string) error) {
m.conn.SetCloseHandler(handler)
}

func (m *GorillaWebsocketConnection) Close() error {
return m.conn.Close()
}
140 changes: 87 additions & 53 deletions engine/access/rest/websockets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,102 +3,119 @@ package websockets
import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"

"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/rs/zerolog"

dp "github.com/onflow/flow-go/engine/access/rest/websockets/data_provider"
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/utils/concurrentmap"
)

var ErrEmptyMessage = errors.New("empty message")

type Controller struct {
logger zerolog.Logger
config Config
conn *websocket.Conn
conn WebsocketConnection
communicationChannel chan interface{}
dataProviders *concurrentmap.Map[uuid.UUID, dp.DataProvider]
dataProvidersFactory *dp.Factory
dataProvidersFactory dp.Factory
shutdownOnce sync.Once
}

func NewWebSocketController(
logger zerolog.Logger,
config Config,
streamApi state_stream.API,
streamConfig backend.Config,
conn *websocket.Conn,
factory dp.Factory,
conn WebsocketConnection,
) *Controller {
return &Controller{
logger: logger.With().Str("component", "websocket-controller").Logger(),
config: config,
conn: conn,
communicationChannel: make(chan interface{}), //TODO: should it be buffered chan?
communicationChannel: make(chan interface{}, 10), //TODO: should it be buffered chan?
dataProviders: concurrentmap.New[uuid.UUID, dp.DataProvider](),
dataProvidersFactory: dp.NewDataProviderFactory(logger, streamApi, streamConfig),
dataProvidersFactory: factory,
shutdownOnce: sync.Once{},
}
}

// HandleConnection manages the WebSocket connection, adding context and error handling.
func (c *Controller) HandleConnection(ctx context.Context) {
//TODO: configure the connection with ping-pong and deadlines
//TODO: spin up a response limit tracker routine
go c.readMessagesFromClient(ctx)
c.writeMessagesToClient(ctx)
go c.readMessages(ctx)
c.writeMessages(ctx)
}

// writeMessagesToClient reads a messages from communication channel and passes them on to a client WebSocket connection.
// writeMessages reads a messages from communication channel and passes them on to a client WebSocket connection.
// The communication channel is filled by data providers. Besides, the response limit tracker is involved in
// write message regulation
func (c *Controller) writeMessagesToClient(ctx context.Context) {
//TODO: can it run forever? maybe we should cancel the ctx in the reader routine
func (c *Controller) writeMessages(ctx context.Context) {
defer c.shutdownConnection()

for {
select {
case <-ctx.Done():
return
case msg := <-c.communicationChannel:
// TODO: handle 'response per second' limits
case msg, ok := <-c.communicationChannel:
if !ok {
return
}
c.logger.Debug().Msgf("read message from communication channel: %s", msg)

// TODO: handle 'response per second' limits
err := c.conn.WriteJSON(msg)
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) ||
websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
return
}

c.logger.Error().Err(err).Msg("error writing to connection")
return
}

c.logger.Debug().Msg("written message to client")
}
}
}

// readMessagesFromClient continuously reads messages from a client WebSocket connection,
// readMessages continuously reads messages from a client WebSocket connection,
// processes each message, and handles actions based on the message type.
func (c *Controller) readMessagesFromClient(ctx context.Context) {
func (c *Controller) readMessages(ctx context.Context) {
defer c.shutdownConnection()

for {
select {
case <-ctx.Done():
c.logger.Info().Msg("context canceled, stopping read message loop")
return
default:
msg, err := c.readMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseAbnormalClosure) {
return
}
c.logger.Warn().Err(err).Msg("error reading message from client")
msg, err := c.readMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseAbnormalClosure) ||
websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
return
} else if errors.Is(err, ErrEmptyMessage) {
continue
}

baseMsg, validatedMsg, err := c.parseAndValidateMessage(msg)
if err != nil {
c.logger.Debug().Err(err).Msg("error parsing and validating client message")
return
}
c.logger.Debug().Err(err).Msg("error reading message from client")
continue
}

if err := c.handleAction(ctx, validatedMsg); err != nil {
c.logger.Warn().Err(err).Str("action", baseMsg.Action).Msg("error handling action")
}
baseMsg, validatedMsg, err := c.parseAndValidateMessage(msg)
if err != nil {
c.logger.Debug().Err(err).Msg("error parsing and validating client message")
//TODO: write error to error channel
continue
}

if err := c.handleAction(ctx, validatedMsg); err != nil {
c.logger.Debug().Err(err).Str("action", baseMsg.Action).Msg("error handling action")
//TODO: write error to error channel
continue
}
}
}
Expand All @@ -108,6 +125,11 @@ func (c *Controller) readMessage() (json.RawMessage, error) {
if err := c.conn.ReadJSON(&message); err != nil {
return nil, fmt.Errorf("error reading JSON from client: %w", err)
}

if message == nil {
return nil, ErrEmptyMessage
illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
}

return message, nil
}

Expand Down Expand Up @@ -166,10 +188,18 @@ func (c *Controller) handleAction(ctx context.Context, message interface{}) erro
func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMessageRequest) {
dp := c.dataProvidersFactory.NewDataProvider(c.communicationChannel, msg.Topic)
c.dataProviders.Add(dp.ID(), dp)
dp.Run(ctx)

//TODO: return OK response to client
c.communicationChannel <- msg
// firstly, we want to write OK response to client and only after that we can start providing actual data
response := models.SubscribeMessageResponse{
BaseMessageResponse: models.BaseMessageResponse{
Success: true,
},
Topic: dp.Topic(),
ID: dp.ID().String(),
}
c.communicationChannel <- response

dp.Run(ctx)
}

func (c *Controller) handleUnsubscribe(_ context.Context, msg models.UnsubscribeMessageRequest) {
Expand All @@ -193,20 +223,24 @@ func (c *Controller) handleListSubscriptions(ctx context.Context, msg models.Lis
}

func (c *Controller) shutdownConnection() {
defer close(c.communicationChannel)
defer func(conn *websocket.Conn) {
if err := c.conn.Close(); err != nil {
c.logger.Error().Err(err).Msg("error closing connection")
c.shutdownOnce.Do(func() {
illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
defer close(c.communicationChannel)
illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
defer func(conn WebsocketConnection) {
illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
if err := c.conn.Close(); err != nil {
c.logger.Warn().Err(err).Msg("error closing connection")
}
}(c.conn)

c.logger.Debug().Msg("shutting down connection")

err := c.dataProviders.ForEach(func(_ uuid.UUID, dp dp.DataProvider) error {
dp.Close()
return nil
})
if err != nil {
c.logger.Error().Err(err).Msg("error closing data provider")
}
}(c.conn)

err := c.dataProviders.ForEach(func(_ uuid.UUID, dp dp.DataProvider) error {
dp.Close()
return nil
c.dataProviders.Clear()
})
if err != nil {
c.logger.Error().Err(err).Msg("error closing data provider")
}

c.dataProviders.Clear()
}
Loading
Loading