Skip to content

Commit

Permalink
initial grpc concurrency move (#344)
Browse files Browse the repository at this point in the history
  • Loading branch information
czarcas7ic authored Oct 10, 2022
1 parent 2e4ae2f commit d2f1cb4
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 57 deletions.
5 changes: 0 additions & 5 deletions client/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,6 @@ func ReadPersistentCommandFlags(clientCtx Context, flagSet *pflag.FlagSet) (Cont
clientCtx = clientCtx.WithSimulation(dryRun)
}

if !clientCtx.GRPCConcurrency || flagSet.Changed(flags.FlagGRPCConcurrency) {
grpcConcurrency, _ := flagSet.GetBool(flags.FlagGRPCConcurrency)
clientCtx = clientCtx.WithConcurrency(grpcConcurrency)
}

if clientCtx.KeyringDir == "" || flagSet.Changed(flags.FlagKeyringDir) {
keyringDir, _ := flagSet.GetString(flags.FlagKeyringDir)

Expand Down
6 changes: 0 additions & 6 deletions client/config/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"path/filepath"
"strconv"

tmcli "github.com/tendermint/tendermint/libs/cli"

Expand Down Expand Up @@ -59,8 +58,6 @@ func runConfigCmd(cmd *cobra.Command, args []string) error {
cmd.Println(conf.Node)
case flags.FlagBroadcastMode:
cmd.Println(conf.BroadcastMode)
case flags.FlagGRPCConcurrency:
cmd.Println(conf.GRPCConcurrency)
default:
err := errUnknownConfigKey(key)
return fmt.Errorf("couldn't get the value for the key: %v, error: %v", key, err)
Expand All @@ -81,9 +78,6 @@ func runConfigCmd(cmd *cobra.Command, args []string) error {
conf.SetNode(value)
case flags.FlagBroadcastMode:
conf.SetBroadcastMode(value)
case flags.FlagGRPCConcurrency:
valuebool, _ := strconv.ParseBool(value)
conf.SetGRPCConcurrency(valuebool)
default:
return errUnknownConfigKey(key)
}
Expand Down
28 changes: 11 additions & 17 deletions client/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,24 @@ import (

// Default constants
const (
chainID = ""
keyringBackend = "os"
output = "text"
node = "tcp://localhost:26657"
broadcastMode = "sync"
grpcConcurrency = false
chainID = ""
keyringBackend = "os"
output = "text"
node = "tcp://localhost:26657"
broadcastMode = "sync"
)

type ClientConfig struct {
ChainID string `mapstructure:"chain-id" json:"chain-id"`
KeyringBackend string `mapstructure:"keyring-backend" json:"keyring-backend"`
Output string `mapstructure:"output" json:"output"`
Node string `mapstructure:"node" json:"node"`
BroadcastMode string `mapstructure:"broadcast-mode" json:"broadcast-mode"`
GRPCConcurrency bool `mapstructure:"grpc-concurrency" json:"grpc-concurrency"`
ChainID string `mapstructure:"chain-id" json:"chain-id"`
KeyringBackend string `mapstructure:"keyring-backend" json:"keyring-backend"`
Output string `mapstructure:"output" json:"output"`
Node string `mapstructure:"node" json:"node"`
BroadcastMode string `mapstructure:"broadcast-mode" json:"broadcast-mode"`
}

// defaultClientConfig returns the reference to ClientConfig with default values.
func defaultClientConfig() *ClientConfig {
return &ClientConfig{chainID, keyringBackend, output, node, broadcastMode, grpcConcurrency}
return &ClientConfig{chainID, keyringBackend, output, node, broadcastMode}
}

func (c *ClientConfig) SetChainID(chainID string) {
Expand All @@ -52,10 +50,6 @@ func (c *ClientConfig) SetBroadcastMode(broadcastMode string) {
c.BroadcastMode = broadcastMode
}

func (c *ClientConfig) SetGRPCConcurrency(grpcConcurrency bool) {
c.GRPCConcurrency = grpcConcurrency
}

// ReadFromClientConfig reads values from client.toml file and updates them in client Context
func ReadFromClientConfig(ctx client.Context) (client.Context, error) {
configPath := filepath.Join(ctx.HomeDir, "config")
Expand Down
4 changes: 0 additions & 4 deletions client/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ output = "{{ .Output }}"
node = "{{ .Node }}"
# Transaction broadcasting mode (sync|async|block)
broadcast-mode = "{{ .BroadcastMode }}"
# Concurrency defines if node queries should be done in parallel.
# This is experimental and has led to node failures, so enable with caution.
# The default value is false.
grpc-concurrency = {{ .GRPCConcurrency }}
`

// writeConfigToFile parses defaultConfigTemplate, renders config using the template and writes it to
Expand Down
1 change: 0 additions & 1 deletion client/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ const (
FlagKeyAlgorithm = "algo"
FlagFeeAccount = "fee-account"
FlagReverse = "reverse"
FlagGRPCConcurrency = "grpc-concurrency"

// Tendermint logging flags
FlagLogLevel = "log_level"
Expand Down
33 changes: 12 additions & 21 deletions client/grpc_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ type IntegrationTestSuite struct {
}

type testcase struct {
clientContextHeight int64
grpcHeight int64
expectedHeight int64
clientContextHeight int64
grpcHeight int64
expectedHeight int64
grpcConcurrentEnabled bool
}

const (
Expand Down Expand Up @@ -66,22 +67,6 @@ func (s *IntegrationTestSuite) TestGRPCQuery_TestService() {
s.Require().Equal("hello", testRes.Message)
}

func (s *IntegrationTestSuite) TestGRPCConcurrency() {
val0 := s.network.Validators[0]
clientCtx := val0.ClientCtx
clientCtx.GRPCConcurrency = true
in := &testdata.EchoRequest{Message: "hello"}
out := &testdata.EchoResponse{}
err := clientCtx.Invoke(context.Background(), "/testdata.Query/Echo", in, out)
s.Require().NoError(err)
s.Require().Equal("hello", out.Message)

clientCtx.GRPCConcurrency = false
err = clientCtx.Invoke(context.Background(), "/testdata.Query/Echo", in, out)
s.Require().NoError(err)
s.Require().Equal("hello", out.Message)
}

func (s *IntegrationTestSuite) TestGRPCQuery_BankService_VariousInputs() {
val0 := s.network.Validators[0]

Expand All @@ -106,7 +91,13 @@ func (s *IntegrationTestSuite) TestGRPCQuery_BankService_VariousInputs() {
"clientContextHeight 3; grpcHeight is 0 - grpcHeight is chosen": {
clientContextHeight: 1,
grpcHeight: 0, // chosen
expectedHeight: 3, // latest height
expectedHeight: 1, // context height
},
"clientContextHeight 3; grpcHeight is 0 - grpcHeight is chosen, grpcConcurrency on": {
clientContextHeight: 1,
grpcHeight: 0, // chosen
expectedHeight: 3, // latest height
grpcConcurrentEnabled: true,
},
"clientContextHeight 3; grpcHeight is 3 - 3 is returned": {
clientContextHeight: 3,
Expand All @@ -124,7 +115,7 @@ func (s *IntegrationTestSuite) TestGRPCQuery_BankService_VariousInputs() {
s.T().Run(name, func(t *testing.T) {
// Setup
clientCtx := val0.ClientCtx
clientCtx.GRPCConcurrency = true
clientCtx.GRPCConcurrency = tc.grpcConcurrentEnabled
clientCtx.Height = 0

if tc.clientContextHeight != heightNotSetFlag {
Expand Down
6 changes: 6 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ type GRPCConfig struct {
// MaxSendMsgSize defines the max message size in bytes the server can send.
// The default value is math.MaxInt32.
MaxSendMsgSize int `mapstructure:"max-send-msg-size"`

// Concurrency defines if node queries should be done in parallel.
// The default value is false
Concurrency bool `mapstructure:"concurrency"`
}

// GRPCWebConfig defines configuration for the gRPC-web server.
Expand Down Expand Up @@ -245,6 +249,7 @@ func DefaultConfig() *Config {
Address: DefaultGRPCAddress,
MaxRecvMsgSize: DefaultGRPCMaxRecvMsgSize,
MaxSendMsgSize: DefaultGRPCMaxSendMsgSize,
Concurrency: false,
},
Rosetta: RosettaConfig{
Enable: false,
Expand Down Expand Up @@ -321,6 +326,7 @@ func GetConfig(v *viper.Viper) Config {
Address: v.GetString("grpc.address"),
MaxRecvMsgSize: v.GetInt("grpc.max-recv-msg-size"),
MaxSendMsgSize: v.GetInt("grpc.max-send-msg-size"),
Concurrency: v.GetBool("grpc.concurrency"),
},
GRPCWeb: GRPCWebConfig{
Enable: v.GetBool("grpc-web.enable"),
Expand Down
5 changes: 5 additions & 0 deletions server/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ max-recv-msg-size = "{{ .GRPC.MaxRecvMsgSize }}"
# The default value is math.MaxInt32.
max-send-msg-size = "{{ .GRPC.MaxSendMsgSize }}"
# Concurrency defines if node queries should be done in parallel.
# This is experimental and has led to node failures, so enable with caution.
# The default value is false.
concurrency = {{ .GRPC.Concurrency }}
###############################################################################
### gRPC Web Configuration ###
###############################################################################
Expand Down
7 changes: 5 additions & 2 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/cosmos/cosmos-sdk/codec"
pruningtypes "github.com/cosmos/cosmos-sdk/pruning/types"
"github.com/cosmos/cosmos-sdk/server/api"
"github.com/cosmos/cosmos-sdk/server/config"
serverconfig "github.com/cosmos/cosmos-sdk/server/config"
servergrpc "github.com/cosmos/cosmos-sdk/server/grpc"
"github.com/cosmos/cosmos-sdk/server/rosetta"
Expand Down Expand Up @@ -244,7 +243,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
return err
}

config := config.GetConfig(ctx.Viper)
config := serverconfig.GetConfig(ctx.Viper)
if err := config.ValidateBasic(); err != nil {
ctx.Logger.Error("WARNING: The minimum-gas-prices config in app.toml is set to the empty string. " +
"This defaults to 0 in the current version, but will error in the next version " +
Expand Down Expand Up @@ -289,6 +288,10 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
app.RegisterTendermintService(clientCtx)
}

if config.GRPC.Concurrency {
clientCtx = clientCtx.WithConcurrency(true)
}

var apiSrv *api.Server
if config.API.Enable {
genDoc, err := genDocProvider()
Expand Down
2 changes: 1 addition & 1 deletion types/module/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func TestManager_ExportGenesis(t *testing.T) {
want := map[string]json.RawMessage{
"module1": json.RawMessage(`{"key1": "value1"}`),
"module2": json.RawMessage(`{"key2": "value2"}`)}
require.Equal(t, want, mm.ExportGenesis(ctx, cdc, []string{}))
require.Equal(t, want, mm.ExportGenesis(ctx, cdc))
}

func TestManager_BeginBlock(t *testing.T) {
Expand Down

0 comments on commit d2f1cb4

Please sign in to comment.