feat: worker pool in ingest #368

Merged: 2 commits, Jun 29, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -39,6 +39,7 @@ Ref: https://keepachangelog.com/en/1.0.0/

- Worker pool abstraction implementation & tests
- Switch prices to the worker pool and remove concurrency for quotes since we support only one quote denom at the moment.
- Switch the ingest block processing system to rely on a worker pool with 2 block processing workers.

## v25.2.0

11 changes: 11 additions & 0 deletions docs/architecture/ingest.md
@@ -12,6 +12,17 @@ As a result, it is possible to see the following sequence of events:
- Height X: All Osmosis pools are pushed
- Height X+1: Only the pools that have changed within that height are pushed

## Worker Pool Architecture

The ingest module uses a worker pool that processes block data in parallel.
We configure the number of workers to 2. The choice is somewhat arbitrary but allows for an efficient catch-up
after the initial cold start, which takes roughly 30 seconds.

Given the target chain block time of 1.5 seconds, we are roughly 15 blocks behind after a cold start, and 2 workers
can process them all within a few seconds.

Additionally, this mechanism helps control resource usage and avoids overloading the system at cold start with many pre-computation requests.
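
A minimal, self-contained sketch of the idea described above, using assumed names rather than the actual `sqs` worker pool API: two workers drain a shared queue of block heights so that a backlog accumulated during cold start is caught up quickly.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const numWorkers = 2

	// Queue of block heights pending ingestion.
	heights := make(chan uint64, 32)

	var wg sync.WaitGroup

	// Start the workers; each processes heights from the shared queue.
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for h := range heights {
				// Placeholder for the actual block processing (e.g. ProcessBlockData).
				fmt.Printf("worker %d processed block %d\n", id, h)
			}
		}(w)
	}

	// Simulate the backlog accumulated during cold start (roughly 15 blocks per the text above).
	for h := uint64(1); h <= 15; h++ {
		heights <- h
	}
	close(heights)
	wg.Wait()
}
```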

## Parsing Block Pool Metadata

Since we may push either all pools or only the ones updated within a block, we
63 changes: 58 additions & 5 deletions ingest/delivery/grpc/ingest_grpc_handler.go
@@ -7,6 +7,7 @@ import (

"github.com/osmosis-labs/sqs/domain"
"github.com/osmosis-labs/sqs/domain/mvc"
"github.com/osmosis-labs/sqs/domain/workerpool"
"github.com/osmosis-labs/sqs/sqsdomain"
prototypes "github.com/osmosis-labs/sqs/sqsdomain/proto/types"
"google.golang.org/grpc"
@@ -16,20 +17,32 @@ type IngestGRPCHandler struct {
ingestUseCase mvc.IngestUsecase

prototypes.UnimplementedSQSIngesterServer

blockProcessDispatcher *workerpool.Dispatcher[uint64]
}

type IngestProcessBlockArgs struct {
Pools []sqsdomain.PoolI
}

const (
// numBlockProcessWorkers is the number of workers to process blocks concurrently
// TODO: move to config
numBlockProcessWorkers = 2
)

var _ prototypes.SQSIngesterServer = &IngestGRPCHandler{}

// NewIngestGRPCHandler will initialize the ingest/ resources endpoint
func NewIngestGRPCHandler(us mvc.IngestUsecase, grpcIngesterConfig domain.GRPCIngesterConfig) (*grpc.Server, error) {
ingestHandler := &IngestGRPCHandler{
ingestUseCase: us,

blockProcessDispatcher: workerpool.NewDispatcher[uint64](numBlockProcessWorkers),
}

go ingestHandler.blockProcessDispatcher.Run()

grpcServer := grpc.NewServer(grpc.MaxRecvMsgSize(grpcIngesterConfig.MaxReceiveMsgSizeBytes), grpc.ConnectionTimeout(time.Second*time.Duration(grpcIngesterConfig.ServerConnectionTimeoutSeconds)))
prototypes.RegisterSQSIngesterServer(grpcServer, ingestHandler)

@@ -44,13 +57,53 @@ func (i *IngestGRPCHandler) ProcessBlock(ctx context.Context, req *prototypes.Pr
return nil, err
}

// Process block data
if err := i.ingestUseCase.ProcessBlockData(ctx, req.BlockHeight, takerFeeMap, req.Pools); err != nil {
// Increment error counter
domain.SQSIngestHandlerProcessBlockErrorCounter.WithLabelValues(err.Error(), strconv.FormatUint(req.BlockHeight, 10)).Inc()

// Empty the result queue and return the first error encountered, if any.
// This allows triggering the fallback mechanism, reingesting all data
// if any error is detected. Under normal circumstances, this should not
// be triggered.
err := i.emptyResults()
if err != nil {
return nil, err
}

// Dispatch block processing
i.blockProcessDispatcher.JobQueue <- workerpool.Job[uint64]{
Task: func() (uint64, error) {
// Process block data
// Note that this executes with a new background context since the parent context
// of the RPC call will be cancelled after the RPC call is done.
if err := i.ingestUseCase.ProcessBlockData(context.Background(), req.BlockHeight, takerFeeMap, req.Pools); err != nil {
// Increment error counter
domain.SQSIngestHandlerProcessBlockErrorCounter.WithLabelValues(err.Error(), strconv.FormatUint(req.BlockHeight, 10)).Inc()

return req.BlockHeight, err
}

return req.BlockHeight, nil
},
}

return &prototypes.ProcessBlockReply{}, nil
}

// emptyResults will empty the result queue and return the first error encountered if any.
// If no errors are encountered, it will return nil.
func (i *IngestGRPCHandler) emptyResults() error {
// TODO: consider loop bound
for {
select {
// Empty result queue and return if there are any errors
// to trigger the fallback mechanism, reingesting all data.
case prevResult := <-i.blockProcessDispatcher.ResultQueue:
if prevResult.Err != nil {
// Increment error counter
domain.SQSIngestHandlerProcessBlockErrorCounter.WithLabelValues(prevResult.Err.Error(), strconv.FormatUint(prevResult.Result, 10)).Inc()

return prevResult.Err
}
default:
// No more results in the channel, continue execution
return nil
}
}
}
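
For reference, the handler above relies on a generic worker pool exposing `Dispatcher[T]`, `Job[T]`, `JobQueue`, `ResultQueue`, and `Run`. Below is a sketch of what such a package might look like, inferred from the usage in this diff; the actual `domain/workerpool` implementation introduced alongside this PR may differ.

```go
package workerpool

// Job wraps a task that produces a result of type T or an error.
type Job[T any] struct {
	Task func() (T, error)
}

// Result carries a completed job's output and any error it returned.
type Result[T any] struct {
	Result T
	Err    error
}

// Dispatcher fans queued jobs out to a bounded number of concurrent workers.
type Dispatcher[T any] struct {
	JobQueue    chan Job[T]
	ResultQueue chan Result[T]
	numWorkers  int
}

// NewDispatcher creates a dispatcher with buffered job and result queues.
func NewDispatcher[T any](numWorkers int) *Dispatcher[T] {
	return &Dispatcher[T]{
		JobQueue:    make(chan Job[T], numWorkers),
		ResultQueue: make(chan Result[T], numWorkers),
		numWorkers:  numWorkers,
	}
}

// Run blocks, draining JobQueue and running at most numWorkers jobs at a time.
// It is intended to be launched in its own goroutine, e.g. `go d.Run()`.
func (d *Dispatcher[T]) Run() {
	sem := make(chan struct{}, d.numWorkers)
	for job := range d.JobQueue {
		sem <- struct{}{} // acquire a worker slot
		go func(j Job[T]) {
			defer func() { <-sem }() // release the slot when done
			res, err := j.Task()
			d.ResultQueue <- Result[T]{Result: res, Err: err}
		}(job)
	}
}
```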