From c4463961467054e0e252af27356a334c56dc9e18 Mon Sep 17 00:00:00 2001 From: Adam Tucker Date: Mon, 10 Oct 2022 18:11:24 -0500 Subject: [PATCH] initial grpc concurrency move (#344) (cherry picked from commit d2f1cb498c2042fa2d10f10905463d5abbe4a3d1) --- client/cmd.go | 5 ----- client/config/cmd.go | 6 ------ client/config/config.go | 28 +++++++++++----------------- client/config/toml.go | 4 ---- client/flags/flags.go | 1 - client/grpc_query_test.go | 33 ++++++++++++--------------------- server/config/config.go | 6 ++++++ server/config/toml.go | 5 +++++ server/start.go | 7 +++++-- types/module/module_test.go | 2 +- 10 files changed, 40 insertions(+), 57 deletions(-) diff --git a/client/cmd.go b/client/cmd.go index 667f2d00c442..0e401a9250f6 100644 --- a/client/cmd.go +++ b/client/cmd.go @@ -103,11 +103,6 @@ func ReadPersistentCommandFlags(clientCtx Context, flagSet *pflag.FlagSet) (Cont clientCtx = clientCtx.WithSimulation(dryRun) } - if !clientCtx.GRPCConcurrency || flagSet.Changed(flags.FlagGRPCConcurrency) { - grpcConcurrency, _ := flagSet.GetBool(flags.FlagGRPCConcurrency) - clientCtx = clientCtx.WithConcurrency(grpcConcurrency) - } - if clientCtx.KeyringDir == "" || flagSet.Changed(flags.FlagKeyringDir) { keyringDir, _ := flagSet.GetString(flags.FlagKeyringDir) diff --git a/client/config/cmd.go b/client/config/cmd.go index 55642386e22c..1da8afdaef8c 100644 --- a/client/config/cmd.go +++ b/client/config/cmd.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "path/filepath" - "strconv" tmcli "github.com/tendermint/tendermint/libs/cli" @@ -59,8 +58,6 @@ func runConfigCmd(cmd *cobra.Command, args []string) error { cmd.Println(conf.Node) case flags.FlagBroadcastMode: cmd.Println(conf.BroadcastMode) - case flags.FlagGRPCConcurrency: - cmd.Println(conf.GRPCConcurrency) default: err := errUnknownConfigKey(key) return fmt.Errorf("couldn't get the value for the key: %v, error: %v", key, err) @@ -81,9 +78,6 @@ func runConfigCmd(cmd *cobra.Command, args []string) error { conf.SetNode(value) case flags.FlagBroadcastMode: conf.SetBroadcastMode(value) - case flags.FlagGRPCConcurrency: - valuebool, _ := strconv.ParseBool(value) - conf.SetGRPCConcurrency(valuebool) default: return errUnknownConfigKey(key) } diff --git a/client/config/config.go b/client/config/config.go index 5a585ec9c4cb..6297cf2c26b1 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -10,26 +10,24 @@ import ( // Default constants const ( - chainID = "" - keyringBackend = "os" - output = "text" - node = "tcp://localhost:26657" - broadcastMode = "sync" - grpcConcurrency = false + chainID = "" + keyringBackend = "os" + output = "text" + node = "tcp://localhost:26657" + broadcastMode = "sync" ) type ClientConfig struct { - ChainID string `mapstructure:"chain-id" json:"chain-id"` - KeyringBackend string `mapstructure:"keyring-backend" json:"keyring-backend"` - Output string `mapstructure:"output" json:"output"` - Node string `mapstructure:"node" json:"node"` - BroadcastMode string `mapstructure:"broadcast-mode" json:"broadcast-mode"` - GRPCConcurrency bool `mapstructure:"grpc-concurrency" json:"grpc-concurrency"` + ChainID string `mapstructure:"chain-id" json:"chain-id"` + KeyringBackend string `mapstructure:"keyring-backend" json:"keyring-backend"` + Output string `mapstructure:"output" json:"output"` + Node string `mapstructure:"node" json:"node"` + BroadcastMode string `mapstructure:"broadcast-mode" json:"broadcast-mode"` } // defaultClientConfig returns the reference to ClientConfig with default values. func defaultClientConfig() *ClientConfig { - return &ClientConfig{chainID, keyringBackend, output, node, broadcastMode, grpcConcurrency} + return &ClientConfig{chainID, keyringBackend, output, node, broadcastMode} } func (c *ClientConfig) SetChainID(chainID string) { @@ -52,10 +50,6 @@ func (c *ClientConfig) SetBroadcastMode(broadcastMode string) { c.BroadcastMode = broadcastMode } -func (c *ClientConfig) SetGRPCConcurrency(grpcConcurrency bool) { - c.GRPCConcurrency = grpcConcurrency -} - // ReadFromClientConfig reads values from client.toml file and updates them in client Context func ReadFromClientConfig(ctx client.Context) (client.Context, error) { configPath := filepath.Join(ctx.HomeDir, "config") diff --git a/client/config/toml.go b/client/config/toml.go index 109e5bcb2993..87b3b04025bb 100644 --- a/client/config/toml.go +++ b/client/config/toml.go @@ -26,10 +26,6 @@ output = "{{ .Output }}" node = "{{ .Node }}" # Transaction broadcasting mode (sync|async|block) broadcast-mode = "{{ .BroadcastMode }}" -# Concurrency defines if node queries should be done in parallel. -# This is experimental and has led to node failures, so enable with caution. -# The default value is false. -grpc-concurrency = {{ .GRPCConcurrency }} ` // writeConfigToFile parses defaultConfigTemplate, renders config using the template and writes it to diff --git a/client/flags/flags.go b/client/flags/flags.go index 1f985ce6719d..18ad7f99f336 100644 --- a/client/flags/flags.go +++ b/client/flags/flags.go @@ -72,7 +72,6 @@ const ( FlagKeyAlgorithm = "algo" FlagFeeAccount = "fee-account" FlagReverse = "reverse" - FlagGRPCConcurrency = "grpc-concurrency" // Tendermint logging flags FlagLogLevel = "log_level" diff --git a/client/grpc_query_test.go b/client/grpc_query_test.go index 0f9f40ab8a0c..8b1e3f905195 100644 --- a/client/grpc_query_test.go +++ b/client/grpc_query_test.go @@ -25,9 +25,10 @@ type IntegrationTestSuite struct { } type testcase struct { - clientContextHeight int64 - grpcHeight int64 - expectedHeight int64 + clientContextHeight int64 + grpcHeight int64 + expectedHeight int64 + grpcConcurrentEnabled bool } const ( @@ -66,22 +67,6 @@ func (s *IntegrationTestSuite) TestGRPCQuery_TestService() { s.Require().Equal("hello", testRes.Message) } -func (s *IntegrationTestSuite) TestGRPCConcurrency() { - val0 := s.network.Validators[0] - clientCtx := val0.ClientCtx - clientCtx.GRPCConcurrency = true - in := &testdata.EchoRequest{Message: "hello"} - out := &testdata.EchoResponse{} - err := clientCtx.Invoke(context.Background(), "/testdata.Query/Echo", in, out) - s.Require().NoError(err) - s.Require().Equal("hello", out.Message) - - clientCtx.GRPCConcurrency = false - err = clientCtx.Invoke(context.Background(), "/testdata.Query/Echo", in, out) - s.Require().NoError(err) - s.Require().Equal("hello", out.Message) -} - func (s *IntegrationTestSuite) TestGRPCQuery_BankService_VariousInputs() { val0 := s.network.Validators[0] @@ -106,7 +91,13 @@ func (s *IntegrationTestSuite) TestGRPCQuery_BankService_VariousInputs() { "clientContextHeight 3; grpcHeight is 0 - grpcHeight is chosen": { clientContextHeight: 1, grpcHeight: 0, // chosen - expectedHeight: 3, // latest height + expectedHeight: 1, // context height + }, + "clientContextHeight 3; grpcHeight is 0 - grpcHeight is chosen, grpcConcurrency on": { + clientContextHeight: 1, + grpcHeight: 0, // chosen + expectedHeight: 3, // latest height + grpcConcurrentEnabled: true, }, "clientContextHeight 3; grpcHeight is 3 - 3 is returned": { clientContextHeight: 3, @@ -124,7 +115,7 @@ func (s *IntegrationTestSuite) TestGRPCQuery_BankService_VariousInputs() { s.T().Run(name, func(t *testing.T) { // Setup clientCtx := val0.ClientCtx - clientCtx.GRPCConcurrency = true + clientCtx.GRPCConcurrency = tc.grpcConcurrentEnabled clientCtx.Height = 0 if tc.clientContextHeight != heightNotSetFlag { diff --git a/server/config/config.go b/server/config/config.go index 34e6e61365f7..0a4850a64ab0 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -150,6 +150,10 @@ type GRPCConfig struct { // MaxSendMsgSize defines the max message size in bytes the server can send. // The default value is math.MaxInt32. MaxSendMsgSize int `mapstructure:"max-send-msg-size"` + + // Concurrency defines if node queries should be done in parallel. + // The default value is false + Concurrency bool `mapstructure:"concurrency"` } // GRPCWebConfig defines configuration for the gRPC-web server. @@ -245,6 +249,7 @@ func DefaultConfig() *Config { Address: DefaultGRPCAddress, MaxRecvMsgSize: DefaultGRPCMaxRecvMsgSize, MaxSendMsgSize: DefaultGRPCMaxSendMsgSize, + Concurrency: false, }, Rosetta: RosettaConfig{ Enable: false, @@ -321,6 +326,7 @@ func GetConfig(v *viper.Viper) Config { Address: v.GetString("grpc.address"), MaxRecvMsgSize: v.GetInt("grpc.max-recv-msg-size"), MaxSendMsgSize: v.GetInt("grpc.max-send-msg-size"), + Concurrency: v.GetBool("grpc.concurrency"), }, GRPCWeb: GRPCWebConfig{ Enable: v.GetBool("grpc-web.enable"), diff --git a/server/config/toml.go b/server/config/toml.go index 97760a912e77..493559d44c24 100644 --- a/server/config/toml.go +++ b/server/config/toml.go @@ -184,6 +184,11 @@ max-recv-msg-size = "{{ .GRPC.MaxRecvMsgSize }}" # The default value is math.MaxInt32. max-send-msg-size = "{{ .GRPC.MaxSendMsgSize }}" +# Concurrency defines if node queries should be done in parallel. +# This is experimental and has led to node failures, so enable with caution. +# The default value is false. +concurrency = {{ .GRPC.Concurrency }} + ############################################################################### ### gRPC Web Configuration ### ############################################################################### diff --git a/server/start.go b/server/start.go index 2c918edebd74..e3b7119f5b5e 100644 --- a/server/start.go +++ b/server/start.go @@ -28,7 +28,6 @@ import ( "github.com/cosmos/cosmos-sdk/codec" pruningtypes "github.com/cosmos/cosmos-sdk/pruning/types" "github.com/cosmos/cosmos-sdk/server/api" - "github.com/cosmos/cosmos-sdk/server/config" serverconfig "github.com/cosmos/cosmos-sdk/server/config" servergrpc "github.com/cosmos/cosmos-sdk/server/grpc" "github.com/cosmos/cosmos-sdk/server/rosetta" @@ -244,7 +243,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App return err } - config := config.GetConfig(ctx.Viper) + config := serverconfig.GetConfig(ctx.Viper) if err := config.ValidateBasic(); err != nil { ctx.Logger.Error("WARNING: The minimum-gas-prices config in app.toml is set to the empty string. " + "This defaults to 0 in the current version, but will error in the next version " + @@ -289,6 +288,10 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App app.RegisterTendermintService(clientCtx) } + if config.GRPC.Concurrency { + clientCtx = clientCtx.WithConcurrency(true) + } + var apiSrv *api.Server if config.API.Enable { genDoc, err := genDocProvider() diff --git a/types/module/module_test.go b/types/module/module_test.go index 82ca0deb45fe..97d3a4b80726 100644 --- a/types/module/module_test.go +++ b/types/module/module_test.go @@ -239,7 +239,7 @@ func TestManager_ExportGenesis(t *testing.T) { want := map[string]json.RawMessage{ "module1": json.RawMessage(`{"key1": "value1"}`), "module2": json.RawMessage(`{"key2": "value2"}`)} - require.Equal(t, want, mm.ExportGenesis(ctx, cdc, []string{})) + require.Equal(t, want, mm.ExportGenesis(ctx, cdc)) } func TestManager_BeginBlock(t *testing.T) {