-
Notifications
You must be signed in to change notification settings - Fork 13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GRPC ingester config, code gen, wiring and docs #147
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package domain | ||
|
||
type GRPCIngesterConfig struct { | ||
// Flag to enable the GRPC ingester server | ||
Enabeld bool `mapstructure:"enabled"` | ||
|
||
// The maximum number of bytes to receive in a single GRPC message | ||
MaxReceiveMsgSizeBytes int `mapstructure:"max-receive-msg-size-bytes"` | ||
|
||
// The address of the GRPC ingester server | ||
ServerAddress string `mapstructure:"server-address"` | ||
|
||
// The number of seconds to wait for a connection to the server. | ||
ServerConnectionTimeoutSeconds int `mapstructure:"server-connection-timeout-seconds"` | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
package mvc | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/osmosis-labs/sqs/sqsdomain" | ||
prototypes "github.com/osmosis-labs/sqs/sqsdomain/proto/types" | ||
) | ||
|
||
// IngestUsecase represent the ingest's usecases | ||
type IngestUsecase interface { | ||
// ProcessPoolChunk processes the pool data chunk, returning error if any. | ||
// Caches the given pools in-memory until the end of the block processing. | ||
ProcessPoolChunk(ctx context.Context, poolChunk []*prototypes.PoolData) error | ||
|
||
// StartBlockProcess signifies the start of the given block height processing | ||
// It persists the given taker fee into the repository. | ||
StartBlockProcess(ctx context.Context, height uint64, takerFeesMap sqsdomain.TakerFeeMap) (err error) | ||
|
||
// EndBlockProcessing ends the given block processing on success, storing the height | ||
// internally. | ||
// Persists the given height as well as any previously processed pools in-store. | ||
// Resets the internal pools cache to be empty. | ||
EndBlockProcess(ctx context.Context, height uint64) (err error) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
# Ingest | ||
|
||
This is a component that is responsible for ingesting and processing data. | ||
|
||
It exposes a GRPC API for Osmosis node clients to send data to be ingested. | ||
|
||
```mermaid | ||
sequenceDiagram | ||
participant NodeIngester | ||
Note over SQS,NodeIngester: SQS Ingest RPC Communication | ||
Note over NodeIngester: Acquires lock on block processing | ||
|
||
NodeIngester->>SQS: StartBlockProcess(height, taker_fees) | ||
Note over SQS: SQS starts processing height | ||
SQS-->>NodeIngester: StartBlockProcessReply | ||
|
||
NodeIngester->>SQS: ProcessChainPools(stream) | ||
NodeIngester-->>SQS: ProcessChainPools.Send(pools_data_chunk) | ||
Note over SQS: SQS transforms and loads pools into ingester cache | ||
NodeIngester-->>SQS: ProcessChainPools.Send(pools_data_chunk) | ||
Note over SQS: SQS transforms and loads pools into ingester cache | ||
|
||
NodeIngester->>SQS: ProcessChainPools.CloseSend() | ||
|
||
NodeIngester->>SQS: EndBlockProcess(EndBlockProcessRequest) | ||
Note over SQS: SQS commits all state into SQS repositories, making them available for clients. | ||
|
||
SQS-->>NodeIngester: EndBlockProcessReply | ||
|
||
Note over NodeIngester: Releases lock on block processing | ||
``` | ||
|
||
Note that, as of right now the protocol is syncronous where each GRPC call happens in sequence. However, from the | ||
node perspective it is processed in a separate goroutine, letting the node continue producing blocks. The node | ||
acquires a lock on the block processing so that the interaction is not affected by synching. | ||
|
||
This is not a concern since, when the node is caught up, the block time is approximately 4.5 seconds while entire | ||
protocol is capped at 1.5 seconds. | ||
|
||
Currently, we push all pool data into SQS every processed block. As we lower block time, we will introduce a mechanism for | ||
pushing the pool data only for the modified pools. This will allow us to drastically lower the protocol interaction from 1.5 seconds. | ||
|
||
Alternative methods will include making the protocol more asyncronous but this will require introducing more complex | ||
locking mechanisms which are overkill today. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
package grpc | ||
|
||
import ( | ||
"context" | ||
"io" | ||
"time" | ||
|
||
"github.com/osmosis-labs/sqs/domain" | ||
"github.com/osmosis-labs/sqs/domain/mvc" | ||
"github.com/osmosis-labs/sqs/sqsdomain" | ||
prototypes "github.com/osmosis-labs/sqs/sqsdomain/proto/types" | ||
"google.golang.org/grpc" | ||
) | ||
|
||
type IngestGRPCHandler struct { | ||
ingestUseCase mvc.IngestUsecase | ||
|
||
prototypes.UnimplementedSQSIngesterServer | ||
} | ||
|
||
type IngestProcessBlockArgs struct { | ||
Pools []sqsdomain.PoolI | ||
} | ||
|
||
var _ prototypes.SQSIngesterServer = &IngestGRPCHandler{} | ||
|
||
// NewIngestHandler will initialize the ingest/ resources endpoint | ||
func NewIngestGRPCHandler(us mvc.IngestUsecase, grpcIngesterConfig domain.GRPCIngesterConfig) (*grpc.Server, error) { | ||
ingestHandler := &IngestGRPCHandler{ | ||
ingestUseCase: us, | ||
} | ||
|
||
grpcServer := grpc.NewServer(grpc.MaxRecvMsgSize(grpcIngesterConfig.MaxReceiveMsgSizeBytes), grpc.ConnectionTimeout(time.Second*time.Duration(grpcIngesterConfig.ServerConnectionTimeoutSeconds))) | ||
prototypes.RegisterSQSIngesterServer(grpcServer, ingestHandler) | ||
|
||
return grpcServer, nil | ||
} | ||
|
||
// ProcessChainPools implements types.IngesterServer. | ||
func (i *IngestGRPCHandler) ProcessChainPools(stream prototypes.SQSIngester_ProcessChainPoolsServer) (err error) { | ||
var poolDataChunk prototypes.ChainPoolsDataChunk | ||
err = stream.RecvMsg(&poolDataChunk) | ||
for err == nil { | ||
err = i.ingestUseCase.ProcessPoolChunk(stream.Context(), poolDataChunk.Pools) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = stream.RecvMsg(&poolDataChunk) | ||
} | ||
|
||
if err != io.EOF { | ||
return err | ||
} | ||
|
||
return stream.SendAndClose(&prototypes.ProcessChainPoolsReply{}) | ||
} | ||
|
||
// StartBlockProcess( implements types.IngesterServer. | ||
func (i *IngestGRPCHandler) StartBlockProcess(ctx context.Context, req *prototypes.StartBlockProcessRequest) (resp *prototypes.StartBlockProcessReply, err error) { | ||
takerFeeMap := sqsdomain.TakerFeeMap{} | ||
|
||
if err := takerFeeMap.UnmarshalJSON(req.TakerFeesMap); err != nil { | ||
return nil, err | ||
} | ||
|
||
// Start block processing with the taker fees. | ||
if err := i.ingestUseCase.StartBlockProcess(ctx, req.BlockHeight, takerFeeMap); err != nil { | ||
return nil, err | ||
} | ||
|
||
return &prototypes.StartBlockProcessReply{}, nil | ||
} | ||
|
||
// EndBlockProcessing implements types.IngesterServer. | ||
func (i *IngestGRPCHandler) EndBlockProcess(ctx context.Context, req *prototypes.EndBlockProcessRequest) (resp *prototypes.EndBlockProcessReply, err error) { | ||
if err := i.ingestUseCase.EndBlockProcess(ctx, req.BlockHeight); err != nil { | ||
return nil, err | ||
} | ||
|
||
return &prototypes.EndBlockProcessReply{}, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package usecase | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"time" | ||
|
||
"go.uber.org/zap" | ||
|
||
"github.com/osmosis-labs/sqs/domain/mvc" | ||
"github.com/osmosis-labs/sqs/log" | ||
"github.com/osmosis-labs/sqs/sqsdomain" | ||
|
||
"github.com/osmosis-labs/sqs/sqsdomain/proto/types" | ||
) | ||
|
||
type ingestUseCase struct { | ||
// used for tracking the time taken to process a block | ||
startProcessingTime time.Time | ||
logger log.Logger | ||
} | ||
|
||
var _ mvc.IngestUsecase = &ingestUseCase{} | ||
|
||
// NewIngestUsecase will create a new ingester use case object | ||
func NewIngestUsecase(logger log.Logger) (mvc.IngestUsecase, error) { | ||
return &ingestUseCase{ | ||
startProcessingTime: time.Unix(0, 0), | ||
logger: logger, | ||
}, nil | ||
} | ||
|
||
// ProcessPoolChunk implements mvc.IngestUsecase. | ||
func (p *ingestUseCase) ProcessPoolChunk(ctx context.Context, poolData []*types.PoolData) error { | ||
return errors.New("not implemented") | ||
} | ||
|
||
// StartBlockProcess implements mvc.IngestUsecase. | ||
func (p *ingestUseCase) StartBlockProcess(ctx context.Context, height uint64, takerFeesMap sqsdomain.TakerFeeMap) (err error) { | ||
p.startProcessingTime = time.Now() | ||
p.logger.Info("starting block processing", zap.Uint64("height", height)) | ||
return errors.New("not implemented") | ||
} | ||
|
||
// EndBlockProcess implements mvc.IngestUsecase. | ||
func (p *ingestUseCase) EndBlockProcess(ctx context.Context, height uint64) (err error) { | ||
p.logger.Info("completed block processing", zap.Uint64("height", height), zap.Duration("duration", time.Since(p.startProcessingTime))) | ||
return errors.New("not implemented") | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: 16 KB. We can't leave comments. Once we implement yml config, will be able to transcribe this better