From 642c7311cce08a0663fc7a5d33ab7c2515b1ff04 Mon Sep 17 00:00:00 2001 From: Hieu Vu Date: Mon, 9 Dec 2024 19:10:41 +0700 Subject: [PATCH 01/13] add consensus address codec to consensus --- server/v2/cometbft/server.go | 2 ++ simapp/v2/simdv2/cmd/commands.go | 1 + 2 files changed, 3 insertions(+) diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index 51c24285e440..7a693b43f8a3 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -41,6 +41,7 @@ import ( "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec" genutiltypes "github.com/cosmos/cosmos-sdk/x/genutil/types" + addresscodec "cosmossdk.io/core/address" ) const ServerName = "comet" @@ -66,6 +67,7 @@ type CometBFTServer[T transaction.Tx] struct { } func New[T transaction.Tx]( + consensusAddressCodec addresscodec.Codec, logger log.Logger, appName string, store types.Store, diff --git a/simapp/v2/simdv2/cmd/commands.go b/simapp/v2/simdv2/cmd/commands.go index 28586e32c2de..056c2609061a 100644 --- a/simapp/v2/simdv2/cmd/commands.go +++ b/simapp/v2/simdv2/cmd/commands.go @@ -111,6 +111,7 @@ func InitRootCmd[T transaction.Tx]( // consensus component if deps.ConsensusServer == nil { deps.ConsensusServer, err = cometbft.New( + deps.ClientContext.ConsensusAddressCodec, logger, simApp.Name(), simApp.Store(), From 1bfe31aecff7ba70242c5e26a1aef2275948721a Mon Sep 17 00:00:00 2001 From: Hieu Vu Date: Mon, 9 Dec 2024 19:11:28 +0700 Subject: [PATCH 02/13] handle comet service case --- server/v2/cometbft/abci.go | 75 +++++++++++++++++++++++++++++++++++++- 1 file changed, 73 insertions(+), 2 deletions(-) diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index c376f4ac2690..91b82addfef3 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -35,6 +35,9 @@ import ( "cosmossdk.io/store/v2/snapshots" consensustypes "cosmossdk.io/x/consensus/types" + addresscodec "cosmossdk.io/core/address" + rpchttp "github.com/cometbft/cometbft/rpc/client/http" + "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" "github.com/cosmos/cosmos-sdk/codec" codectypes "github.com/cosmos/cosmos-sdk/codec/types" sdk "github.com/cosmos/cosmos-sdk/types" @@ -86,8 +89,9 @@ type consensus[T transaction.Tx] struct { addrPeerFilter types.PeerFilter // filter peers by address and port idPeerFilter types.PeerFilter // filter peers by node ID - queryHandlersMap map[string]appmodulev2.Handler - getProtoRegistry func() (*protoregistry.Files, error) + queryHandlersMap map[string]appmodulev2.Handler + getProtoRegistry func() (*protoregistry.Files, error) + consensusAddressCodec addresscodec.Codec } // CheckTx implements types.Application. @@ -184,6 +188,15 @@ func (c *consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) ( return resp, err } + // when a client did not provide a query height, manually inject the latest + if req.Height == 0 { + lastestVersion, err := c.store.GetLatestVersion() + if err != nil { + return nil, err + } + req.Height = int64(lastestVersion) + } + // this error most probably means that we can't handle it with a proto message, so // it must be an app/p2p/store query path := splitABCIQueryPath(req.Path) @@ -238,6 +251,40 @@ func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq handlerFullName = string(md.Input().FullName()) } + // Handle comet service + if strings.Contains(req.Path, "/cosmos.base.tendermint.v1beta1.Service") { + rpcClient, _ := rpchttp.New(c.cfg.AppTomlConfig.Address) + cometQServer := cmtservice.NewQueryServer(rpcClient, c.Query, c.consensusAddressCodec) + paths := strings.Split(req.Path, "/") + + var resp transaction.Msg + var err error + switch paths[2] { + case "GetNodeInfo": + resp, err = handleCometService(ctx, req, cometQServer.GetNodeInfo) + case "GetSyncing": + resp, err = handleCometService(ctx, req, cometQServer.GetSyncing) + case "GetLatestBlock": + resp, err = handleCometService(ctx, req, cometQServer.GetLatestBlock) + case "GetBlockByHeight": + resp, err = handleCometService(ctx, req, cometQServer.GetBlockByHeight) + case "GetLatestValidatorSet": + resp, err = handleCometService(ctx, req, cometQServer.GetLatestValidatorSet) + case "GetValidatorSetByHeight": + resp, err = handleCometService(ctx, req, cometQServer.GetValidatorSetByHeight) + case "ABCIQuery": + resp, err = handleCometService(ctx, req, cometQServer.ABCIQuery) + } + + if err != nil { + return nil, true, err + } + + res, err := queryResponse(resp, req.Height) + return res, true, err + + } + // special case for simulation as it is an external gRPC registered on the grpc server component // and not on the app itself, so it won't pass the router afterwards. if req.Path == "/cosmos.tx.v1beta1.Service/Simulate" { @@ -303,6 +350,30 @@ func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq return resp, true, err } +func handleCometService[T any, PT interface { + *T + gogoproto.Message +}, + U any, UT interface { + *U + gogoproto.Message + }]( + ctx context.Context, + rawReq *abciproto.QueryRequest, + handler func(ctx context.Context, msg PT) (UT, error), +) (transaction.Msg, error) { + req := PT(new(T)) + err := gogoproto.Unmarshal(rawReq.Data, req) + if err != nil { + return nil, err + } + typedResp, err := handler(ctx, req) + if err != nil { + return nil, err + } + return typedResp, nil +} + // InitChain implements types.Application. func (c *consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRequest) (*abciproto.InitChainResponse, error) { c.logger.Info("InitChain", "initialHeight", req.InitialHeight, "chainID", req.ChainId) From 3110801917dcf0135ce0803f9980713f90aa0ea1 Mon Sep 17 00:00:00 2001 From: Hieu Vu Date: Mon, 9 Dec 2024 23:18:50 +0700 Subject: [PATCH 03/13] node service --- server/v2/cometbft/abci.go | 25 ++++++++++++++++++++++++- server/v2/cometbft/server.go | 3 ++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 91b82addfef3..3d5fa5140942 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -21,7 +21,6 @@ import ( "cosmossdk.io/core/event" "cosmossdk.io/core/server" "cosmossdk.io/core/store" - "cosmossdk.io/core/transaction" errorsmod "cosmossdk.io/errors/v2" "cosmossdk.io/log" "cosmossdk.io/schema/appdata" @@ -36,6 +35,8 @@ import ( consensustypes "cosmossdk.io/x/consensus/types" addresscodec "cosmossdk.io/core/address" + coreserver "cosmossdk.io/core/server" + "cosmossdk.io/core/transaction" rpchttp "github.com/cometbft/cometbft/rpc/client/http" "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" "github.com/cosmos/cosmos-sdk/codec" @@ -92,6 +93,7 @@ type consensus[T transaction.Tx] struct { queryHandlersMap map[string]appmodulev2.Handler getProtoRegistry func() (*protoregistry.Files, error) consensusAddressCodec addresscodec.Codec + cfgMap coreserver.ConfigMap } // CheckTx implements types.Application. @@ -282,7 +284,28 @@ func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq res, err := queryResponse(resp, req.Height) return res, true, err + } + + // Handle node service + if strings.Contains(req.Path, "/cosmos.base.node.v1beta1.Service") { + nodeQService := nodeServer[transaction.Tx]{c.cfgMap, c.cfg.AppTomlConfig, c} + paths := strings.Split(req.Path, "/") + + var resp transaction.Msg + var err error + switch paths[2] { + case "Config": + resp, err = handleCometService(ctx, req, nodeQService.Config) + case "Status": + resp, err = handleCometService(ctx, req, nodeQService.Status) + } + + if err != nil { + return nil, true, err + } + res, err := queryResponse(resp, req.Height) + return res, true, err } // special case for simulation as it is an external gRPC registered on the grpc server component diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index 7a693b43f8a3..397e74dbb775 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -38,10 +38,10 @@ import ( "cosmossdk.io/server/v2/cometbft/types" "cosmossdk.io/store/v2/snapshots" + addresscodec "cosmossdk.io/core/address" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec" genutiltypes "github.com/cosmos/cosmos-sdk/x/genutil/types" - addresscodec "cosmossdk.io/core/address" ) const ServerName = "comet" @@ -191,6 +191,7 @@ func New[T transaction.Tx]( getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry), addrPeerFilter: srv.serverOptions.AddrPeerFilter, idPeerFilter: srv.serverOptions.IdPeerFilter, + cfgMap: cfg, } c.optimisticExec = oe.NewOptimisticExecution( From 9b8f1152ffa066b578b8fd5241ee255383dfa12e Mon Sep 17 00:00:00 2001 From: Hieu Vu Date: Tue, 10 Dec 2024 14:05:38 +0700 Subject: [PATCH 04/13] init client context & tx service --- server/v2/cometbft/abci.go | 63 +++++++++++++++++++++++++++++++++++++- 1 file changed, 62 insertions(+), 1 deletion(-) diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 3d5fa5140942..be73d5867fa2 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -38,11 +38,14 @@ import ( coreserver "cosmossdk.io/core/server" "cosmossdk.io/core/transaction" rpchttp "github.com/cometbft/cometbft/rpc/client/http" + "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" "github.com/cosmos/cosmos-sdk/codec" codectypes "github.com/cosmos/cosmos-sdk/codec/types" + "github.com/cosmos/cosmos-sdk/std" sdk "github.com/cosmos/cosmos-sdk/types" txtypes "github.com/cosmos/cosmos-sdk/types/tx" + authtx "github.com/cosmos/cosmos-sdk/x/auth/tx" ) const ( @@ -288,7 +291,7 @@ func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq // Handle node service if strings.Contains(req.Path, "/cosmos.base.node.v1beta1.Service") { - nodeQService := nodeServer[transaction.Tx]{c.cfgMap, c.cfg.AppTomlConfig, c} + nodeQService := nodeServer[T]{c.cfgMap, c.cfg.AppTomlConfig, c} paths := strings.Split(req.Path, "/") var resp transaction.Msg @@ -308,6 +311,64 @@ func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq return res, true, err } + // Handle tx service + if strings.Contains(req.Path, "/cosmos.tx.v1beta1.Service") { + // init simple client context + amino := codec.NewLegacyAmino() + std.RegisterLegacyAminoCodec(amino) + txConfig := authtx.NewTxConfig( + c.appCodec, + c.appCodec.InterfaceRegistry().SigningContext().AddressCodec(), + c.appCodec.InterfaceRegistry().SigningContext().ValidatorAddressCodec(), + authtx.DefaultSignModes, + ) + rpcClient, _ := client.NewClientFromNode(c.cfg.AppTomlConfig.Address) + + clientCtx := client.Context{}. + WithLegacyAmino(amino). + WithCodec(c.appCodec). + WithTxConfig(txConfig). + WithNodeURI(c.cfg.AppTomlConfig.Address). + WithClient(rpcClient) + + txService := txServer[T]{ + clientCtx: clientCtx, + txCodec: c.txCodec, + app: c.app, + } + paths := strings.Split(req.Path, "/") + + var resp transaction.Msg + var err error + switch paths[2] { + case "Simulate": + resp, err = handleCometService(ctx, req, txService.Simulate) + case "GetTx": + resp, err = handleCometService(ctx, req, txService.GetTx) + case "BroadcastTx": + resp, err = handleCometService(ctx, req, txService.BroadcastTx) + case "GetTxsEvent": + resp, err = handleCometService(ctx, req, txService.GetTxsEvent) + case "GetBlockWithTxs": + resp, err = handleCometService(ctx, req, txService.GetBlockWithTxs) + case "TxDecode": + resp, err = handleCometService(ctx, req, txService.TxDecode) + case "TxEncode": + resp, err = handleCometService(ctx, req, txService.TxEncode) + case "TxEncodeAmino": + resp, err = handleCometService(ctx, req, txService.TxEncodeAmino) + case "TxDecodeAmino": + resp, err = handleCometService(ctx, req, txService.Simulate) + } + + if err != nil { + return nil, true, err + } + + res, err := queryResponse(resp, req.Height) + return res, true, err + } + // special case for simulation as it is an external gRPC registered on the grpc server component // and not on the app itself, so it won't pass the router afterwards. if req.Path == "/cosmos.tx.v1beta1.Service/Simulate" { From 4bccdfd4678762cd97186b8794715e1f03b996a9 Mon Sep 17 00:00:00 2001 From: Hieu Vu Date: Tue, 10 Dec 2024 14:44:19 +0700 Subject: [PATCH 05/13] refactor --- server/v2/cometbft/abci.go | 50 ++++---------------------------------- 1 file changed, 5 insertions(+), 45 deletions(-) diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index be73d5867fa2..7da9c7957683 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -256,6 +256,9 @@ func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq handlerFullName = string(md.Input().FullName()) } + // special case for non-module services as they are external gRPC registered on the grpc server component + // and not on the app itself, so it won't pass the router afterwards. + // Handle comet service if strings.Contains(req.Path, "/cosmos.base.tendermint.v1beta1.Service") { rpcClient, _ := rpchttp.New(c.cfg.AppTomlConfig.Address) @@ -335,6 +338,7 @@ func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq clientCtx: clientCtx, txCodec: c.txCodec, app: c.app, + consensus: c, } paths := strings.Split(req.Path, "/") @@ -346,7 +350,7 @@ func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq case "GetTx": resp, err = handleCometService(ctx, req, txService.GetTx) case "BroadcastTx": - resp, err = handleCometService(ctx, req, txService.BroadcastTx) + return nil, true, errors.New("can't route a broadcast tx message") case "GetTxsEvent": resp, err = handleCometService(ctx, req, txService.GetTxsEvent) case "GetBlockWithTxs": @@ -369,50 +373,6 @@ func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq return res, true, err } - // special case for simulation as it is an external gRPC registered on the grpc server component - // and not on the app itself, so it won't pass the router afterwards. - if req.Path == "/cosmos.tx.v1beta1.Service/Simulate" { - simulateRequest := &txtypes.SimulateRequest{} - err = gogoproto.Unmarshal(req.Data, simulateRequest) - if err != nil { - return nil, true, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err) - } - - tx, err := c.txCodec.Decode(simulateRequest.TxBytes) - if err != nil { - return nil, true, fmt.Errorf("failed to decode tx: %w", err) - } - - txResult, _, err := c.app.Simulate(ctx, tx) - if err != nil { - return nil, true, fmt.Errorf("failed with gas used: '%d': %w", txResult.GasUsed, err) - } - - msgResponses := make([]*codectypes.Any, 0, len(txResult.Resp)) - // pack the messages into Any - for _, msg := range txResult.Resp { - anyMsg, err := codectypes.NewAnyWithValue(msg) - if err != nil { - return nil, true, fmt.Errorf("failed to pack message response: %w", err) - } - - msgResponses = append(msgResponses, anyMsg) - } - - resp := &txtypes.SimulateResponse{ - GasInfo: &sdk.GasInfo{ - GasUsed: txResult.GasUsed, - GasWanted: txResult.GasWanted, - }, - Result: &sdk.Result{ - MsgResponses: msgResponses, - }, - } - - res, err := queryResponse(resp, req.Height) - return res, true, err - } - handler, found := c.queryHandlersMap[handlerFullName] if !found { return nil, true, fmt.Errorf("no query handler found for %s", req.Path) From dfb8e648b1c57b0625debc4a421bc8eba01a2921 Mon Sep 17 00:00:00 2001 From: Hieu Vu Date: Tue, 10 Dec 2024 14:44:37 +0700 Subject: [PATCH 06/13] implement missing func --- server/v2/cometbft/grpc.go | 145 +++++++++++++++++++++++++++++++++++-- 1 file changed, 138 insertions(+), 7 deletions(-) diff --git a/server/v2/cometbft/grpc.go b/server/v2/cometbft/grpc.go index 170475246d35..e0326ba911f0 100644 --- a/server/v2/cometbft/grpc.go +++ b/server/v2/cometbft/grpc.go @@ -17,6 +17,7 @@ import ( "cosmossdk.io/core/server" corestore "cosmossdk.io/core/store" "cosmossdk.io/core/transaction" + "cosmossdk.io/log" storeserver "cosmossdk.io/server/v2/store" "github.com/cosmos/cosmos-sdk/client" @@ -24,6 +25,8 @@ import ( nodeservice "github.com/cosmos/cosmos-sdk/client/grpc/node" codectypes "github.com/cosmos/cosmos-sdk/codec/types" sdk "github.com/cosmos/cosmos-sdk/types" + sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" + "github.com/cosmos/cosmos-sdk/types/query" txtypes "github.com/cosmos/cosmos-sdk/types/tx" "github.com/cosmos/cosmos-sdk/x/auth/migrations/legacytx" authtx "github.com/cosmos/cosmos-sdk/x/auth/tx" @@ -46,7 +49,7 @@ func gRPCServiceRegistrar[T transaction.Tx]( ) func(srv *grpc.Server) error { return func(srv *grpc.Server) error { cmtservice.RegisterServiceServer(srv, cmtservice.NewQueryServer(clientCtx.Client, consensus.Query, clientCtx.ConsensusAddressCodec)) - txtypes.RegisterServiceServer(srv, txServer[T]{clientCtx, txCodec, app}) + txtypes.RegisterServiceServer(srv, txServer[T]{clientCtx, txCodec, app, consensus}) nodeservice.RegisterServiceServer(srv, nodeServer[T]{cfg, cometBFTAppConfig, consensus}) return nil @@ -57,6 +60,7 @@ type txServer[T transaction.Tx] struct { clientCtx client.Context txCodec transaction.Codec[T] app appSimulator[T] + consensus abci.Application } // BroadcastTx implements tx.ServiceServer. @@ -65,8 +69,84 @@ func (t txServer[T]) BroadcastTx(ctx context.Context, req *txtypes.BroadcastTxRe } // GetBlockWithTxs implements tx.ServiceServer. -func (t txServer[T]) GetBlockWithTxs(context.Context, *txtypes.GetBlockWithTxsRequest) (*txtypes.GetBlockWithTxsResponse, error) { - return nil, status.Error(codes.Unimplemented, "not implemented") +func (t txServer[T]) GetBlockWithTxs(ctx context.Context, req *txtypes.GetBlockWithTxsRequest) (*txtypes.GetBlockWithTxsResponse, error) { + logger := log.NewNopLogger() + if req == nil { + return nil, status.Error(codes.InvalidArgument, "request cannot be nil") + } + + resp, err := t.consensus.Info(ctx, &abci.InfoRequest{}) + if err != nil { + return nil, err + } + currentHeight := resp.LastBlockHeight + + if req.Height < 1 || req.Height > currentHeight { + return nil, sdkerrors.ErrInvalidHeight.Wrapf("requested height %d but height must not be less than 1 "+ + "or greater than the current height %d", req.Height, currentHeight) + } + + node, err := t.clientCtx.GetNode() + if err != nil { + return nil, err + } + + blockID, block, err := cmtservice.GetProtoBlock(ctx, node, &req.Height) + if err != nil { + return nil, err + } + + var offset, limit uint64 + if req.Pagination != nil { + offset = req.Pagination.Offset + limit = req.Pagination.Limit + } else { + offset = 0 + limit = query.DefaultLimit + } + + blockTxs := block.Data.Txs + blockTxsLn := uint64(len(blockTxs)) + txs := make([]*txtypes.Tx, 0, limit) + if offset >= blockTxsLn && blockTxsLn != 0 { + return nil, sdkerrors.ErrInvalidRequest.Wrapf("out of range: cannot paginate %d txs with offset %d and limit %d", blockTxsLn, offset, limit) + } + decodeTxAt := func(i uint64) error { + tx := blockTxs[i] + txb, err := t.clientCtx.TxConfig.TxDecoder()(tx) + fmt.Println("TxDecoder", txb, err) + if err != nil { + return err + } + p, err := txb.(interface{ AsTx() (*txtypes.Tx, error) }).AsTx() + if err != nil { + return err + } + txs = append(txs, p) + return nil + } + if req.Pagination != nil && req.Pagination.Reverse { + for i, count := offset, uint64(0); i > 0 && count != limit; i, count = i-1, count+1 { + if err = decodeTxAt(i); err != nil { + logger.Error("failed to decode tx", "error", err) + } + } + } else { + for i, count := offset, uint64(0); i < blockTxsLn && count != limit; i, count = i+1, count+1 { + if err = decodeTxAt(i); err != nil { + logger.Error("failed to decode tx", "error", err) + } + } + } + + return &txtypes.GetBlockWithTxsResponse{ + Txs: txs, + BlockId: &blockID, + Block: block, + Pagination: &query.PageResponse{ + Total: blockTxsLn, + }, + }, nil } // GetTx implements tx.ServiceServer. @@ -100,8 +180,33 @@ func (t txServer[T]) GetTx(ctx context.Context, req *txtypes.GetTxRequest) (*txt } // GetTxsEvent implements tx.ServiceServer. -func (t txServer[T]) GetTxsEvent(context.Context, *txtypes.GetTxsEventRequest) (*txtypes.GetTxsEventResponse, error) { - return nil, status.Error(codes.Unimplemented, "not implemented") +func (t txServer[T]) GetTxsEvent(ctx context.Context, req *txtypes.GetTxsEventRequest) (*txtypes.GetTxsEventResponse, error) { + if req == nil { + return nil, status.Error(codes.InvalidArgument, "request cannot be nil") + } + + orderBy := parseOrderBy(req.OrderBy) + + result, err := authtx.QueryTxsByEvents(t.clientCtx, int(req.Page), int(req.Limit), req.Query, orderBy) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + txsList := make([]*txtypes.Tx, len(result.Txs)) + for i, tx := range result.Txs { + protoTx, ok := tx.Tx.GetCachedValue().(*txtypes.Tx) + if !ok { + return nil, status.Errorf(codes.Internal, "getting cached value failed expected %T, got %T", txtypes.Tx{}, tx.Tx.GetCachedValue()) + } + + txsList[i] = protoTx + } + + return &txtypes.GetTxsEventResponse{ + Txs: txsList, + TxResponses: result.Txs, + Total: result.TotalCount, + }, nil } // Simulate implements tx.ServiceServer. @@ -159,8 +264,23 @@ func (t txServer[T]) Simulate(ctx context.Context, req *txtypes.SimulateRequest) } // TxDecode implements tx.ServiceServer. -func (t txServer[T]) TxDecode(context.Context, *txtypes.TxDecodeRequest) (*txtypes.TxDecodeResponse, error) { - return nil, status.Error(codes.Unimplemented, "not implemented") +func (t txServer[T]) TxDecode(ctx context.Context, req *txtypes.TxDecodeRequest) (*txtypes.TxDecodeResponse, error) { + if req.TxBytes == nil { + return nil, status.Error(codes.InvalidArgument, "invalid empty tx bytes") + } + + txb, err := t.clientCtx.TxConfig.TxDecoder()(req.TxBytes) + if err != nil { + return nil, err + } + + tx, err := txb.(interface{ AsTx() (*txtypes.Tx, error) }).AsTx() // TODO: maybe we can break the Tx interface to add this also + if err != nil { + return nil, err + } + return &txtypes.TxDecodeResponse{ + Tx: tx, + }, nil } // TxDecodeAmino implements tx.ServiceServer. @@ -325,3 +445,14 @@ var CometBFTAutoCLIDescriptor = &autocliv1.ServiceCommandDescriptor{ }, }, } + +func parseOrderBy(orderBy txtypes.OrderBy) string { + switch orderBy { + case txtypes.OrderBy_ORDER_BY_ASC: + return "asc" + case txtypes.OrderBy_ORDER_BY_DESC: + return "desc" + default: + return "" // Defaults to CometBFT's default, which is `asc` now. + } +} From 91ee6409180268dd6a7767ea928b37a4da204a2a Mon Sep 17 00:00:00 2001 From: Hieu Vu Date: Tue, 10 Dec 2024 14:47:09 +0700 Subject: [PATCH 07/13] feedback --- server/v2/cometbft/abci.go | 3 --- server/v2/cometbft/server.go | 2 +- simapp/v2/simdv2/cmd/commands.go | 2 +- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 7da9c7957683..3a57e7d9c279 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -41,10 +41,7 @@ import ( "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" "github.com/cosmos/cosmos-sdk/codec" - codectypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/std" - sdk "github.com/cosmos/cosmos-sdk/types" - txtypes "github.com/cosmos/cosmos-sdk/types/tx" authtx "github.com/cosmos/cosmos-sdk/x/auth/tx" ) diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index 397e74dbb775..9e4649f4f891 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -67,13 +67,13 @@ type CometBFTServer[T transaction.Tx] struct { } func New[T transaction.Tx]( - consensusAddressCodec addresscodec.Codec, logger log.Logger, appName string, store types.Store, app appmanager.AppManager[T], appCodec codec.Codec, txCodec transaction.Codec[T], + consensusAddressCodec addresscodec.Codec, queryHandlers map[string]appmodulev2.Handler, decoderResolver decoding.DecoderResolver, serverOptions ServerOptions[T], diff --git a/simapp/v2/simdv2/cmd/commands.go b/simapp/v2/simdv2/cmd/commands.go index 056c2609061a..4b8cb10ab0cd 100644 --- a/simapp/v2/simdv2/cmd/commands.go +++ b/simapp/v2/simdv2/cmd/commands.go @@ -111,13 +111,13 @@ func InitRootCmd[T transaction.Tx]( // consensus component if deps.ConsensusServer == nil { deps.ConsensusServer, err = cometbft.New( - deps.ClientContext.ConsensusAddressCodec, logger, simApp.Name(), simApp.Store(), simApp.App.AppManager, simApp.AppCodec(), &client.DefaultTxDecoder[T]{TxConfig: deps.TxConfig}, + deps.ClientContext.ConsensusAddressCodec, simApp.App.QueryHandlers(), simApp.App.SchemaDecoderResolver(), initCometOptions[T](), From b99ec544d51c7f55d4415c21ca3634a6895297c4 Mon Sep 17 00:00:00 2001 From: Hieu Vu Date: Tue, 10 Dec 2024 16:19:43 +0700 Subject: [PATCH 08/13] feedback --- server/v2/cometbft/abci.go | 72 +++++++++++++++----------------------- server/v2/cometbft/grpc.go | 25 +++++++++++++ 2 files changed, 54 insertions(+), 43 deletions(-) diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 27a9cd15501f..29e12c5f9679 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -191,12 +191,13 @@ func (c *consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) ( } // when a client did not provide a query height, manually inject the latest + // for modules queries, AppManager does it automatically if req.Height == 0 { - lastestVersion, err := c.store.GetLatestVersion() + latestVersion, err := c.store.GetLatestVersion() if err != nil { return nil, err } - req.Height = int64(lastestVersion) + req.Height = int64(latestVersion) } // this error most probably means that we can't handle it with a proto message, so @@ -261,24 +262,27 @@ func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq rpcClient, _ := rpchttp.New(c.cfg.AppTomlConfig.Address) cometQServer := cmtservice.NewQueryServer(rpcClient, c.Query, c.consensusAddressCodec) paths := strings.Split(req.Path, "/") + if len(paths) <= 2 { + return nil, false, fmt.Errorf("invalid request path: %s", req.Path) + } var resp transaction.Msg var err error switch paths[2] { case "GetNodeInfo": - resp, err = handleCometService(ctx, req, cometQServer.GetNodeInfo) + resp, err = handleExternalService(ctx, req, cometQServer.GetNodeInfo) case "GetSyncing": - resp, err = handleCometService(ctx, req, cometQServer.GetSyncing) + resp, err = handleExternalService(ctx, req, cometQServer.GetSyncing) case "GetLatestBlock": - resp, err = handleCometService(ctx, req, cometQServer.GetLatestBlock) + resp, err = handleExternalService(ctx, req, cometQServer.GetLatestBlock) case "GetBlockByHeight": - resp, err = handleCometService(ctx, req, cometQServer.GetBlockByHeight) + resp, err = handleExternalService(ctx, req, cometQServer.GetBlockByHeight) case "GetLatestValidatorSet": - resp, err = handleCometService(ctx, req, cometQServer.GetLatestValidatorSet) + resp, err = handleExternalService(ctx, req, cometQServer.GetLatestValidatorSet) case "GetValidatorSetByHeight": - resp, err = handleCometService(ctx, req, cometQServer.GetValidatorSetByHeight) + resp, err = handleExternalService(ctx, req, cometQServer.GetValidatorSetByHeight) case "ABCIQuery": - resp, err = handleCometService(ctx, req, cometQServer.ABCIQuery) + resp, err = handleExternalService(ctx, req, cometQServer.ABCIQuery) } if err != nil { @@ -293,14 +297,17 @@ func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq if strings.Contains(req.Path, "/cosmos.base.node.v1beta1.Service") { nodeQService := nodeServer[T]{c.cfgMap, c.cfg.AppTomlConfig, c} paths := strings.Split(req.Path, "/") + if len(paths) <= 2 { + return nil, false, fmt.Errorf("invalid request path: %s", req.Path) + } var resp transaction.Msg var err error switch paths[2] { case "Config": - resp, err = handleCometService(ctx, req, nodeQService.Config) + resp, err = handleExternalService(ctx, req, nodeQService.Config) case "Status": - resp, err = handleCometService(ctx, req, nodeQService.Status) + resp, err = handleExternalService(ctx, req, nodeQService.Status) } if err != nil { @@ -338,28 +345,31 @@ func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq consensus: c, } paths := strings.Split(req.Path, "/") + if len(paths) <= 2 { + return nil, false, fmt.Errorf("invalid request path: %s", req.Path) + } var resp transaction.Msg var err error switch paths[2] { case "Simulate": - resp, err = handleCometService(ctx, req, txService.Simulate) + resp, err = handleExternalService(ctx, req, txService.Simulate) case "GetTx": - resp, err = handleCometService(ctx, req, txService.GetTx) + resp, err = handleExternalService(ctx, req, txService.GetTx) case "BroadcastTx": return nil, true, errors.New("can't route a broadcast tx message") case "GetTxsEvent": - resp, err = handleCometService(ctx, req, txService.GetTxsEvent) + resp, err = handleExternalService(ctx, req, txService.GetTxsEvent) case "GetBlockWithTxs": - resp, err = handleCometService(ctx, req, txService.GetBlockWithTxs) + resp, err = handleExternalService(ctx, req, txService.GetBlockWithTxs) case "TxDecode": - resp, err = handleCometService(ctx, req, txService.TxDecode) + resp, err = handleExternalService(ctx, req, txService.TxDecode) case "TxEncode": - resp, err = handleCometService(ctx, req, txService.TxEncode) + resp, err = handleExternalService(ctx, req, txService.TxEncode) case "TxEncodeAmino": - resp, err = handleCometService(ctx, req, txService.TxEncodeAmino) + resp, err = handleExternalService(ctx, req, txService.TxEncodeAmino) case "TxDecodeAmino": - resp, err = handleCometService(ctx, req, txService.Simulate) + resp, err = handleExternalService(ctx, req, txService.Simulate) } if err != nil { @@ -391,30 +401,6 @@ func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq return resp, true, err } -func handleCometService[T any, PT interface { - *T - gogoproto.Message -}, - U any, UT interface { - *U - gogoproto.Message - }]( - ctx context.Context, - rawReq *abciproto.QueryRequest, - handler func(ctx context.Context, msg PT) (UT, error), -) (transaction.Msg, error) { - req := PT(new(T)) - err := gogoproto.Unmarshal(rawReq.Data, req) - if err != nil { - return nil, err - } - typedResp, err := handler(ctx, req) - if err != nil { - return nil, err - } - return typedResp, nil -} - // InitChain implements types.Application. func (c *consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRequest) (*abciproto.InitChainResponse, error) { c.logger.Info("InitChain", "initialHeight", req.InitialHeight, "chainID", req.ChainId) diff --git a/server/v2/cometbft/grpc.go b/server/v2/cometbft/grpc.go index e0326ba911f0..e5e98301e42c 100644 --- a/server/v2/cometbft/grpc.go +++ b/server/v2/cometbft/grpc.go @@ -30,6 +30,7 @@ import ( txtypes "github.com/cosmos/cosmos-sdk/types/tx" "github.com/cosmos/cosmos-sdk/x/auth/migrations/legacytx" authtx "github.com/cosmos/cosmos-sdk/x/auth/tx" + gogoproto "github.com/cosmos/gogoproto/proto" ) type appSimulator[T transaction.Tx] interface { @@ -456,3 +457,27 @@ func parseOrderBy(orderBy txtypes.OrderBy) string { return "" // Defaults to CometBFT's default, which is `asc` now. } } + +func handleExternalService[T any, PT interface { + *T + gogoproto.Message +}, + U any, UT interface { + *U + gogoproto.Message + }]( + ctx context.Context, + rawReq *abciproto.QueryRequest, + handler func(ctx context.Context, msg PT) (UT, error), +) (transaction.Msg, error) { + req := PT(new(T)) + err := gogoproto.Unmarshal(rawReq.Data, req) + if err != nil { + return nil, err + } + typedResp, err := handler(ctx, req) + if err != nil { + return nil, err + } + return typedResp, nil +} From 0314c4fc5433c8052c48745cde1fdc1dd625d954 Mon Sep 17 00:00:00 2001 From: Hieu Vu Date: Tue, 10 Dec 2024 16:27:25 +0700 Subject: [PATCH 09/13] lint --- server/v2/cometbft/abci.go | 9 ++++----- server/v2/cometbft/grpc.go | 9 ++++----- server/v2/cometbft/server.go | 2 +- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 29e12c5f9679..89c2ab706391 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -10,17 +10,20 @@ import ( abci "github.com/cometbft/cometbft/abci/types" abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1" + rpchttp "github.com/cometbft/cometbft/rpc/client/http" gogoproto "github.com/cosmos/gogoproto/proto" "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/reflect/protoregistry" "cosmossdk.io/collections" + addresscodec "cosmossdk.io/core/address" appmodulev2 "cosmossdk.io/core/appmodule/v2" "cosmossdk.io/core/comet" corecontext "cosmossdk.io/core/context" "cosmossdk.io/core/event" "cosmossdk.io/core/server" "cosmossdk.io/core/store" + "cosmossdk.io/core/transaction" errorsmod "cosmossdk.io/errors/v2" "cosmossdk.io/log" "cosmossdk.io/schema/appdata" @@ -34,10 +37,6 @@ import ( "cosmossdk.io/store/v2/snapshots" consensustypes "cosmossdk.io/x/consensus/types" - addresscodec "cosmossdk.io/core/address" - coreserver "cosmossdk.io/core/server" - "cosmossdk.io/core/transaction" - rpchttp "github.com/cometbft/cometbft/rpc/client/http" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" "github.com/cosmos/cosmos-sdk/codec" @@ -93,7 +92,7 @@ type consensus[T transaction.Tx] struct { queryHandlersMap map[string]appmodulev2.Handler getProtoRegistry func() (*protoregistry.Files, error) consensusAddressCodec addresscodec.Codec - cfgMap coreserver.ConfigMap + cfgMap server.ConfigMap } // CheckTx implements types.Application. diff --git a/server/v2/cometbft/grpc.go b/server/v2/cometbft/grpc.go index e5e98301e42c..8c65318cb252 100644 --- a/server/v2/cometbft/grpc.go +++ b/server/v2/cometbft/grpc.go @@ -30,7 +30,6 @@ import ( txtypes "github.com/cosmos/cosmos-sdk/types/tx" "github.com/cosmos/cosmos-sdk/x/auth/migrations/legacytx" authtx "github.com/cosmos/cosmos-sdk/x/auth/tx" - gogoproto "github.com/cosmos/gogoproto/proto" ) type appSimulator[T transaction.Tx] interface { @@ -75,7 +74,7 @@ func (t txServer[T]) GetBlockWithTxs(ctx context.Context, req *txtypes.GetBlockW if req == nil { return nil, status.Error(codes.InvalidArgument, "request cannot be nil") } - + resp, err := t.consensus.Info(ctx, &abci.InfoRequest{}) if err != nil { return nil, err @@ -460,18 +459,18 @@ func parseOrderBy(orderBy txtypes.OrderBy) string { func handleExternalService[T any, PT interface { *T - gogoproto.Message + proto.Message }, U any, UT interface { *U - gogoproto.Message + proto.Message }]( ctx context.Context, rawReq *abciproto.QueryRequest, handler func(ctx context.Context, msg PT) (UT, error), ) (transaction.Msg, error) { req := PT(new(T)) - err := gogoproto.Unmarshal(rawReq.Data, req) + err := proto.Unmarshal(rawReq.Data, req) if err != nil { return nil, err } diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index 9e4649f4f891..b424b163e4f5 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -23,6 +23,7 @@ import ( "github.com/spf13/pflag" "google.golang.org/grpc" + addresscodec "cosmossdk.io/core/address" appmodulev2 "cosmossdk.io/core/appmodule/v2" "cosmossdk.io/core/server" "cosmossdk.io/core/transaction" @@ -38,7 +39,6 @@ import ( "cosmossdk.io/server/v2/cometbft/types" "cosmossdk.io/store/v2/snapshots" - addresscodec "cosmossdk.io/core/address" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec" genutiltypes "github.com/cosmos/cosmos-sdk/x/genutil/types" From 278126db196935aef4aa07510b2db00b29bec78b Mon Sep 17 00:00:00 2001 From: Hieu Vu Date: Tue, 10 Dec 2024 17:32:29 +0700 Subject: [PATCH 10/13] refactor --- server/v2/cometbft/abci.go | 132 ++----------------------------------- server/v2/cometbft/grpc.go | 116 ++++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 126 deletions(-) diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 89c2ab706391..0114d9335676 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -10,7 +10,6 @@ import ( abci "github.com/cometbft/cometbft/abci/types" abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1" - rpchttp "github.com/cometbft/cometbft/rpc/client/http" gogoproto "github.com/cosmos/gogoproto/proto" "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/reflect/protoregistry" @@ -37,11 +36,7 @@ import ( "cosmossdk.io/store/v2/snapshots" consensustypes "cosmossdk.io/x/consensus/types" - "github.com/cosmos/cosmos-sdk/client" - "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" "github.com/cosmos/cosmos-sdk/codec" - "github.com/cosmos/cosmos-sdk/std" - authtx "github.com/cosmos/cosmos-sdk/x/auth/tx" ) const ( @@ -256,127 +251,12 @@ func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq // special case for non-module services as they are external gRPC registered on the grpc server component // and not on the app itself, so it won't pass the router afterwards. - // Handle comet service - if strings.Contains(req.Path, "/cosmos.base.tendermint.v1beta1.Service") { - rpcClient, _ := rpchttp.New(c.cfg.AppTomlConfig.Address) - cometQServer := cmtservice.NewQueryServer(rpcClient, c.Query, c.consensusAddressCodec) - paths := strings.Split(req.Path, "/") - if len(paths) <= 2 { - return nil, false, fmt.Errorf("invalid request path: %s", req.Path) - } - - var resp transaction.Msg - var err error - switch paths[2] { - case "GetNodeInfo": - resp, err = handleExternalService(ctx, req, cometQServer.GetNodeInfo) - case "GetSyncing": - resp, err = handleExternalService(ctx, req, cometQServer.GetSyncing) - case "GetLatestBlock": - resp, err = handleExternalService(ctx, req, cometQServer.GetLatestBlock) - case "GetBlockByHeight": - resp, err = handleExternalService(ctx, req, cometQServer.GetBlockByHeight) - case "GetLatestValidatorSet": - resp, err = handleExternalService(ctx, req, cometQServer.GetLatestValidatorSet) - case "GetValidatorSetByHeight": - resp, err = handleExternalService(ctx, req, cometQServer.GetValidatorSetByHeight) - case "ABCIQuery": - resp, err = handleExternalService(ctx, req, cometQServer.ABCIQuery) - } - - if err != nil { - return nil, true, err - } - - res, err := queryResponse(resp, req.Height) - return res, true, err - } - - // Handle node service - if strings.Contains(req.Path, "/cosmos.base.node.v1beta1.Service") { - nodeQService := nodeServer[T]{c.cfgMap, c.cfg.AppTomlConfig, c} - paths := strings.Split(req.Path, "/") - if len(paths) <= 2 { - return nil, false, fmt.Errorf("invalid request path: %s", req.Path) - } - - var resp transaction.Msg - var err error - switch paths[2] { - case "Config": - resp, err = handleExternalService(ctx, req, nodeQService.Config) - case "Status": - resp, err = handleExternalService(ctx, req, nodeQService.Status) - } - - if err != nil { - return nil, true, err - } - - res, err := queryResponse(resp, req.Height) - return res, true, err - } - - // Handle tx service - if strings.Contains(req.Path, "/cosmos.tx.v1beta1.Service") { - // init simple client context - amino := codec.NewLegacyAmino() - std.RegisterLegacyAminoCodec(amino) - txConfig := authtx.NewTxConfig( - c.appCodec, - c.appCodec.InterfaceRegistry().SigningContext().AddressCodec(), - c.appCodec.InterfaceRegistry().SigningContext().ValidatorAddressCodec(), - authtx.DefaultSignModes, - ) - rpcClient, _ := client.NewClientFromNode(c.cfg.AppTomlConfig.Address) - - clientCtx := client.Context{}. - WithLegacyAmino(amino). - WithCodec(c.appCodec). - WithTxConfig(txConfig). - WithNodeURI(c.cfg.AppTomlConfig.Address). - WithClient(rpcClient) - - txService := txServer[T]{ - clientCtx: clientCtx, - txCodec: c.txCodec, - app: c.app, - consensus: c, - } - paths := strings.Split(req.Path, "/") - if len(paths) <= 2 { - return nil, false, fmt.Errorf("invalid request path: %s", req.Path) - } - - var resp transaction.Msg - var err error - switch paths[2] { - case "Simulate": - resp, err = handleExternalService(ctx, req, txService.Simulate) - case "GetTx": - resp, err = handleExternalService(ctx, req, txService.GetTx) - case "BroadcastTx": - return nil, true, errors.New("can't route a broadcast tx message") - case "GetTxsEvent": - resp, err = handleExternalService(ctx, req, txService.GetTxsEvent) - case "GetBlockWithTxs": - resp, err = handleExternalService(ctx, req, txService.GetBlockWithTxs) - case "TxDecode": - resp, err = handleExternalService(ctx, req, txService.TxDecode) - case "TxEncode": - resp, err = handleExternalService(ctx, req, txService.TxEncode) - case "TxEncodeAmino": - resp, err = handleExternalService(ctx, req, txService.TxEncodeAmino) - case "TxDecodeAmino": - resp, err = handleExternalService(ctx, req, txService.Simulate) - } - - if err != nil { - return nil, true, err - } - - res, err := queryResponse(resp, req.Height) - return res, true, err + externalResp, err := c.maybeHandleExternalServices(ctx, req) + if err != nil { + return nil, true, err + } else if externalResp != nil { + resp, err = queryResponse(externalResp, req.Height) + return resp, true, err } handler, found := c.queryHandlersMap[handlerFullName] diff --git a/server/v2/cometbft/grpc.go b/server/v2/cometbft/grpc.go index 8c65318cb252..0fd40aae217a 100644 --- a/server/v2/cometbft/grpc.go +++ b/server/v2/cometbft/grpc.go @@ -2,6 +2,7 @@ package cometbft import ( "context" + "errors" "fmt" "strings" @@ -20,10 +21,13 @@ import ( "cosmossdk.io/log" storeserver "cosmossdk.io/server/v2/store" + rpchttp "github.com/cometbft/cometbft/rpc/client/http" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" nodeservice "github.com/cosmos/cosmos-sdk/client/grpc/node" + "github.com/cosmos/cosmos-sdk/codec" codectypes "github.com/cosmos/cosmos-sdk/codec/types" + "github.com/cosmos/cosmos-sdk/std" sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" "github.com/cosmos/cosmos-sdk/types/query" @@ -457,6 +461,118 @@ func parseOrderBy(orderBy txtypes.OrderBy) string { } } +func (c *consensus[T]) maybeHandleExternalServices(ctx context.Context, req *abci.QueryRequest) (transaction.Msg, error) { + // Handle comet service + if strings.Contains(req.Path, "/cosmos.base.tendermint.v1beta1.Service") { + rpcClient, _ := rpchttp.New(c.cfg.AppTomlConfig.Address) + cometQServer := cmtservice.NewQueryServer(rpcClient, c.Query, c.consensusAddressCodec) + paths := strings.Split(req.Path, "/") + if len(paths) <= 2 { + return nil, fmt.Errorf("invalid request path: %s", req.Path) + } + + var resp transaction.Msg + var err error + switch paths[2] { + case "GetNodeInfo": + resp, err = handleExternalService(ctx, req, cometQServer.GetNodeInfo) + case "GetSyncing": + resp, err = handleExternalService(ctx, req, cometQServer.GetSyncing) + case "GetLatestBlock": + resp, err = handleExternalService(ctx, req, cometQServer.GetLatestBlock) + case "GetBlockByHeight": + resp, err = handleExternalService(ctx, req, cometQServer.GetBlockByHeight) + case "GetLatestValidatorSet": + resp, err = handleExternalService(ctx, req, cometQServer.GetLatestValidatorSet) + case "GetValidatorSetByHeight": + resp, err = handleExternalService(ctx, req, cometQServer.GetValidatorSetByHeight) + case "ABCIQuery": + resp, err = handleExternalService(ctx, req, cometQServer.ABCIQuery) + } + + return resp, err + } + + // Handle node service + if strings.Contains(req.Path, "/cosmos.base.node.v1beta1.Service") { + nodeQService := nodeServer[T]{c.cfgMap, c.cfg.AppTomlConfig, c} + paths := strings.Split(req.Path, "/") + if len(paths) <= 2 { + return nil, fmt.Errorf("invalid request path: %s", req.Path) + } + + var resp transaction.Msg + var err error + switch paths[2] { + case "Config": + resp, err = handleExternalService(ctx, req, nodeQService.Config) + case "Status": + resp, err = handleExternalService(ctx, req, nodeQService.Status) + } + + return resp, err + } + + // Handle tx service + if strings.Contains(req.Path, "/cosmos.tx.v1beta1.Service") { + // init simple client context + amino := codec.NewLegacyAmino() + std.RegisterLegacyAminoCodec(amino) + txConfig := authtx.NewTxConfig( + c.appCodec, + c.appCodec.InterfaceRegistry().SigningContext().AddressCodec(), + c.appCodec.InterfaceRegistry().SigningContext().ValidatorAddressCodec(), + authtx.DefaultSignModes, + ) + rpcClient, _ := client.NewClientFromNode(c.cfg.AppTomlConfig.Address) + + clientCtx := client.Context{}. + WithLegacyAmino(amino). + WithCodec(c.appCodec). + WithTxConfig(txConfig). + WithNodeURI(c.cfg.AppTomlConfig.Address). + WithClient(rpcClient) + + txService := txServer[T]{ + clientCtx: clientCtx, + txCodec: c.txCodec, + app: c.app, + consensus: c, + } + paths := strings.Split(req.Path, "/") + if len(paths) <= 2 { + return nil, fmt.Errorf("invalid request path: %s", req.Path) + } + + var resp transaction.Msg + var err error + switch paths[2] { + case "Simulate": + resp, err = handleExternalService(ctx, req, txService.Simulate) + case "GetTx": + resp, err = handleExternalService(ctx, req, txService.GetTx) + case "BroadcastTx": + return nil, errors.New("can't route a broadcast tx message") + case "GetTxsEvent": + resp, err = handleExternalService(ctx, req, txService.GetTxsEvent) + case "GetBlockWithTxs": + resp, err = handleExternalService(ctx, req, txService.GetBlockWithTxs) + case "TxDecode": + resp, err = handleExternalService(ctx, req, txService.TxDecode) + case "TxEncode": + resp, err = handleExternalService(ctx, req, txService.TxEncode) + case "TxEncodeAmino": + resp, err = handleExternalService(ctx, req, txService.TxEncodeAmino) + case "TxDecodeAmino": + resp, err = handleExternalService(ctx, req, txService.Simulate) + } + + return resp, err + } + + return nil, nil +} + func handleExternalService[T any, PT interface { *T proto.Message From b0260cce5f562dd9b0b12aade3a3d7e079fcf131 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 11 Dec 2024 16:29:46 +0100 Subject: [PATCH 11/13] fixes --- server/v2/cometbft/grpc.go | 7 ++++--- server/v2/cometbft/server.go | 1 + 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/server/v2/cometbft/grpc.go b/server/v2/cometbft/grpc.go index 0fd40aae217a..0a623495f3ad 100644 --- a/server/v2/cometbft/grpc.go +++ b/server/v2/cometbft/grpc.go @@ -463,8 +463,9 @@ func parseOrderBy(orderBy txtypes.OrderBy) string { func (c *consensus[T]) maybeHandleExternalServices(ctx context.Context, req *abci.QueryRequest) (transaction.Msg, error) { // Handle comet service - if strings.Contains(req.Path, "/cosmos.base.tendermint.v1beta1.Service") { + if strings.HasPrefix(req.Path, "/cosmos.base.tendermint.v1beta1.Service") { rpcClient, _ := rpchttp.New(c.cfg.AppTomlConfig.Address) + cometQServer := cmtservice.NewQueryServer(rpcClient, c.Query, c.consensusAddressCodec) paths := strings.Split(req.Path, "/") if len(paths) <= 2 { @@ -494,7 +495,7 @@ func (c *consensus[T]) maybeHandleExternalServices(ctx context.Context, req *abc } // Handle node service - if strings.Contains(req.Path, "/cosmos.base.node.v1beta1.Service") { + if strings.HasPrefix(req.Path, "/cosmos.base.node.v1beta1.Service") { nodeQService := nodeServer[T]{c.cfgMap, c.cfg.AppTomlConfig, c} paths := strings.Split(req.Path, "/") if len(paths) <= 2 { @@ -514,7 +515,7 @@ func (c *consensus[T]) maybeHandleExternalServices(ctx context.Context, req *abc } // Handle tx service - if strings.Contains(req.Path, "/cosmos.tx.v1beta1.Service") { + if strings.HasPrefix(req.Path, "/cosmos.tx.v1beta1.Service") { // init simple client context amino := codec.NewLegacyAmino() std.RegisterLegacyAminoCodec(amino) diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index b424b163e4f5..8789b7830f94 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -192,6 +192,7 @@ func New[T transaction.Tx]( addrPeerFilter: srv.serverOptions.AddrPeerFilter, idPeerFilter: srv.serverOptions.IdPeerFilter, cfgMap: cfg, + consensusAddressCodec: consensusAddressCodec, } c.optimisticExec = oe.NewOptimisticExecution( From 426ac81b8d594956030ef07978e68366aa662500 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 11 Dec 2024 16:34:50 +0100 Subject: [PATCH 12/13] fix --- server/v2/cometbft/grpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/v2/cometbft/grpc.go b/server/v2/cometbft/grpc.go index 0a623495f3ad..7f413a819948 100644 --- a/server/v2/cometbft/grpc.go +++ b/server/v2/cometbft/grpc.go @@ -464,7 +464,7 @@ func parseOrderBy(orderBy txtypes.OrderBy) string { func (c *consensus[T]) maybeHandleExternalServices(ctx context.Context, req *abci.QueryRequest) (transaction.Msg, error) { // Handle comet service if strings.HasPrefix(req.Path, "/cosmos.base.tendermint.v1beta1.Service") { - rpcClient, _ := rpchttp.New(c.cfg.AppTomlConfig.Address) + rpcClient, _ := rpchttp.New(c.cfg.ConfigTomlConfig.RPC.ListenAddress) cometQServer := cmtservice.NewQueryServer(rpcClient, c.Query, c.consensusAddressCodec) paths := strings.Split(req.Path, "/") From 1b606ca5f3a16faccb963520f4e424fe23c1dcdc Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 11 Dec 2024 16:36:26 +0100 Subject: [PATCH 13/13] typo --- server/v2/cometbft/grpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/v2/cometbft/grpc.go b/server/v2/cometbft/grpc.go index 7f413a819948..cfc682e47f31 100644 --- a/server/v2/cometbft/grpc.go +++ b/server/v2/cometbft/grpc.go @@ -565,7 +565,7 @@ func (c *consensus[T]) maybeHandleExternalServices(ctx context.Context, req *abc case "TxEncodeAmino": resp, err = handleExternalService(ctx, req, txService.TxEncodeAmino) case "TxDecodeAmino": - resp, err = handleExternalService(ctx, req, txService.Simulate) + resp, err = handleExternalService(ctx, req, txService.TxDecodeAmino) } return resp, err