Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Implement read VM pool #546

Merged
merged 16 commits into from
Oct 6, 2021
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
## [v0.5.6]

This release contains updates for multi-reader thread implementation with necessary dependency updates for multi-reader thread implementation.

* Bump cosmos-sdk to [v0.44.1](https://github.com/cosmos/cosmos-sdk/releases/tags/v0.44.1)
* Bump tendermint to [v0.44.1](https://github.com/tendermint/tendermint/releases/tags/v0.34.13)
* Bump CosmWasm to [v0.16.2](https://github.com/CosmWasm/cosmwasm/releases/tags/v0.16.2)

### Improvements
- [\#546](https://github.com/terra-money/core/pull/546) Implement read VM pool

## [v0.5.5]

### Improvements
Expand Down
25 changes: 12 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ go 1.16
module github.com/terra-money/core

require (
github.com/CosmWasm/wasmvm v0.16.0
github.com/cosmos/cosmos-sdk v0.44.0
github.com/CosmWasm/wasmvm v0.16.1
github.com/cosmos/cosmos-sdk v0.44.1
github.com/cosmos/ibc-go v1.1.0
github.com/gogo/protobuf v1.3.3
github.com/golang/protobuf v1.5.2
Expand All @@ -13,21 +13,20 @@ require (
github.com/pkg/errors v0.9.1
github.com/rakyll/statik v0.1.7
github.com/spf13/cast v1.3.1
github.com/spf13/cobra v1.1.3
github.com/spf13/cobra v1.2.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
github.com/tendermint/tendermint v0.34.12
github.com/tendermint/tendermint v0.34.13
github.com/tendermint/tm-db v0.6.4
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c
google.golang.org/grpc v1.38.0
google.golang.org/grpc v1.40.0
gopkg.in/yaml.v2 v2.4.0
)

replace github.com/cosmos/ledger-cosmos-go => github.com/terra-money/ledger-terra-go v0.11.2

replace google.golang.org/grpc => google.golang.org/grpc v1.33.2

replace github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1

replace github.com/99designs/keyring => github.com/cosmos/keyring v1.1.7-0.20210622111912-ef00f8ac3d76
replace (
github.com/99designs/keyring => github.com/cosmos/keyring v1.1.7-0.20210622111912-ef00f8ac3d76
github.com/cosmos/ledger-cosmos-go => github.com/terra-money/ledger-terra-go v0.11.2
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
google.golang.org/grpc => google.golang.org/grpc v1.33.2
)
395 changes: 375 additions & 20 deletions go.sum

Large diffs are not rendered by default.

44 changes: 31 additions & 13 deletions x/wasm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (

// config default values
const (
DefaultContractQueryGasLimit = uint64(3000000)
DefaultContractDebugMode = false
DefaultContractMemoryCacheSize = uint32(500)
DefaultContractQueryGasLimit = uint64(3000000)
DefaultContractDebugMode = false
DefaultWriteVMMemoryCacheSize = uint32(500)
DefaultReadVMMemoryCacheSize = uint32(300)
DefaultNumReadVM = uint32(1)
)

// DBDir used to store wasm data to
Expand All @@ -26,25 +28,35 @@ type Config struct {
// The flag to specify whether print contract logs or not
ContractDebugMode bool `mapstructure:"contract-debug-mode"`

// The WASM VM memory cache size in MiB not bytes
ContractMemoryCacheSize uint32 `mapstructure:"contract-memory-cache-size"`
// The write WASM VM memory cache size in MiB not bytes
WriteVMMemoryCacheSize uint32 `mapstructure:"write-vm-memory-cache-size"`

// The read WASM VM memory cache size in MiB not bytes
ReadVMMemoryCacheSize uint32 `mapstructure:"read-vm-memory-cache-size"`

// The number of read WASM VMs
NumReadVMs uint32 `mapstructure:"num-read-vms"`
}

// DefaultConfig returns the default settings for WasmConfig
func DefaultConfig() *Config {
return &Config{
ContractQueryGasLimit: DefaultContractQueryGasLimit,
ContractDebugMode: DefaultContractDebugMode,
ContractMemoryCacheSize: DefaultContractMemoryCacheSize,
ContractQueryGasLimit: DefaultContractQueryGasLimit,
ContractDebugMode: DefaultContractDebugMode,
WriteVMMemoryCacheSize: DefaultWriteVMMemoryCacheSize,
ReadVMMemoryCacheSize: DefaultReadVMMemoryCacheSize,
NumReadVMs: DefaultNumReadVM,
}
}

// GetConfig load config values from the app options
func GetConfig(appOpts servertypes.AppOptions) *Config {
return &Config{
ContractQueryGasLimit: cast.ToUint64(appOpts.Get("wasm.contract-query-gas-limit")),
ContractDebugMode: cast.ToBool(appOpts.Get("wasm.contract-debug-mode")),
ContractMemoryCacheSize: cast.ToUint32(appOpts.Get("wasm.contract-memory-cache-size")),
ContractQueryGasLimit: cast.ToUint64(appOpts.Get("wasm.contract-query-gas-limit")),
ContractDebugMode: cast.ToBool(appOpts.Get("wasm.contract-debug-mode")),
WriteVMMemoryCacheSize: cast.ToUint32(appOpts.Get("wasm.write-vm-memory-cache-size")),
ReadVMMemoryCacheSize: cast.ToUint32(appOpts.Get("wasm.read-vm-memory-cache-size")),
NumReadVMs: cast.ToUint32(appOpts.Get("wasm.num-read-vms")),
}
}

Expand All @@ -59,6 +71,12 @@ contract-query-gas-limit = "{{ .WASMConfig.ContractQueryGasLimit }}"
# The flag to specify whether print contract logs or not
contract-debug-mode = "{{ .WASMConfig.ContractDebugMode }}"

# The WASM VM memory cache size in MiB not bytes
contract-memory-cache-size = "{{ .WASMConfig.ContractMemoryCacheSize }}"
# The write WASM VM memory cache size in MiB not bytes
write-vm-memory-cache-size = "{{ .WASMConfig.WriteVMMemoryCacheSize }}"

# The read WASM VM memory cache size in MiB not bytes
read-vm-memory-cache-size = "{{ .WASMConfig.ReadVMMemoryCacheSize }}"

# The number of read WASM VMs
num-read-vms = "{{ .WASMConfig.NumReadVMs }}"
`
15 changes: 11 additions & 4 deletions x/wasm/keeper/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (k Keeper) StoreCode(ctx sdk.Context, creator sdk.AccAddress, wasmCode []by

// MigrateCode uploads and compiles a WASM contract bytecode for the existing code id.
// After columbus-5 update, all contract code will be removed from the store
// due to in-compatibility between [email protected] and CosmWasm@v0.14.x
// The migration only can be executed by once after columbus-5 update.
// due to in-compatibility between [email protected] and CosmWasm@v0.16.x
// The migration can be executed by once after columbus-5 update.
// TODO - remove after columbus-5 update
func (k Keeper) MigrateCode(ctx sdk.Context, codeID uint64, creator sdk.AccAddress, wasmCode []byte) error {
codeInfo, err := k.GetCodeInfo(ctx, codeID)
Expand Down Expand Up @@ -420,7 +420,7 @@ func (k Keeper) queryToStore(ctx sdk.Context, contractAddress sdk.AccAddress, ke
return prefixStore.Get(key)
}

func (k Keeper) queryToContract(ctx sdk.Context, contractAddress sdk.AccAddress, queryMsg []byte) ([]byte, error) {
func (k Keeper) queryToContract(ctx sdk.Context, contractAddress sdk.AccAddress, queryMsg []byte, wasmVMs ...types.WasmerEngine) ([]byte, error) {
defer telemetry.MeasureSince(time.Now(), "wasm", "contract", "query-smart")
ctx.GasMeter().ConsumeGas(types.InstantiateContractCosts(len(queryMsg)), "Loading CosmWasm module: query")

Expand All @@ -430,7 +430,14 @@ func (k Keeper) queryToContract(ctx sdk.Context, contractAddress sdk.AccAddress,
}

env := types.NewEnv(ctx, contractAddress)
queryResult, gasUsed, err := k.wasmVM.Query(

// when the vm is given, use that given vm
wasmVM := k.wasmVM
if len(wasmVMs) != 0 {
wasmVM = wasmVMs[0]
}

queryResult, gasUsed, err := wasmVM.Query(
codeInfo.CodeHash,
env,
queryMsg,
Expand Down
78 changes: 60 additions & 18 deletions x/wasm/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"encoding/binary"
"fmt"
"path/filepath"
"sync"

"github.com/tendermint/tendermint/libs/log"
"golang.org/x/sync/semaphore"

"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/store/prefix"
Expand All @@ -32,7 +34,11 @@ type Keeper struct {
serviceRouter types.MsgServiceRouter
queryRouter types.GRPCQueryRouter

wasmVM types.WasmerEngine
wasmVM types.WasmerEngine
wasmReadVMPool []types.WasmerEngine
wasmReadVMSemaphore *semaphore.Weighted
wasmReadVMMutex *sync.Mutex

querier types.Querier
msgParser types.MsgParser

Expand All @@ -53,35 +59,71 @@ func NewKeeper(
supportedFeatures string,
homePath string,
wasmConfig *config.Config) Keeper {
wasmVM, err := wasmvm.NewVM(

// set KeyTable if it has not already been set
if !paramspace.HasKeyTable() {
paramspace = paramspace.WithKeyTable(types.ParamKeyTable())
}

writeWasmVM, err := wasmvm.NewVM(
filepath.Join(homePath, config.DBDir),
supportedFeatures,
types.ContractMemoryLimit,
wasmConfig.ContractDebugMode,
wasmConfig.ContractMemoryCacheSize,
wasmConfig.WriteVMMemoryCacheSize,
)

if err != nil {
panic(err)
}

// set KeyTable if it has not already been set
if !paramspace.HasKeyTable() {
paramspace = paramspace.WithKeyTable(types.ParamKeyTable())
// prevent zero read vm
if wasmConfig.NumReadVMs == 0 {
wasmConfig.NumReadVMs = config.DefaultNumReadVM
}

// prevent zero read vm cache
if wasmConfig.ReadVMMemoryCacheSize == 0 {
wasmConfig.ReadVMMemoryCacheSize = config.DefaultReadVMMemoryCacheSize
}

// prevent zero write vm cache
if wasmConfig.WriteVMMemoryCacheSize == 0 {
wasmConfig.WriteVMMemoryCacheSize = config.DefaultWriteVMMemoryCacheSize
}

numReadVms := wasmConfig.NumReadVMs
wasmReadVMPool := make([]types.WasmerEngine, numReadVms)
for i := uint32(0); i < numReadVms; i++ {
wasmReadVMPool[i], err = wasmvm.NewVM(
filepath.Join(homePath, config.DBDir),
supportedFeatures,
types.ContractMemoryLimit,
wasmConfig.ContractDebugMode,
wasmConfig.ReadVMMemoryCacheSize,
)

if err != nil {
panic(err)
}
}

return Keeper{
storeKey: storeKey,
cdc: cdc,
paramSpace: paramspace,
wasmVM: wasmVM,
accountKeeper: accountKeeper,
bankKeeper: bankKeeper,
treasuryKeeper: treasuryKeeper,
serviceRouter: serviceRouter,
queryRouter: queryRouter,
wasmConfig: wasmConfig,
msgParser: types.NewWasmMsgParser(),
querier: types.NewWasmQuerier(),
storeKey: storeKey,
cdc: cdc,
paramSpace: paramspace,
wasmVM: writeWasmVM,
wasmReadVMPool: wasmReadVMPool,
wasmReadVMSemaphore: semaphore.NewWeighted(int64(numReadVms)),
wasmReadVMMutex: &sync.Mutex{},
accountKeeper: accountKeeper,
bankKeeper: bankKeeper,
treasuryKeeper: treasuryKeeper,
serviceRouter: serviceRouter,
queryRouter: queryRouter,
wasmConfig: wasmConfig,
msgParser: types.NewWasmMsgParser(),
querier: types.NewWasmQuerier(),
}
}

Expand Down
9 changes: 8 additions & 1 deletion x/wasm/keeper/legacy_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,15 @@ func queryContractStore(ctx sdk.Context, req abci.RequestQuery, k Keeper, legacy
return nil, sdkerrors.Wrap(sdkerrors.ErrJSONUnmarshal, err.Error())
}

wasmVM, err := k.acquireWasmVM(sdk.WrapSDKContext(ctx))
if err != nil {
return nil, sdkerrors.Wrap(types.ErrContractQueryFailed, err.Error())
}

// recover from out-of-gas panic
defer func() {
k.releaseWasmVM(wasmVM)

if r := recover(); r != nil {
switch rType := r.(type) {
case sdk.ErrorOutOfGas:
Expand All @@ -140,7 +147,7 @@ func queryContractStore(ctx sdk.Context, req abci.RequestQuery, k Keeper, legacy
}
}()

bz, err = k.queryToContract(ctx, params.ContractAddress, params.Msg)
bz, err = k.queryToContract(ctx, params.ContractAddress, params.Msg, wasmVM)

return
}
Expand Down
55 changes: 55 additions & 0 deletions x/wasm/keeper/legacy_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"sync"
"testing"

sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -94,3 +95,57 @@ func TestLegacyParams(t *testing.T) {
require.NoError(t, err)
require.Equal(t, input.WasmKeeper.GetParams(input.Ctx), params)
}

func TestLegacyMultipleGoroutines(t *testing.T) {
input := CreateTestInput(t)
ctx, accKeeper, bankKeeper, keeper := input.Ctx, input.AccKeeper, input.BankKeeper, input.WasmKeeper

deposit := sdk.NewCoins(sdk.NewInt64Coin("denom", 100000))
topUp := sdk.NewCoins(sdk.NewInt64Coin("denom", 5000))
creator := createFakeFundedAccount(ctx, accKeeper, bankKeeper, deposit.Add(deposit...))
anyAddr := createFakeFundedAccount(ctx, accKeeper, bankKeeper, topUp)

wasmCode, err := ioutil.ReadFile("./testdata/hackatom.wasm")
require.NoError(t, err)

contractID, err := keeper.StoreCode(ctx, creator, wasmCode)
require.NoError(t, err)

_, _, bob := keyPubAddr()
initMsg := HackatomExampleInitMsg{
Verifier: anyAddr,
Beneficiary: bob,
}
initMsgBz, err := json.Marshal(initMsg)
require.NoError(t, err)

addr, _, err := keeper.InstantiateContract(ctx, contractID, creator, sdk.AccAddress{}, initMsgBz, deposit)
require.NoError(t, err)

contractModel := []types.Model{
{Key: []byte("foo"), Value: []byte(`"bar"`)},
{Key: []byte{0x0, 0x1}, Value: []byte(`{"count":8}`)},
}

keeper.SetContractStore(ctx, addr, contractModel)

querier := NewLegacyQuerier(keeper, input.Cdc)

wg := &sync.WaitGroup{}
testCases := 100
wg.Add(testCases)
for n := 0; n < testCases; n++ {
go func() {
// query contract []byte(`{"verifier":{}}`)
bz, err := input.Cdc.MarshalJSON(types.NewQueryContractParams(addr, []byte(`{"verifier":{}}`)))
require.NoError(t, err)

res, err := querier(ctx, []string{types.QueryContractStore}, abci.RequestQuery{Data: []byte(bz)})
require.NoError(t, err)
require.Equal(t, fmt.Sprintf(`{"verifier":"%s"}`, anyAddr.String()), string(res))

wg.Done()
}()
}
wg.Wait()
}
Loading