Skip to content

Commit

Permalink
GRPC ingester config, code gen and wiring
Browse files Browse the repository at this point in the history
  • Loading branch information
p0mvn committed Mar 21, 2024
1 parent a766ff3 commit deacfe6
Show file tree
Hide file tree
Showing 13 changed files with 1,132 additions and 11 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,6 @@ sqs-update-mainnet-state:
# Bench tests pricing
bench-pricing:
go test -bench BenchmarkGetPrices -run BenchmarkGetPrices github.com/osmosis-labs/sqs/tokens/usecase -count=6

proto-gen:
protoc --go_out=./ --go-grpc_out=./ --proto_path=./sqsdomain/proto ./sqsdomain/proto/ingest.proto
41 changes: 32 additions & 9 deletions app/sidecar_query_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"net"
"net/http"
"time"

Expand All @@ -11,6 +12,9 @@ import (
"github.com/redis/go-redis/v9"
"go.uber.org/zap"

ingestrpcdelivry "github.com/osmosis-labs/sqs/ingest/delivery/grpc"
ingestusecase "github.com/osmosis-labs/sqs/ingest/usecase"

chaininfousecase "github.com/osmosis-labs/sqs/chaininfo/usecase"
poolsHttpDelivery "github.com/osmosis-labs/sqs/pools/delivery/http"
poolsUseCase "github.com/osmosis-labs/sqs/pools/usecase"
Expand Down Expand Up @@ -39,7 +43,6 @@ import (
// and exposes endpoints for querying formatter and processed data from frontend.
type SideCarQueryServer interface {
GetTxManager() repository.TxManager
GetPoolsRepository() poolsredisrepo.PoolsRepository
GetChainInfoRepository() chaininforedisrepo.ChainInfoRepository
GetRouterRepository() routerredisrepo.RouterRepository
GetTokensUseCase() mvc.TokensUsecase
Expand All @@ -59,12 +62,6 @@ type sideCarQueryServer struct {
logger log.Logger
}

const (
// This is a directory path where the overwrite routes are backed up in case of failure.
// On restart, the overwrite routes are restored from this directory.
overwriteRoutesPath = "overwrite_routes"
)

// GetTokensUseCase implements SideCarQueryServer.
func (sqs *sideCarQueryServer) GetTokensUseCase() mvc.TokensUsecase {
return sqs.tokensUseCase
Expand Down Expand Up @@ -149,7 +146,7 @@ func NewSideCarQueryServer(appCodec codec.Codec, config domain.Config, logger lo

// Initialize router repository, usecase
routerRepository := routerredisrepo.New(redisTxManager, 0)
routerUsecase := routerUseCase.WithOverwriteRoutesPath(routerUseCase.NewRouterUsecase(timeoutContext, routerRepository, poolsUseCase, *config.Router, poolsUseCase.GetCosmWasmPoolConfig(), logger, cache.New(), cache.New()), overwriteRoutesPath)
routerUsecase := routerUseCase.NewRouterUsecase(timeoutContext, routerRepository, poolsUseCase, *config.Router, poolsUseCase.GetCosmWasmPoolConfig(), logger, cache.New(), cache.New())

// Initialize system handler
chainInfoRepository := chaininforedisrepo.New(redisTxManager)
Expand All @@ -172,6 +169,33 @@ func NewSideCarQueryServer(appCodec codec.Codec, config domain.Config, logger lo
}
routerHttpDelivery.NewRouterHandler(e, routerUsecase, tokensUseCase, logger)

// Start grpc ingest server if enabled
grpcIngesterConfig := config.GRPCIngester
if grpcIngesterConfig.Enabeld {
// Initialize ingest handler and usecase
ingestUseCase, err := ingestusecase.NewIngestUsecase(logger)
if err != nil {
return nil, err
}

grpcIngestHandler, err := ingestrpcdelivry.NewIngestGRPCHandler(ingestUseCase, *grpcIngesterConfig)
if err != nil {
panic(err)
}

go func() {
logger.Info("Starting grpc ingest server")

lis, err := net.Listen("tcp", grpcIngesterConfig.ServerAddress)
if err != nil {
panic(err)
}
if err := grpcIngestHandler.Serve(lis); err != nil {
panic(err)
}
}()
}

go func() {
logger.Info("Starting profiling server")
err = http.ListenAndServe("localhost:6062", nil)
Expand All @@ -182,7 +206,6 @@ func NewSideCarQueryServer(appCodec codec.Codec, config domain.Config, logger lo

return &sideCarQueryServer{
txManager: redisTxManager,
poolsRepository: poolsRepository,
chainInfoRepository: chainInfoRepository,
routerRepository: routerRepository,
tokensUseCase: tokensUseCase,
Expand Down
7 changes: 6 additions & 1 deletion config-testnet.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
"max-routes": 5,
"min-osmo-liquidity": 0
},
"enable-overwrite-routes-cache": true
"grpc-ingester":{
"enabled": false,
"max-receive-msg-size-bytes": 16777216,
"server-address": ":50051",
"server-connection-timeout-seconds": 10
}
}

7 changes: 6 additions & 1 deletion config.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
"max-routes": 5,
"min-osmo-liquidity": 50
},
"enable-overwrite-routes-cache": true
"grpc-ingester":{
"enabled": false,
"max-receive-msg-size-bytes": 16777216,
"server-address": ":50051",
"server-connection-timeout-seconds": 10
}
}

2 changes: 2 additions & 0 deletions domain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ type Config struct {
Pools *PoolsConfig `mapstructure:"pools"`

Pricing *PricingConfig `mapstructure:"pricing"`

GRPCIngester *GRPCIngesterConfig `mapstructure:"grpc-ingester"`
}
15 changes: 15 additions & 0 deletions domain/ingester.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package domain

type GRPCIngesterConfig struct {
// Flag to enable the GRPC ingester server
Enabeld bool `mapstructure:"enabled"`

// The maximum number of bytes to receive in a single GRPC message
MaxReceiveMsgSizeBytes int `mapstructure:"max-receive-msg-size-bytes"`

// The address of the GRPC ingester server
ServerAddress string `mapstructure:"server-address"`

// The number of seconds to wait for a connection to the server.
ServerConnectionTimeoutSeconds int `mapstructure:"server-connection-timeout-seconds"`
}
25 changes: 25 additions & 0 deletions domain/mvc/ingest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package mvc

import (
"context"

"github.com/osmosis-labs/sqs/sqsdomain"
prototypes "github.com/osmosis-labs/sqs/sqsdomain/proto/types"
)

// IngestUsecase represent the ingest's usecases
type IngestUsecase interface {
// ProcessPoolChunk processes the pool data chunk, returning error if any.
// Caches the given pools in-memory until the end of the block processing.
ProcessPoolChunk(ctx context.Context, poolChunk []*prototypes.PoolData) error

// StartBlockProcess signifies the start of the given block height processing
// It persists the given taker fee into the repository.
StartBlockProcess(ctx context.Context, height uint64, takerFeesMap sqsdomain.TakerFeeMap) (err error)

// EndBlockProcessing ends the given block processing on success, storing the height
// internally.
// Persists the given height as well as any previously processed pools in-store.
// Resets the internal pools cache to be empty.
EndBlockProcess(ctx context.Context, height uint64) (err error)
}
44 changes: 44 additions & 0 deletions ingest/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Ingest

This is a component that is responsible for ingesting and processing data.

It exposes a GRPC API for Osmosis node clients to send data to be ingested.

```mermaid
sequenceDiagram
participant NodeIngester
Note over SQS,NodeIngester: SQS Ingest RPC Communication
Note over NodeIngester: Acquires lock on block processing
NodeIngester->>SQS: StartBlockProcess(height, taker_fees)
Note over SQS: SQS starts processing height
SQS-->>NodeIngester: StartBlockProcessReply
NodeIngester->>SQS: ProcessChainPools(stream)
NodeIngester-->>SQS: ProcessChainPools.Send(pools_data_chunk)
Note over SQS: SQS transforms and loads pools into ingester cache
NodeIngester-->>SQS: ProcessChainPools.Send(pools_data_chunk)
Note over SQS: SQS transforms and loads pools into ingester cache
NodeIngester->>SQS: ProcessChainPools.CloseSend()
NodeIngester->>SQS: EndBlockProcess(EndBlockProcessRequest)
Note over SQS: SQS commits all state into SQS repositories, making them available for clients.
SQS-->>NodeIngester: EndBlockProcessReply
Note over NodeIngester: Releases lock on block processing
```

Note that, as of right now the protocol is syncronous where each GRPC call happens in sequence. However, from the
node perspective it is processed in a separate goroutine, letting the node continue producing blocks. The node
acquires a lock on the block processing so that the interaction is not affected by synching.

This is not a concern since, when the node is caught up, the block time is approximately 4.5 seconds while entire
protocol is capped at 1.5 seconds.

Currently, we push all pool data into SQS every processed block. As we lower block time, we will introduce a mechanism for
pushing the pool data only for the modified pools. This will allow us to drastically lower the protocol interaction from 1.5 seconds.

Alternative methods will include making the protocol more asyncronous but this will require introducing more complex
locking mechanisms which are overkill today.
82 changes: 82 additions & 0 deletions ingest/delivery/grpc/ingest_grpc_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package grpc

import (
"context"
"io"
"time"

"github.com/osmosis-labs/sqs/domain"
"github.com/osmosis-labs/sqs/domain/mvc"
"github.com/osmosis-labs/sqs/sqsdomain"
prototypes "github.com/osmosis-labs/sqs/sqsdomain/proto/types"
"google.golang.org/grpc"
)

type IngestGRPCHandler struct {
ingestUseCase mvc.IngestUsecase

prototypes.UnimplementedSQSIngesterServer
}

type IngestProcessBlockArgs struct {
Pools []sqsdomain.PoolI
}

var _ prototypes.SQSIngesterServer = &IngestGRPCHandler{}

// NewIngestHandler will initialize the ingest/ resources endpoint
func NewIngestGRPCHandler(us mvc.IngestUsecase, grpcIngesterConfig domain.GRPCIngesterConfig) (*grpc.Server, error) {
ingestHandler := &IngestGRPCHandler{
ingestUseCase: us,
}

grpcServer := grpc.NewServer(grpc.MaxRecvMsgSize(grpcIngesterConfig.MaxReceiveMsgSizeBytes), grpc.ConnectionTimeout(time.Second*time.Duration(grpcIngesterConfig.ServerConnectionTimeoutSeconds)))
prototypes.RegisterSQSIngesterServer(grpcServer, ingestHandler)

return grpcServer, nil
}

// ProcessChainPools implements types.IngesterServer.
func (i *IngestGRPCHandler) ProcessChainPools(stream prototypes.SQSIngester_ProcessChainPoolsServer) (err error) {
var poolDataChunk prototypes.ChainPoolsDataChunk
err = stream.RecvMsg(&poolDataChunk)
for err == nil {
err = i.ingestUseCase.ProcessPoolChunk(stream.Context(), poolDataChunk.Pools)
if err != nil {
return err
}

err = stream.RecvMsg(&poolDataChunk)
}

if err != io.EOF {
return err
}

return stream.SendAndClose(&prototypes.ProcessChainPoolsReply{})
}

// StartBlockProcess( implements types.IngesterServer.
func (i *IngestGRPCHandler) StartBlockProcess(ctx context.Context, req *prototypes.StartBlockProcessRequest) (resp *prototypes.StartBlockProcessReply, err error) {
takerFeeMap := sqsdomain.TakerFeeMap{}

if err := takerFeeMap.UnmarshalJSON(req.TakerFeesMap); err != nil {
return nil, err
}

// Start block processing with the taker fees.
if err := i.ingestUseCase.StartBlockProcess(ctx, req.BlockHeight, takerFeeMap); err != nil {
return nil, err
}

return &prototypes.StartBlockProcessReply{}, nil
}

// EndBlockProcessing implements types.IngesterServer.
func (i *IngestGRPCHandler) EndBlockProcess(ctx context.Context, req *prototypes.EndBlockProcessRequest) (resp *prototypes.EndBlockProcessReply, err error) {
if err := i.ingestUseCase.EndBlockProcess(ctx, req.BlockHeight); err != nil {
return nil, err
}

return &prototypes.EndBlockProcessReply{}, nil
}
49 changes: 49 additions & 0 deletions ingest/usecase/ingest_usecase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package usecase

import (
"context"
"errors"
"time"

"go.uber.org/zap"

"github.com/osmosis-labs/sqs/domain/mvc"
"github.com/osmosis-labs/sqs/log"
"github.com/osmosis-labs/sqs/sqsdomain"

"github.com/osmosis-labs/sqs/sqsdomain/proto/types"
)

type ingestUseCase struct {
// used for tracking the time taken to process a block
startProcessingTime time.Time
logger log.Logger
}

var _ mvc.IngestUsecase = &ingestUseCase{}

// NewIngestUsecase will create a new ingester use case object
func NewIngestUsecase(logger log.Logger) (mvc.IngestUsecase, error) {
return &ingestUseCase{
startProcessingTime: time.Unix(0, 0),
logger: logger,
}, nil
}

// ProcessPoolChunk implements mvc.IngestUsecase.
func (p *ingestUseCase) ProcessPoolChunk(ctx context.Context, poolData []*types.PoolData) error {
return errors.New("not implemented")
}

// StartBlockProcess implements mvc.IngestUsecase.
func (p *ingestUseCase) StartBlockProcess(ctx context.Context, height uint64, takerFeesMap sqsdomain.TakerFeeMap) (err error) {
p.startProcessingTime = time.Now()
p.logger.Info("starting block processing", zap.Uint64("height", height))
return errors.New("not implemented")
}

// EndBlockProcess implements mvc.IngestUsecase.
func (p *ingestUseCase) EndBlockProcess(ctx context.Context, height uint64) (err error) {
p.logger.Info("completed block processing", zap.Uint64("height", height), zap.Duration("duration", time.Since(p.startProcessingTime)))
return errors.New("not implemented")
}
Loading

0 comments on commit deacfe6

Please sign in to comment.