Skip to content

Commit

Permalink
[Feature] Implement read VM pool (#546)
Browse files Browse the repository at this point in the history
* test purpose readvm pool implementation, still requiring tendermint RWMutex

* fix lint

* remove data race

* remove replace

* wrap pool logic with mutex

* fix comment

* enforce zero readvm config to use default config

* bump cosmos-sdk to v0.44.1

* bump cosmwasm to v0.16.1

* cleanup go.mod

* changelog update

* prevent zero write vm cache

* rename the functions

* fix typo
  • Loading branch information
yys authored Oct 6, 2021
1 parent f3a1b82 commit 6e903ef
Show file tree
Hide file tree
Showing 11 changed files with 654 additions and 70 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
## [v0.5.6]

This release contains updates for multi-reader thread implementation with necessary dependency updates for multi-reader thread implementation.

* Bump cosmos-sdk to [v0.44.1](https://github.com/cosmos/cosmos-sdk/releases/tags/v0.44.1)
* Bump tendermint to [v0.44.1](https://github.com/tendermint/tendermint/releases/tags/v0.34.13)
* Bump CosmWasm to [v0.16.2](https://github.com/CosmWasm/cosmwasm/releases/tags/v0.16.2)

### Improvements
- [\#546](https://github.com/terra-money/core/pull/546) Implement read VM pool

## [v0.5.5]

### Improvements
Expand Down
25 changes: 12 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ go 1.16
module github.com/terra-money/core

require (
github.com/CosmWasm/wasmvm v0.16.0
github.com/cosmos/cosmos-sdk v0.44.0
github.com/CosmWasm/wasmvm v0.16.1
github.com/cosmos/cosmos-sdk v0.44.1
github.com/cosmos/ibc-go v1.1.0
github.com/gogo/protobuf v1.3.3
github.com/golang/protobuf v1.5.2
Expand All @@ -13,21 +13,20 @@ require (
github.com/pkg/errors v0.9.1
github.com/rakyll/statik v0.1.7
github.com/spf13/cast v1.3.1
github.com/spf13/cobra v1.1.3
github.com/spf13/cobra v1.2.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
github.com/tendermint/tendermint v0.34.12
github.com/tendermint/tendermint v0.34.13
github.com/tendermint/tm-db v0.6.4
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c
google.golang.org/grpc v1.38.0
google.golang.org/grpc v1.40.0
gopkg.in/yaml.v2 v2.4.0
)

replace github.com/cosmos/ledger-cosmos-go => github.com/terra-money/ledger-terra-go v0.11.2

replace google.golang.org/grpc => google.golang.org/grpc v1.33.2

replace github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1

replace github.com/99designs/keyring => github.com/cosmos/keyring v1.1.7-0.20210622111912-ef00f8ac3d76
replace (
github.com/99designs/keyring => github.com/cosmos/keyring v1.1.7-0.20210622111912-ef00f8ac3d76
github.com/cosmos/ledger-cosmos-go => github.com/terra-money/ledger-terra-go v0.11.2
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
google.golang.org/grpc => google.golang.org/grpc v1.33.2
)
395 changes: 375 additions & 20 deletions go.sum

Large diffs are not rendered by default.

44 changes: 31 additions & 13 deletions x/wasm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (

// config default values
const (
DefaultContractQueryGasLimit = uint64(3000000)
DefaultContractDebugMode = false
DefaultContractMemoryCacheSize = uint32(500)
DefaultContractQueryGasLimit = uint64(3000000)
DefaultContractDebugMode = false
DefaultWriteVMMemoryCacheSize = uint32(500)
DefaultReadVMMemoryCacheSize = uint32(300)
DefaultNumReadVM = uint32(1)
)

// DBDir used to store wasm data to
Expand All @@ -26,25 +28,35 @@ type Config struct {
// The flag to specify whether print contract logs or not
ContractDebugMode bool `mapstructure:"contract-debug-mode"`

// The WASM VM memory cache size in MiB not bytes
ContractMemoryCacheSize uint32 `mapstructure:"contract-memory-cache-size"`
// The write WASM VM memory cache size in MiB not bytes
WriteVMMemoryCacheSize uint32 `mapstructure:"write-vm-memory-cache-size"`

// The read WASM VM memory cache size in MiB not bytes
ReadVMMemoryCacheSize uint32 `mapstructure:"read-vm-memory-cache-size"`

// The number of read WASM VMs
NumReadVMs uint32 `mapstructure:"num-read-vms"`
}

// DefaultConfig returns the default settings for WasmConfig
func DefaultConfig() *Config {
return &Config{
ContractQueryGasLimit: DefaultContractQueryGasLimit,
ContractDebugMode: DefaultContractDebugMode,
ContractMemoryCacheSize: DefaultContractMemoryCacheSize,
ContractQueryGasLimit: DefaultContractQueryGasLimit,
ContractDebugMode: DefaultContractDebugMode,
WriteVMMemoryCacheSize: DefaultWriteVMMemoryCacheSize,
ReadVMMemoryCacheSize: DefaultReadVMMemoryCacheSize,
NumReadVMs: DefaultNumReadVM,
}
}

// GetConfig load config values from the app options
func GetConfig(appOpts servertypes.AppOptions) *Config {
return &Config{
ContractQueryGasLimit: cast.ToUint64(appOpts.Get("wasm.contract-query-gas-limit")),
ContractDebugMode: cast.ToBool(appOpts.Get("wasm.contract-debug-mode")),
ContractMemoryCacheSize: cast.ToUint32(appOpts.Get("wasm.contract-memory-cache-size")),
ContractQueryGasLimit: cast.ToUint64(appOpts.Get("wasm.contract-query-gas-limit")),
ContractDebugMode: cast.ToBool(appOpts.Get("wasm.contract-debug-mode")),
WriteVMMemoryCacheSize: cast.ToUint32(appOpts.Get("wasm.write-vm-memory-cache-size")),
ReadVMMemoryCacheSize: cast.ToUint32(appOpts.Get("wasm.read-vm-memory-cache-size")),
NumReadVMs: cast.ToUint32(appOpts.Get("wasm.num-read-vms")),
}
}

Expand All @@ -59,6 +71,12 @@ contract-query-gas-limit = "{{ .WASMConfig.ContractQueryGasLimit }}"
# The flag to specify whether print contract logs or not
contract-debug-mode = "{{ .WASMConfig.ContractDebugMode }}"
# The WASM VM memory cache size in MiB not bytes
contract-memory-cache-size = "{{ .WASMConfig.ContractMemoryCacheSize }}"
# The write WASM VM memory cache size in MiB not bytes
write-vm-memory-cache-size = "{{ .WASMConfig.WriteVMMemoryCacheSize }}"
# The read WASM VM memory cache size in MiB not bytes
read-vm-memory-cache-size = "{{ .WASMConfig.ReadVMMemoryCacheSize }}"
# The number of read WASM VMs
num-read-vms = "{{ .WASMConfig.NumReadVMs }}"
`
15 changes: 11 additions & 4 deletions x/wasm/keeper/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (k Keeper) StoreCode(ctx sdk.Context, creator sdk.AccAddress, wasmCode []by

// MigrateCode uploads and compiles a WASM contract bytecode for the existing code id.
// After columbus-5 update, all contract code will be removed from the store
// due to in-compatibility between [email protected] and CosmWasm@v0.14.x
// The migration only can be executed by once after columbus-5 update.
// due to in-compatibility between [email protected] and CosmWasm@v0.16.x
// The migration can be executed by once after columbus-5 update.
// TODO - remove after columbus-5 update
func (k Keeper) MigrateCode(ctx sdk.Context, codeID uint64, creator sdk.AccAddress, wasmCode []byte) error {
codeInfo, err := k.GetCodeInfo(ctx, codeID)
Expand Down Expand Up @@ -420,7 +420,7 @@ func (k Keeper) queryToStore(ctx sdk.Context, contractAddress sdk.AccAddress, ke
return prefixStore.Get(key)
}

func (k Keeper) queryToContract(ctx sdk.Context, contractAddress sdk.AccAddress, queryMsg []byte) ([]byte, error) {
func (k Keeper) queryToContract(ctx sdk.Context, contractAddress sdk.AccAddress, queryMsg []byte, wasmVMs ...types.WasmerEngine) ([]byte, error) {
defer telemetry.MeasureSince(time.Now(), "wasm", "contract", "query-smart")
ctx.GasMeter().ConsumeGas(types.InstantiateContractCosts(len(queryMsg)), "Loading CosmWasm module: query")

Expand All @@ -430,7 +430,14 @@ func (k Keeper) queryToContract(ctx sdk.Context, contractAddress sdk.AccAddress,
}

env := types.NewEnv(ctx, contractAddress)
queryResult, gasUsed, err := k.wasmVM.Query(

// when the vm is given, use that given vm
wasmVM := k.wasmVM
if len(wasmVMs) != 0 {
wasmVM = wasmVMs[0]
}

queryResult, gasUsed, err := wasmVM.Query(
codeInfo.CodeHash,
env,
queryMsg,
Expand Down
78 changes: 60 additions & 18 deletions x/wasm/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"encoding/binary"
"fmt"
"path/filepath"
"sync"

"github.com/tendermint/tendermint/libs/log"
"golang.org/x/sync/semaphore"

"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/store/prefix"
Expand All @@ -32,7 +34,11 @@ type Keeper struct {
serviceRouter types.MsgServiceRouter
queryRouter types.GRPCQueryRouter

wasmVM types.WasmerEngine
wasmVM types.WasmerEngine
wasmReadVMPool []types.WasmerEngine
wasmReadVMSemaphore *semaphore.Weighted
wasmReadVMMutex *sync.Mutex

querier types.Querier
msgParser types.MsgParser

Expand All @@ -53,35 +59,71 @@ func NewKeeper(
supportedFeatures string,
homePath string,
wasmConfig *config.Config) Keeper {
wasmVM, err := wasmvm.NewVM(

// set KeyTable if it has not already been set
if !paramspace.HasKeyTable() {
paramspace = paramspace.WithKeyTable(types.ParamKeyTable())
}

writeWasmVM, err := wasmvm.NewVM(
filepath.Join(homePath, config.DBDir),
supportedFeatures,
types.ContractMemoryLimit,
wasmConfig.ContractDebugMode,
wasmConfig.ContractMemoryCacheSize,
wasmConfig.WriteVMMemoryCacheSize,
)

if err != nil {
panic(err)
}

// set KeyTable if it has not already been set
if !paramspace.HasKeyTable() {
paramspace = paramspace.WithKeyTable(types.ParamKeyTable())
// prevent zero read vm
if wasmConfig.NumReadVMs == 0 {
wasmConfig.NumReadVMs = config.DefaultNumReadVM
}

// prevent zero read vm cache
if wasmConfig.ReadVMMemoryCacheSize == 0 {
wasmConfig.ReadVMMemoryCacheSize = config.DefaultReadVMMemoryCacheSize
}

// prevent zero write vm cache
if wasmConfig.WriteVMMemoryCacheSize == 0 {
wasmConfig.WriteVMMemoryCacheSize = config.DefaultWriteVMMemoryCacheSize
}

numReadVms := wasmConfig.NumReadVMs
wasmReadVMPool := make([]types.WasmerEngine, numReadVms)
for i := uint32(0); i < numReadVms; i++ {
wasmReadVMPool[i], err = wasmvm.NewVM(
filepath.Join(homePath, config.DBDir),
supportedFeatures,
types.ContractMemoryLimit,
wasmConfig.ContractDebugMode,
wasmConfig.ReadVMMemoryCacheSize,
)

if err != nil {
panic(err)
}
}

return Keeper{
storeKey: storeKey,
cdc: cdc,
paramSpace: paramspace,
wasmVM: wasmVM,
accountKeeper: accountKeeper,
bankKeeper: bankKeeper,
treasuryKeeper: treasuryKeeper,
serviceRouter: serviceRouter,
queryRouter: queryRouter,
wasmConfig: wasmConfig,
msgParser: types.NewWasmMsgParser(),
querier: types.NewWasmQuerier(),
storeKey: storeKey,
cdc: cdc,
paramSpace: paramspace,
wasmVM: writeWasmVM,
wasmReadVMPool: wasmReadVMPool,
wasmReadVMSemaphore: semaphore.NewWeighted(int64(numReadVms)),
wasmReadVMMutex: &sync.Mutex{},
accountKeeper: accountKeeper,
bankKeeper: bankKeeper,
treasuryKeeper: treasuryKeeper,
serviceRouter: serviceRouter,
queryRouter: queryRouter,
wasmConfig: wasmConfig,
msgParser: types.NewWasmMsgParser(),
querier: types.NewWasmQuerier(),
}
}

Expand Down
9 changes: 8 additions & 1 deletion x/wasm/keeper/legacy_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,15 @@ func queryContractStore(ctx sdk.Context, req abci.RequestQuery, k Keeper, legacy
return nil, sdkerrors.Wrap(sdkerrors.ErrJSONUnmarshal, err.Error())
}

wasmVM, err := k.acquireWasmVM(sdk.WrapSDKContext(ctx))
if err != nil {
return nil, sdkerrors.Wrap(types.ErrContractQueryFailed, err.Error())
}

// recover from out-of-gas panic
defer func() {
k.releaseWasmVM(wasmVM)

if r := recover(); r != nil {
switch rType := r.(type) {
case sdk.ErrorOutOfGas:
Expand All @@ -140,7 +147,7 @@ func queryContractStore(ctx sdk.Context, req abci.RequestQuery, k Keeper, legacy
}
}()

bz, err = k.queryToContract(ctx, params.ContractAddress, params.Msg)
bz, err = k.queryToContract(ctx, params.ContractAddress, params.Msg, wasmVM)

return
}
Expand Down
55 changes: 55 additions & 0 deletions x/wasm/keeper/legacy_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"sync"
"testing"

sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -94,3 +95,57 @@ func TestLegacyParams(t *testing.T) {
require.NoError(t, err)
require.Equal(t, input.WasmKeeper.GetParams(input.Ctx), params)
}

func TestLegacyMultipleGoroutines(t *testing.T) {
input := CreateTestInput(t)
ctx, accKeeper, bankKeeper, keeper := input.Ctx, input.AccKeeper, input.BankKeeper, input.WasmKeeper

deposit := sdk.NewCoins(sdk.NewInt64Coin("denom", 100000))
topUp := sdk.NewCoins(sdk.NewInt64Coin("denom", 5000))
creator := createFakeFundedAccount(ctx, accKeeper, bankKeeper, deposit.Add(deposit...))
anyAddr := createFakeFundedAccount(ctx, accKeeper, bankKeeper, topUp)

wasmCode, err := ioutil.ReadFile("./testdata/hackatom.wasm")
require.NoError(t, err)

contractID, err := keeper.StoreCode(ctx, creator, wasmCode)
require.NoError(t, err)

_, _, bob := keyPubAddr()
initMsg := HackatomExampleInitMsg{
Verifier: anyAddr,
Beneficiary: bob,
}
initMsgBz, err := json.Marshal(initMsg)
require.NoError(t, err)

addr, _, err := keeper.InstantiateContract(ctx, contractID, creator, sdk.AccAddress{}, initMsgBz, deposit)
require.NoError(t, err)

contractModel := []types.Model{
{Key: []byte("foo"), Value: []byte(`"bar"`)},
{Key: []byte{0x0, 0x1}, Value: []byte(`{"count":8}`)},
}

keeper.SetContractStore(ctx, addr, contractModel)

querier := NewLegacyQuerier(keeper, input.Cdc)

wg := &sync.WaitGroup{}
testCases := 100
wg.Add(testCases)
for n := 0; n < testCases; n++ {
go func() {
// query contract []byte(`{"verifier":{}}`)
bz, err := input.Cdc.MarshalJSON(types.NewQueryContractParams(addr, []byte(`{"verifier":{}}`)))
require.NoError(t, err)

res, err := querier(ctx, []string{types.QueryContractStore}, abci.RequestQuery{Data: []byte(bz)})
require.NoError(t, err)
require.Equal(t, fmt.Sprintf(`{"verifier":"%s"}`, anyAddr.String()), string(res))

wg.Done()
}()
}
wg.Wait()
}
Loading

0 comments on commit 6e903ef

Please sign in to comment.