Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(v21.x) CacheKV speedups #505

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion baseapp/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package baseapp

import (
"context"
"fmt"
"strconv"

gogogrpc "github.com/cosmos/gogoproto/grpc"
Expand All @@ -21,7 +22,7 @@ import (
func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter }

// RegisterGRPCServer registers gRPC services directly with the gRPC server.
func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) {
func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server, logQueries bool) {
// Define an interceptor for all gRPC queries: this interceptor will create
// a new sdk.Context, and pass it into the query handler.
interceptor := func(grpcCtx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
Expand Down Expand Up @@ -65,6 +66,10 @@ func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) {
app.logger.Error("failed to set gRPC header", "err", err)
}

if logQueries {
app.logger.Info("gRPC query received of type: " + fmt.Sprintf("%#v", req))
}

return handler(grpcCtx, req)
}

Expand Down
8 changes: 8 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ const (
// bytes the server can send.
DefaultGRPCMaxSendMsgSize = math.MaxInt32

// DefaultLogQueries defines the default value for the log_queries parameter.
// Should be set to false unless debugging.
DefaultLogQueries = false

// FileStreamer defines the store streaming type for file streaming.
FileStreamer = "file"
)
Expand Down Expand Up @@ -177,6 +181,9 @@ type GRPCConfig struct {
// MaxSendMsgSize defines the max message size in bytes the server can send.
// The default value is math.MaxInt32.
MaxSendMsgSize int `mapstructure:"max-send-msg-size"`

// LogQueries logs every gRPC query to the console as an info log.
LogQueries bool `mapstructure:"log-queries"`
}

// GRPCWebConfig defines configuration for the gRPC-web server.
Expand Down Expand Up @@ -319,6 +326,7 @@ func DefaultConfig() *Config {
Address: DefaultGRPCAddress,
MaxRecvMsgSize: DefaultGRPCMaxRecvMsgSize,
MaxSendMsgSize: DefaultGRPCMaxSendMsgSize,
LogQueries: DefaultLogQueries,
},
Rosetta: RosettaConfig{
Enable: false,
Expand Down
5 changes: 5 additions & 0 deletions server/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ max-recv-msg-size = "{{ .GRPC.MaxRecvMsgSize }}"
# The default value is math.MaxInt32.
max-send-msg-size = "{{ .GRPC.MaxSendMsgSize }}"

# LogQueries if enabled will print an info log containing the query request
# that was submitted to this node on every submission.
# This is useful strictly for debugging purposes and should be disabled otherwise.
log-queries = {{ .GRPC.LogQueries }}

###############################################################################
### gRPC Web Configuration ###
###############################################################################
Expand Down
2 changes: 1 addition & 1 deletion server/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config
grpc.MaxRecvMsgSize(maxRecvMsgSize),
)

app.RegisterGRPCServer(grpcSrv)
app.RegisterGRPCServer(grpcSrv, cfg.LogQueries)

// Reflection allows consumers to build dynamic clients that can write to any
// Cosmos SDK application without relying on application packages at compile
Expand Down
2 changes: 1 addition & 1 deletion server/types/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type (

// RegisterGRPCServer registers gRPC services directly with the gRPC
// server.
RegisterGRPCServer(grpc.Server)
RegisterGRPCServer(grpc.Server, bool)

// RegisterTxService registers the gRPC Query service for tx (such as tx
// simulation, fetching txs by hash...).
Expand Down
7 changes: 3 additions & 4 deletions store/cachekv/search_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package cachekv
import (
"strconv"
"testing"

"github.com/cosmos/cosmos-sdk/store/cachekv/internal"
)

func BenchmarkLargeUnsortedMisses(b *testing.B) {
Expand Down Expand Up @@ -36,9 +34,10 @@ func generateStore() *Store {
cache[key] = &cValue{}
}

return &Store{
store := &Store{
cache: cache,
unsortedCache: unsorted,
sortedCache: internal.NewBTree(),
}
store.resetSortedCache()
return store
}
16 changes: 12 additions & 4 deletions store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Store struct {
mtx sync.Mutex
cache map[string]*cValue
unsortedCache map[string]struct{}
sortedCache internal.BTree // always ascending sorted
sortedCache *internal.BTree // always ascending sorted
parent types.KVStore
}

Expand All @@ -39,7 +39,7 @@ func NewStore(parent types.KVStore) *Store {
return &Store{
cache: make(map[string]*cValue),
unsortedCache: make(map[string]struct{}),
sortedCache: internal.NewBTree(),
sortedCache: nil,
parent: parent,
}
}
Expand Down Expand Up @@ -93,13 +93,18 @@ func (store *Store) Delete(key []byte) {
store.setCacheValue(key, nil, true)
}

func (store *Store) resetSortedCache() {
newTree := internal.NewBTree()
store.sortedCache = &newTree
}

// Implements Cachetypes.KVStore.
func (store *Store) Write() {
store.mtx.Lock()
defer store.mtx.Unlock()

if len(store.cache) == 0 && len(store.unsortedCache) == 0 {
store.sortedCache = internal.NewBTree()
store.sortedCache = nil
return
}

Expand Down Expand Up @@ -140,7 +145,7 @@ func (store *Store) Write() {
for key := range store.unsortedCache {
delete(store.unsortedCache, key)
}
store.sortedCache = internal.NewBTree()
store.sortedCache = nil
}

// CacheWrap implements CacheWrapper.
Expand Down Expand Up @@ -169,6 +174,9 @@ func (store *Store) ReverseIterator(start, end []byte) types.Iterator {
func (store *Store) iterator(start, end []byte, ascending bool) types.Iterator {
store.mtx.Lock()
defer store.mtx.Unlock()
if store.sortedCache == nil {
store.resetSortedCache()
}

store.dirtyItems(start, end)
isoSortedCache := store.sortedCache.Copy()
Expand Down
2 changes: 1 addition & 1 deletion store/cachemulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewStore(
}

func newCacheMultiStoreFromCMS(cms Store) Store {
stores := make(map[types.StoreKey]types.CacheWrapper)
stores := make(map[types.StoreKey]types.CacheWrapper, len(cms.stores))
for k, v := range cms.stores {
stores[k] = v
}
Expand Down
Loading