Skip to content

Commit

Permalink
private consensus and more server options
Browse files Browse the repository at this point in the history
  • Loading branch information
julienrbrt committed Nov 20, 2024
1 parent 1b25b27 commit 5fea5cd
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 69 deletions.
43 changes: 14 additions & 29 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ const (
QueryPathStore = "store"
)

var _ abci.Application = (*Consensus[transaction.Tx])(nil)
var _ abci.Application = (*consensus[transaction.Tx])(nil)

// Consensus contains the implementation of the ABCI interface for CometBFT.
type Consensus[T transaction.Tx] struct {
// consensus contains the implementation of the ABCI interface for CometBFT.
type consensus[T transaction.Tx] struct {
logger log.Logger
appName, version string
app appmanager.AppManager[T]
Expand Down Expand Up @@ -84,24 +84,9 @@ type Consensus[T transaction.Tx] struct {
getProtoRegistry func() (*protoregistry.Files, error)
}

// SetStreamingManager sets the streaming manager for the consensus module.
func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) {
c.streaming = sm
}

// RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager.
// It allows additional snapshotter implementations to be used for creating and restoring snapshots.
func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error {
if err := c.snapshotManager.RegisterExtensions(extensions...); err != nil {
return fmt.Errorf("failed to register snapshot extensions: %w", err)
}

return nil
}

// CheckTx implements types.Application.
// It is called by cometbft to verify transaction validity
func (c *Consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxRequest) (*abciproto.CheckTxResponse, error) {
func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxRequest) (*abciproto.CheckTxResponse, error) {
decodedTx, err := c.txCodec.Decode(req.Tx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -139,7 +124,7 @@ func (c *Consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
}

// Info implements types.Application.
func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abciproto.InfoResponse, error) {
func (c *consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abciproto.InfoResponse, error) {
version, _, err := c.store.StateLatest()
if err != nil {
return nil, err
Expand Down Expand Up @@ -179,7 +164,7 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc

// Query implements types.Application.
// It is called by cometbft to query application state.
func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) {
func (c *consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) {
resp, isGRPC, err := c.maybeRunGRPCQuery(ctx, req)
if isGRPC {
return resp, err
Expand Down Expand Up @@ -213,7 +198,7 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
return resp, nil
}

func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryRequest) (resp *abciproto.QueryResponse, isGRPC bool, err error) {
func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryRequest) (resp *abciproto.QueryResponse, isGRPC bool, err error) {
// if this fails then we cannot serve queries anymore
registry, err := c.getProtoRegistry()
if err != nil {
Expand Down Expand Up @@ -304,7 +289,7 @@ func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq
}

// InitChain implements types.Application.
func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRequest) (*abciproto.InitChainResponse, error) {
func (c *consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRequest) (*abciproto.InitChainResponse, error) {
c.logger.Info("InitChain", "initialHeight", req.InitialHeight, "chainID", req.ChainId)

// store chainID to be used later on in execution
Expand Down Expand Up @@ -388,7 +373,7 @@ func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRe

// PrepareProposal implements types.Application.
// It is called by cometbft to prepare a proposal block.
func (c *Consensus[T]) PrepareProposal(
func (c *consensus[T]) PrepareProposal(
ctx context.Context,
req *abciproto.PrepareProposalRequest,
) (resp *abciproto.PrepareProposalResponse, err error) {
Expand Down Expand Up @@ -424,7 +409,7 @@ func (c *Consensus[T]) PrepareProposal(

// ProcessProposal implements types.Application.
// It is called by cometbft to process/verify a proposal block.
func (c *Consensus[T]) ProcessProposal(
func (c *consensus[T]) ProcessProposal(
ctx context.Context,
req *abciproto.ProcessProposalRequest,
) (*abciproto.ProcessProposalResponse, error) {
Expand Down Expand Up @@ -458,7 +443,7 @@ func (c *Consensus[T]) ProcessProposal(

// FinalizeBlock implements types.Application.
// It is called by cometbft to finalize a block.
func (c *Consensus[T]) FinalizeBlock(
func (c *consensus[T]) FinalizeBlock(
ctx context.Context,
req *abciproto.FinalizeBlockRequest,
) (*abciproto.FinalizeBlockResponse, error) {
Expand Down Expand Up @@ -548,7 +533,7 @@ func (c *Consensus[T]) FinalizeBlock(

// Commit implements types.Application.
// It is called by cometbft to notify the application that a block was committed.
func (c *Consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (*abciproto.CommitResponse, error) {
func (c *consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (*abciproto.CommitResponse, error) {
lastCommittedHeight := c.lastCommittedHeight.Load()

c.snapshotManager.SnapshotIfApplicable(lastCommittedHeight)
Expand All @@ -566,7 +551,7 @@ func (c *Consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (
// Vote extensions

// VerifyVoteExtension implements types.Application.
func (c *Consensus[T]) VerifyVoteExtension(
func (c *consensus[T]) VerifyVoteExtension(
ctx context.Context,
req *abciproto.VerifyVoteExtensionRequest,
) (*abciproto.VerifyVoteExtensionResponse, error) {
Expand Down Expand Up @@ -608,7 +593,7 @@ func (c *Consensus[T]) VerifyVoteExtension(
}

// ExtendVote implements types.Application.
func (c *Consensus[T]) ExtendVote(ctx context.Context, req *abciproto.ExtendVoteRequest) (*abciproto.ExtendVoteResponse, error) {
func (c *consensus[T]) ExtendVote(ctx context.Context, req *abciproto.ExtendVoteRequest) (*abciproto.ExtendVoteResponse, error) {
// If vote extensions are not enabled, as a safety precaution, we return an
// error.
cp, err := c.GetConsensusParams(ctx)
Expand Down
4 changes: 2 additions & 2 deletions server/v2/cometbft/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ func TestConsensus_Query(t *testing.T) {
require.Equal(t, res.Value, []byte(nil))
}

func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.Tx]) *Consensus[mock.Tx] {
func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.Tx]) *consensus[mock.Tx] {
t.Helper()

msgRouterBuilder := getMsgRouterBuilder(t, func(ctx context.Context, msg *gogotypes.BoolValue) (*gogotypes.BoolValue, error) {
Expand Down Expand Up @@ -700,7 +700,7 @@ func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.
nil,
)

return &Consensus[mock.Tx]{
return &consensus[mock.Tx]{
logger: log.NewNopLogger(),
appName: "testing-app",
app: am,
Expand Down
14 changes: 10 additions & 4 deletions server/v2/cometbft/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"cosmossdk.io/server/v2/cometbft/handlers"
"cosmossdk.io/server/v2/cometbft/mempool"
"cosmossdk.io/server/v2/cometbft/types"
"cosmossdk.io/server/v2/streaming"
"cosmossdk.io/store/v2/snapshots"
)

Expand All @@ -23,9 +24,14 @@ type ServerOptions[T transaction.Tx] struct {
ExtendVoteHandler handlers.ExtendVoteHandler
KeygenF keyGenF

Mempool func(cfg map[string]any) mempool.Mempool[T]
SnapshotOptions func(cfg map[string]any) snapshots.SnapshotOptions
SnapshotExtensions map[string]snapshots.ExtensionSnapshotter
// Set mempool for the consensus module.
Mempool func(cfg map[string]any) mempool.Mempool[T]
// Set streaming manager for the consensus module.
StreamingManager streaming.Manager
// Set snapshot options for the consensus module.
SnapshotOptions func(cfg map[string]any) snapshots.SnapshotOptions
// Allows additional snapshotter implementations to be used for creating and restoring snapshots.
SnapshotExtensions []snapshots.ExtensionSnapshotter

AddrPeerFilter types.PeerFilter // filter peers by address and port
IdPeerFilter types.PeerFilter // filter peers by node ID
Expand All @@ -42,7 +48,7 @@ func DefaultServerOptions[T transaction.Tx]() ServerOptions[T] {
ExtendVoteHandler: handlers.NoOpExtendVote(),
Mempool: func(cfg map[string]any) mempool.Mempool[T] { return mempool.NoOpMempool[T]{} },
SnapshotOptions: func(cfg map[string]any) snapshots.SnapshotOptions { return snapshots.NewSnapshotOptions(0, 0) },
SnapshotExtensions: map[string]snapshots.ExtensionSnapshotter{},
SnapshotExtensions: []snapshots.ExtensionSnapshotter{},
AddrPeerFilter: nil,
IdPeerFilter: nil,
KeygenF: func() (cmtcrypto.PrivKey, error) { return cmted22519.GenPrivKey(), nil },
Expand Down
6 changes: 3 additions & 3 deletions server/v2/cometbft/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
cometerrors "cosmossdk.io/server/v2/cometbft/types/errors"
)

func (c *Consensus[T]) handleQueryP2P(path []string) (*abci.QueryResponse, error) {
func (c *consensus[T]) handleQueryP2P(path []string) (*abci.QueryResponse, error) {
// "/p2p" prefix for p2p queries
if len(path) < 4 {
return nil, errorsmod.Wrap(cometerrors.ErrUnknownRequest, "path should be p2p filter <addr|id> <parameter>")
Expand Down Expand Up @@ -41,7 +41,7 @@ func (c *Consensus[T]) handleQueryP2P(path []string) (*abci.QueryResponse, error
// simulates the transaction using the application, and returns the simulation result.
// If the second element is 'version', it returns the version of the application.
// If the second element is neither 'simulate' nor 'version', it returns an error indicating an unknown query.
func (c *Consensus[T]) handlerQueryApp(ctx context.Context, path []string, req *abci.QueryRequest) (*abci.QueryResponse, error) {
func (c *consensus[T]) handlerQueryApp(ctx context.Context, path []string, req *abci.QueryRequest) (*abci.QueryResponse, error) {
if len(path) < 2 {
return nil, errorsmod.Wrap(
cometerrors.ErrUnknownRequest,
Expand Down Expand Up @@ -83,7 +83,7 @@ func (c *Consensus[T]) handlerQueryApp(ctx context.Context, path []string, req *
return nil, errorsmod.Wrapf(cometerrors.ErrUnknownRequest, "unknown query: %s", path)
}

func (c *Consensus[T]) handleQueryStore(path []string, req *abci.QueryRequest) (*abci.QueryResponse, error) {
func (c *consensus[T]) handleQueryStore(path []string, req *abci.QueryRequest) (*abci.QueryResponse, error) {
req.Path = "/" + strings.Join(path[1:], "/")
if req.Height <= 1 && req.Prove {
return nil, errorsmod.Wrap(
Expand Down
46 changes: 27 additions & 19 deletions server/v2/cometbft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,25 +150,33 @@ func New[T transaction.Tx](
listener = &indexingTarget.Listener
}

srv.Consensus = &Consensus[T]{
appName: appName,
version: getCometBFTServerVersion(),
app: app,
cfg: srv.config,
store: store,
logger: logger,
txCodec: txCodec,
appCodec: appCodec,
streaming: streaming.Manager{},
listener: listener,
snapshotManager: snapshots.NewManager(
snapshotStore,
srv.serverOptions.SnapshotOptions(cfg),
sc,
ss,
srv.serverOptions.SnapshotExtensions,
logger,
),
// snapshot manager
snapshotManager := snapshots.NewManager(
snapshotStore,
srv.serverOptions.SnapshotOptions(cfg),
sc,
ss,
nil, // extensions snapshotter registered below
logger,
)
if exts := serverOptions.SnapshotExtensions; len(exts) > 0 {
if err := snapshotManager.RegisterExtensions(serverOptions.SnapshotExtensions...); err != nil {
return nil, fmt.Errorf("failed to register snapshot extensions: %w", err)
}
}

srv.Consensus = &consensus[T]{
appName: appName,
version: getCometBFTServerVersion(),
app: app,
cfg: srv.config,
store: store,
logger: logger,
txCodec: txCodec,
appCodec: appCodec,
streaming: streaming.Manager{},
listener: listener,
snapshotManager: snapshotManager,
mempool: srv.serverOptions.Mempool(cfg),
lastCommittedHeight: atomic.Int64{},
prepareProposalHandler: srv.serverOptions.PrepareProposalHandler,
Expand Down
8 changes: 4 additions & 4 deletions server/v2/cometbft/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func GetSnapshotStore(rootDir string) (*snapshots.Store, error) {
}

// ApplySnapshotChunk implements types.Application.
func (c *Consensus[T]) ApplySnapshotChunk(_ context.Context, req *abci.ApplySnapshotChunkRequest) (*abci.ApplySnapshotChunkResponse, error) {
func (c *consensus[T]) ApplySnapshotChunk(_ context.Context, req *abci.ApplySnapshotChunkRequest) (*abci.ApplySnapshotChunkResponse, error) {
if c.snapshotManager == nil {
c.logger.Error("snapshot manager not configured")
return &abci.ApplySnapshotChunkResponse{Result: abci.APPLY_SNAPSHOT_CHUNK_RESULT_ABORT}, nil
Expand Down Expand Up @@ -65,7 +65,7 @@ func (c *Consensus[T]) ApplySnapshotChunk(_ context.Context, req *abci.ApplySnap
}

// ListSnapshots implements types.Application.
func (c *Consensus[T]) ListSnapshots(_ context.Context, ctx *abci.ListSnapshotsRequest) (*abci.ListSnapshotsResponse, error) {
func (c *consensus[T]) ListSnapshots(_ context.Context, ctx *abci.ListSnapshotsRequest) (*abci.ListSnapshotsResponse, error) {
if c.snapshotManager == nil {
return nil, nil
}
Expand All @@ -91,7 +91,7 @@ func (c *Consensus[T]) ListSnapshots(_ context.Context, ctx *abci.ListSnapshotsR
}

// LoadSnapshotChunk implements types.Application.
func (c *Consensus[T]) LoadSnapshotChunk(_ context.Context, req *abci.LoadSnapshotChunkRequest) (*abci.LoadSnapshotChunkResponse, error) {
func (c *consensus[T]) LoadSnapshotChunk(_ context.Context, req *abci.LoadSnapshotChunkRequest) (*abci.LoadSnapshotChunkResponse, error) {
if c.snapshotManager == nil {
return &abci.LoadSnapshotChunkResponse{}, nil
}
Expand All @@ -112,7 +112,7 @@ func (c *Consensus[T]) LoadSnapshotChunk(_ context.Context, req *abci.LoadSnapsh
}

// OfferSnapshot implements types.Application.
func (c *Consensus[T]) OfferSnapshot(_ context.Context, req *abci.OfferSnapshotRequest) (*abci.OfferSnapshotResponse, error) {
func (c *consensus[T]) OfferSnapshot(_ context.Context, req *abci.OfferSnapshotRequest) (*abci.OfferSnapshotResponse, error) {
if c.snapshotManager == nil {
c.logger.Error("snapshot manager not configured")
return &abci.OfferSnapshotResponse{Result: abci.OFFER_SNAPSHOT_RESULT_ABORT}, nil
Expand Down
2 changes: 1 addition & 1 deletion server/v2/cometbft/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

// streamDeliverBlockChanges will stream all the changes happened during deliver block.
func (c *Consensus[T]) streamDeliverBlockChanges(
func (c *consensus[T]) streamDeliverBlockChanges(
ctx context.Context,
height int64,
txs [][]byte,
Expand Down
14 changes: 7 additions & 7 deletions server/v2/cometbft/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"cosmossdk.io/core/server"
"cosmossdk.io/core/transaction"
errorsmod "cosmossdk.io/errors/v2"
consensus "cosmossdk.io/x/consensus/types"
"cosmossdk.io/x/consensus/types"

sdk "github.com/cosmos/cosmos-sdk/types"
)
Expand Down Expand Up @@ -268,7 +268,7 @@ func QueryResult(err error, debug bool) *abci.QueryResponse {
}
}

func (c *Consensus[T]) validateFinalizeBlockHeight(req *abci.FinalizeBlockRequest) error {
func (c *consensus[T]) validateFinalizeBlockHeight(req *abci.FinalizeBlockRequest) error {
if req.Height < 1 {
return fmt.Errorf("invalid height: %d", req.Height)
}
Expand Down Expand Up @@ -302,26 +302,26 @@ func (c *Consensus[T]) validateFinalizeBlockHeight(req *abci.FinalizeBlockReques

// GetConsensusParams makes a query to the consensus module in order to get the latest consensus
// parameters from committed state
func (c *Consensus[T]) GetConsensusParams(ctx context.Context) (*cmtproto.ConsensusParams, error) {
func (c *consensus[T]) GetConsensusParams(ctx context.Context) (*cmtproto.ConsensusParams, error) {
latestVersion, err := c.store.GetLatestVersion()
if err != nil {
return nil, err
}

res, err := c.app.Query(ctx, latestVersion, &consensus.QueryParamsRequest{})
res, err := c.app.Query(ctx, latestVersion, &types.QueryParamsRequest{})
if err != nil {
return nil, err
}

if r, ok := res.(*consensus.QueryParamsResponse); !ok {
if r, ok := res.(*types.QueryParamsResponse); !ok {
return nil, errors.New("failed to query consensus params")
} else {
// convert our params to cometbft params
return r.Params, nil
}
}

func (c *Consensus[T]) GetBlockRetentionHeight(cp *cmtproto.ConsensusParams, commitHeight int64) int64 {
func (c *consensus[T]) GetBlockRetentionHeight(cp *cmtproto.ConsensusParams, commitHeight int64) int64 {
// pruning is disabled if minRetainBlocks is zero
if c.cfg.AppTomlConfig.MinRetainBlocks == 0 {
return 0
Expand Down Expand Up @@ -376,7 +376,7 @@ func (c *Consensus[T]) GetBlockRetentionHeight(cp *cmtproto.ConsensusParams, com
}

// checkHalt checks if height or time exceeds halt-height or halt-time respectively.
func (c *Consensus[T]) checkHalt(height int64, time time.Time) error {
func (c *consensus[T]) checkHalt(height int64, time time.Time) error {
var halt bool
switch {
case c.cfg.AppTomlConfig.HaltHeight > 0 && uint64(height) >= c.cfg.AppTomlConfig.HaltHeight:
Expand Down

0 comments on commit 5fea5cd

Please sign in to comment.