diff --git a/README.md b/README.md index ac7241c3b..78e4feb7e 100644 --- a/README.md +++ b/README.md @@ -1,23 +1,18 @@ -# Tendermint +# Linemint -![banner](docs/tendermint-core-image.jpg) +![banner](docs/linemint-image.jpg) [Byzantine-Fault Tolerant](https://en.wikipedia.org/wiki/Byzantine_fault_tolerance) [State Machines](https://en.wikipedia.org/wiki/State_machine_replication). Or [Blockchain](), for short. -[![version](https://img.shields.io/github/tag/tendermint/tendermint.svg)](https://github.com/tendermint/tendermint/releases/latest) -[![API Reference](https://camo.githubusercontent.com/915b7be44ada53c290eb157634330494ebe3e30a/68747470733a2f2f676f646f632e6f72672f6769746875622e636f6d2f676f6c616e672f6764646f3f7374617475732e737667)](https://pkg.go.dev/github.com/tendermint/tendermint) [![Go version](https://img.shields.io/badge/go-1.15-blue.svg)](https://github.com/moovweb/gvm) -[![Discord chat](https://img.shields.io/discord/669268347736686612.svg)](https://discord.gg/AzefAFd) -[![license](https://img.shields.io/github/license/tendermint/tendermint.svg)](https://github.com/tendermint/tendermint/blob/master/LICENSE) -[![tendermint/tendermint](https://tokei.rs/b1/github/tendermint/tendermint?category=lines)](https://github.com/tendermint/tendermint) -[![Sourcegraph](https://sourcegraph.com/github.com/tendermint/tendermint/-/badge.svg)](https://sourcegraph.com/github.com/tendermint/tendermint?badge) | Branch | Tests | Coverage | Linting | | ------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------ | -------------------------------------------------------------------------- | -| master | [![CircleCI](https://circleci.com/gh/tendermint/tendermint/tree/master.svg?style=shield)](https://circleci.com/gh/tendermint/tendermint/tree/master)
![Tests](https://github.com/tendermint/tendermint/workflows/Tests/badge.svg?branch=master) | [![codecov](https://codecov.io/gh/tendermint/tendermint/branch/master/graph/badge.svg)](https://codecov.io/gh/tendermint/tendermint) | ![Lint](https://github.com/tendermint/tendermint/workflows/Lint/badge.svg) | +| ebony | [![CircleCI](https://circleci.com/gh/line/ostracon/tree/ebony.svg?style=shield)](https://circleci.com/gh/line/ostracon/tree/ebony)
![Tests](https://github.com/tendermint/tendermint/workflows/Tests/badge.svg?branch=master) | [![codecov](https://codecov.io/gh/tendermint/tendermint/branch/master/graph/badge.svg)](https://codecov.io/gh/tendermint/tendermint) | ![Lint](https://github.com/line/ostracon/workflows/Lint/badge.svg) | +Linemint Core is derived from Tendermint Core. Tendermint Core is Byzantine Fault Tolerant (BFT) middleware that takes a state transition machine - written in any programming language - and securely replicates it on many machines. @@ -28,7 +23,7 @@ see our recent paper, "[The latest gossip on BFT consensus](https://arxiv.org/ab ## Releases -Please do not depend on master as your production branch. Use [releases](https://github.com/tendermint/tendermint/releases) instead. +Please do not depend on master as your production branch. Use [releases](https://github.com/line/ostracon/releases) instead. Tendermint is being used in production in both private and public environments, most notably the blockchains of the [Cosmos Network](https://cosmos.network/). @@ -149,6 +144,9 @@ Additional tooling can be found in [/docs/tools](/docs/tools). ### Applications +- [LBM SDK](http://github.com/line/lbm-sdk); a line blockchain mainnet framework +- [LBM](http://github.com/line/lbm); line blockchain mainnet +- [OSTRACON](http://github.com/line/ostracon); tendermint-based BFT algorithm using VRF random sampling - [Cosmos SDK](http://github.com/cosmos/cosmos-sdk); a cryptocurrency application framework - [Ethermint](http://github.com/cosmos/ethermint); Ethereum on Tendermint - [Many more](https://tendermint.com/ecosystem) diff --git a/config/config.go b/config/config.go index 15b3ab092..325deface 100644 --- a/config/config.go +++ b/config/config.go @@ -332,6 +332,30 @@ type RPCConfig struct { // 1024 - 40 - 10 - 50 = 924 = ~900 MaxOpenConnections int `mapstructure:"max_open_connections"` + // mirrors http.Server#ReadTimeout + // ReadTimeout is the maximum duration for reading the entire + // request, including the body. + // + // Because ReadTimeout does not let Handlers make per-request + // decisions on each request body's acceptable deadline or + // upload rate, most users will prefer to use + // ReadHeaderTimeout. It is valid to use them both. + ReadTimeout time.Duration `mapstructure:"read_timeout"` + + // mirrors http.Server#WriteTimeout + // WriteTimeout is the maximum duration before timing out + // writes of the response. It is reset whenever a new + // request's header is read. Like ReadTimeout, it does not + // let Handlers make decisions on a per-request basis. + WriteTimeout time.Duration `mapstructure:"write_timeout"` + + // mirrors http.Server#IdleTimeout + // IdleTimeout is the maximum amount of time to wait for the + // next request when keep-alives are enabled. If IdleTimeout + // is zero, the value of ReadTimeout is used. If both are + // zero, there is no timeout. + IdleTimeout time.Duration `mapstructure:"idle_timeout"` + // Maximum number of unique clientIDs that can /subscribe // If you're using /broadcast_tx_commit, set to the estimated maximum number // of broadcast_tx_commit calls per block. @@ -343,7 +367,7 @@ type RPCConfig struct { MaxSubscriptionsPerClient int `mapstructure:"max_subscriptions_per_client"` // How long to wait for a tx to be committed during /broadcast_tx_commit - // WARNING: Using a value larger than 10s will result in increasing the + // WARNING: Using a value larger than 'WriteTimeout' will result in increasing the // global HTTP write timeout, which applies to all connections and endpoints. // See https://github.com/tendermint/tendermint/issues/3435 TimeoutBroadcastTxCommit time.Duration `mapstructure:"timeout_broadcast_tx_commit"` @@ -388,6 +412,9 @@ func DefaultRPCConfig() *RPCConfig { Unsafe: false, MaxOpenConnections: 900, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + IdleTimeout: 60 * time.Second, MaxSubscriptionClients: 100, MaxSubscriptionsPerClient: 5, @@ -419,6 +446,15 @@ func (cfg *RPCConfig) ValidateBasic() error { if cfg.MaxOpenConnections < 0 { return errors.New("max_open_connections can't be negative") } + if cfg.ReadTimeout < 0 { + return errors.New("read_timeout can't be negative") + } + if cfg.WriteTimeout < 0 { + return errors.New("write_timeout can't be negative") + } + if cfg.IdleTimeout < 0 { + return errors.New("idle_timeout can't be negative") + } if cfg.MaxSubscriptionClients < 0 { return errors.New("max_subscription_clients can't be negative") } diff --git a/config/toml.go b/config/toml.go index 82ecdda20..50ac822e9 100644 --- a/config/toml.go +++ b/config/toml.go @@ -196,6 +196,29 @@ unsafe = {{ .RPC.Unsafe }} # 1024 - 40 - 10 - 50 = 924 = ~900 max_open_connections = {{ .RPC.MaxOpenConnections }} +# mirrors http.Server#ReadTimeout +# ReadTimeout is the maximum duration for reading the entire +# request, including the body. +# Because ReadTimeout does not let Handlers make per-request +# decisions on each request body's acceptable deadline or +# upload rate, most users will prefer to use +# ReadHeaderTimeout. It is valid to use them both. +read_timeout = "{{ .RPC.ReadTimeout }}" + +# mirrors http.Server#WriteTimeout +# WriteTimeout is the maximum duration before timing out +# writes of the response. It is reset whenever a new +# request's header is read. Like ReadTimeout, it does not +# let Handlers make decisions on a per-request basis. +write_timeout = "{{ .RPC.WriteTimeout }}" + +# mirrors http.Server#IdleTimeout +# IdleTimeout is the maximum amount of time to wait for the +# next request when keep-alives are enabled. If IdleTimeout +# is zero, the value of ReadTimeout is used. If both are +# zero, there is no timeout. +idle_timeout = "{{ .RPC.IdleTimeout }}" + # Maximum number of unique clientIDs that can /subscribe # If you're using /broadcast_tx_commit, set to the estimated maximum number # of broadcast_tx_commit calls per block. @@ -207,7 +230,7 @@ max_subscription_clients = {{ .RPC.MaxSubscriptionClients }} max_subscriptions_per_client = {{ .RPC.MaxSubscriptionsPerClient }} # How long to wait for a tx to be committed during /broadcast_tx_commit. -# WARNING: Using a value larger than 10s will result in increasing the +# WARNING: Using a value larger than 'WriteTimeout' will result in increasing the # global HTTP write timeout, which applies to all connections and endpoints. # See https://github.com/tendermint/tendermint/issues/3435 timeout_broadcast_tx_commit = "{{ .RPC.TimeoutBroadcastTxCommit }}" diff --git a/consensus/replay_stubs.go b/consensus/replay_stubs.go index 08974a67e..087727fed 100644 --- a/consensus/replay_stubs.go +++ b/consensus/replay_stubs.go @@ -28,7 +28,6 @@ func (emptyMempool) Update( _ types.Txs, _ []*abci.ResponseDeliverTx, _ mempl.PreCheckFunc, - _ mempl.PostCheckFunc, ) error { return nil } diff --git a/docs/linemint-image.jpg b/docs/linemint-image.jpg new file mode 100644 index 000000000..208a80b9a Binary files /dev/null and b/docs/linemint-image.jpg differ diff --git a/mempool/cache_test.go b/mempool/cache_test.go index d9a53f475..01cdcb79d 100644 --- a/mempool/cache_test.go +++ b/mempool/cache_test.go @@ -68,7 +68,7 @@ func TestCacheAfterUpdate(t *testing.T) { tx := types.Tx{byte(v)} updateTxs = append(updateTxs, tx) } - err := mempool.Update(int64(tcIndex), updateTxs, abciResponses(len(updateTxs), abci.CodeTypeOK), nil, nil) + err := mempool.Update(int64(tcIndex), updateTxs, abciResponses(len(updateTxs), abci.CodeTypeOK), nil) require.NoError(t, err) for _, v := range tc.reAddIndices { diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 3b3709588..4ad87c91d 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -7,6 +7,7 @@ import ( "fmt" "sync" "sync/atomic" + "time" abci "github.com/tendermint/tendermint/abci/types" cfg "github.com/tendermint/tendermint/config" @@ -38,6 +39,10 @@ type CListMempool struct { height int64 // the last block Update()'d to txsBytes int64 // total size of mempool, in bytes + reserved int // the number of checking tx and it should be considered when checking mempool full + reservedBytes int64 // size of checking tx and it should be considered when checking mempool full + reservedMtx sync.Mutex + // notify listeners (ie. consensus) when txs are available notifiedTxsAvailable bool txsAvailable chan struct{} // fires once for each height, when the mempool is not empty @@ -48,7 +53,6 @@ type CListMempool struct { // CheckTx or ReapMaxBytesMaxGas(ReapMaxTxs) methods. updateMtx tmsync.RWMutex preCheck PreCheckFunc - postCheck PostCheckFunc wal *auto.AutoFile // a log of mempool txs txs *clist.CList // concurrent linked-list of good txs @@ -124,13 +128,6 @@ func WithPreCheck(f PreCheckFunc) CListMempoolOption { return func(mem *CListMempool) { mem.preCheck = f } } -// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns -// false. This is ran after CheckTx. Only applies to the first created block. -// After that, Update overwrites the existing value. -func WithPostCheck(f PostCheckFunc) CListMempoolOption { - return func(mem *CListMempool) { mem.postCheck = f } -} - // WithMetrics sets the metrics. func WithMetrics(metrics *Metrics) CListMempoolOption { return func(mem *CListMempool) { mem.metrics = metrics } @@ -284,7 +281,16 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx return ErrTxInCache } + // END CACHE + // reserve mempool that should be called just before calling `mem.proxyAppConn.CheckTxAsync()` + if err := mem.reserve(int64(txSize)); err != nil { + // remove from cache + mem.cache.Remove(tx) + return err + } + + // CONTRACT: `app.CheckTxAsync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas) reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb)) @@ -305,7 +311,7 @@ func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) { return } - mem.metrics.RecheckTimes.Add(1) + mem.metrics.RecheckCount.Add(1) mem.resCbRecheck(req, res) // update metrics @@ -394,6 +400,35 @@ func (mem *CListMempool) isFull(txSize int) error { return nil } +func (mem *CListMempool) reserve(txSize int64) error { + mem.reservedMtx.Lock() + defer mem.reservedMtx.Unlock() + + var ( + memSize = mem.Size() + txsBytes = mem.TxsBytes() + ) + + if memSize+mem.reserved >= mem.config.Size || txSize+mem.reservedBytes+txsBytes > mem.config.MaxTxsBytes { + return ErrMempoolIsFull{ + memSize + mem.reserved, mem.config.Size, + txsBytes + mem.reservedBytes, mem.config.MaxTxsBytes, + } + } + + mem.reserved++ + mem.reservedBytes += txSize + return nil +} + +func (mem *CListMempool) releaseReserve(txSize int64) { + mem.reservedMtx.Lock() + defer mem.reservedMtx.Unlock() + + mem.reserved-- + mem.reservedBytes -= txSize +} + // callback, which is called after the app checked the tx for the first time. // // The case where the app checks the tx for the second and subsequent times is @@ -406,20 +441,7 @@ func (mem *CListMempool) resCbFirstTime( ) { switch r := res.Value.(type) { case *abci.Response_CheckTx: - var postCheckErr error - if mem.postCheck != nil { - postCheckErr = mem.postCheck(tx, r.CheckTx) - } - if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil { - // Check mempool isn't full again to reduce the chance of exceeding the - // limits. - if err := mem.isFull(len(tx)); err != nil { - // remove from cache (mempool might have a space later) - mem.cache.Remove(tx) - mem.logger.Error(err.Error()) - return - } - + if r.CheckTx.Code == abci.CodeTypeOK { memTx := &mempoolTx{ height: mem.height, gasWanted: r.CheckTx.GasWanted, @@ -437,13 +459,16 @@ func (mem *CListMempool) resCbFirstTime( } else { // ignore bad transaction mem.logger.Debug("rejected bad transaction", - "tx", txID(tx), "peerID", peerP2PID, "res", r, "err", postCheckErr) + "tx", txID(tx), "peerID", peerP2PID, "res", r, "err") mem.metrics.FailedTxs.Add(1) if !mem.config.KeepInvalidTxsInCache { // remove from cache (it might be good later) mem.cache.Remove(tx) } } + + // release `reserve` regardless it's OK or not (it might be good later) + mem.releaseReserve(int64(len(tx))) default: // ignore other messages } @@ -464,15 +489,11 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { memTx.tx, tx)) } - var postCheckErr error - if mem.postCheck != nil { - postCheckErr = mem.postCheck(tx, r.CheckTx) - } - if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil { + if r.CheckTx.Code == abci.CodeTypeOK { // Good, nothing to do. } else { // Tx became invalidated due to newly committed block. - mem.logger.Debug("tx is no longer valid", "tx", txID(tx), "res", r, "err", postCheckErr) + mem.logger.Debug("tx is no longer valid", "tx", txID(tx), "res", r, "err") // NOTE: we remove tx from the cache because it might be good later mem.removeTx(tx, mem.recheckCursor, !mem.config.KeepInvalidTxsInCache) } @@ -565,13 +586,12 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs { return txs } -// Lock() must be help by the caller during execution. +// Lock() must be held by the caller during execution. func (mem *CListMempool) Update( height int64, txs types.Txs, deliverTxResponses []*abci.ResponseDeliverTx, preCheck PreCheckFunc, - postCheck PostCheckFunc, ) error { // Set height mem.height = height @@ -580,9 +600,6 @@ func (mem *CListMempool) Update( if preCheck != nil { mem.preCheck = preCheck } - if postCheck != nil { - mem.postCheck = postCheck - } for i, tx := range txs { if deliverTxResponses[i].Code == abci.CodeTypeOK { @@ -610,6 +627,7 @@ func (mem *CListMempool) Update( // Either recheck non-committed txs to see if they became invalid // or just notify there're some txs left. + recheckStartTime := time.Now().UnixNano() if mem.Size() > 0 { if mem.config.Recheck { mem.logger.Debug("recheck txs", "numtxs", mem.Size(), "height", height) @@ -621,6 +639,10 @@ func (mem *CListMempool) Update( mem.notifyTxsAvailable() } } + recheckEndTime := time.Now().UnixNano() + + recheckTimeMs := float64(recheckEndTime-recheckStartTime) / 1000000 + mem.metrics.RecheckTime.Set(recheckTimeMs) // Update metrics mem.metrics.Size.Set(float64(mem.Size())) diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index a40ba69af..bbcf5be52 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -147,30 +147,22 @@ func TestMempoolFilters(t *testing.T) { emptyTxArr := []types.Tx{[]byte{}} nopPreFilter := func(tx types.Tx) error { return nil } - nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) error { return nil } // each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs. // each tx has 20 bytes tests := []struct { numTxsToCreate int preFilter PreCheckFunc - postFilter PostCheckFunc expectedNumTxs int }{ - {10, nopPreFilter, nopPostFilter, 10}, - {10, PreCheckMaxBytes(10), nopPostFilter, 0}, - {10, PreCheckMaxBytes(22), nopPostFilter, 10}, - {10, nopPreFilter, PostCheckMaxGas(-1), 10}, - {10, nopPreFilter, PostCheckMaxGas(0), 0}, - {10, nopPreFilter, PostCheckMaxGas(1), 10}, - {10, nopPreFilter, PostCheckMaxGas(3000), 10}, - {10, PreCheckMaxBytes(10), PostCheckMaxGas(20), 0}, - {10, PreCheckMaxBytes(30), PostCheckMaxGas(20), 10}, - {10, PreCheckMaxBytes(22), PostCheckMaxGas(1), 10}, - {10, PreCheckMaxBytes(22), PostCheckMaxGas(0), 0}, + {10, nopPreFilter, 10}, + {10, PreCheckMaxBytes(10), 0}, + {10, PreCheckMaxBytes(20), 0}, + {10, PreCheckMaxBytes(22), 10}, + {10, PreCheckMaxBytes(30), 10}, } for tcIndex, tt := range tests { - err := mempool.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter, tt.postFilter) + err := mempool.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter) require.NoError(t, err) checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID) require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex) @@ -186,7 +178,7 @@ func TestMempoolUpdate(t *testing.T) { // 1. Adds valid txs to the cache { - err := mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil) + err := mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil) require.NoError(t, err) err = mempool.CheckTx([]byte{0x01}, nil, TxInfo{}) if assert.Error(t, err) { @@ -198,7 +190,7 @@ func TestMempoolUpdate(t *testing.T) { { err := mempool.CheckTx([]byte{0x02}, nil, TxInfo{}) require.NoError(t, err) - err = mempool.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil, nil) + err = mempool.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil) require.NoError(t, err) assert.Zero(t, mempool.Size()) } @@ -207,7 +199,7 @@ func TestMempoolUpdate(t *testing.T) { { err := mempool.CheckTx([]byte{0x03}, nil, TxInfo{}) require.NoError(t, err) - err = mempool.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil, nil) + err = mempool.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil) require.NoError(t, err) assert.Zero(t, mempool.Size()) @@ -239,7 +231,7 @@ func TestMempool_KeepInvalidTxsInCache(t *testing.T) { _ = app.DeliverTx(abci.RequestDeliverTx{Tx: a}) _ = app.DeliverTx(abci.RequestDeliverTx{Tx: b}) err = mempool.Update(1, []types.Tx{a, b}, - []*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}, {Code: 2}}, nil, nil) + []*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}, {Code: 2}}, nil) require.NoError(t, err) // a must be added to the cache @@ -294,7 +286,7 @@ func TestTxsAvailable(t *testing.T) { // it should fire once now for the new height // since there are still txs left committedTxs, txs := txs[:50], txs[50:] - if err := mempool.Update(1, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil { + if err := mempool.Update(1, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil); err != nil { t.Error(err) } ensureFire(t, mempool.TxsAvailable(), timeoutMS) @@ -306,7 +298,7 @@ func TestTxsAvailable(t *testing.T) { // now call update with all the txs. it should not fire as there are no txs left committedTxs = append(txs, moreTxs...) //nolint: gocritic - if err := mempool.Update(2, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil { + if err := mempool.Update(2, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil); err != nil { t.Error(err) } ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) @@ -365,7 +357,7 @@ func TestSerialReap(t *testing.T) { binary.BigEndian.PutUint64(txBytes, uint64(i)) txs = append(txs, txBytes) } - if err := mempool.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil, nil); err != nil { + if err := mempool.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil); err != nil { t.Error(err) } } @@ -536,7 +528,7 @@ func TestMempoolTxsBytes(t *testing.T) { assert.EqualValues(t, 1, mempool.TxsBytes()) // 3. zero again after tx is removed by Update - err = mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil) + err = mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil) require.NoError(t, err) assert.EqualValues(t, 0, mempool.TxsBytes()) @@ -586,7 +578,7 @@ func TestMempoolTxsBytes(t *testing.T) { require.NotEmpty(t, res2.Data) // Pretend like we committed nothing so txBytes gets rechecked and removed. - err = mempool.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil, nil) + err = mempool.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil) require.NoError(t, err) assert.EqualValues(t, 0, mempool.TxsBytes()) diff --git a/mempool/mempool.go b/mempool/mempool.go index d01958b53..dca3f44e0 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -43,7 +43,6 @@ type Mempool interface { blockTxs types.Txs, deliverTxResponses []*abci.ResponseDeliverTx, newPreFn PreCheckFunc, - newPostFn PostCheckFunc, ) error // FlushAppConn flushes the mempool connection to ensure async reqResCb calls are @@ -85,11 +84,6 @@ type Mempool interface { // transaction doesn't exceeded the block size. type PreCheckFunc func(types.Tx) error -// PostCheckFunc is an optional filter executed after CheckTx and rejects -// transaction if false is returned. An example would be to ensure a -// transaction doesn't require more gas than available for the block. -type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error - // TxInfo are parameters that get passed when attempting to add a tx to the // mempool. type TxInfo struct { @@ -114,22 +108,3 @@ func PreCheckMaxBytes(maxBytes int64) PreCheckFunc { return nil } } - -// PostCheckMaxGas checks that the wanted gas is smaller or equal to the passed -// maxGas. Returns nil if maxGas is -1. -func PostCheckMaxGas(maxGas int64) PostCheckFunc { - return func(tx types.Tx, res *abci.ResponseCheckTx) error { - if maxGas == -1 { - return nil - } - if res.GasWanted < 0 { - return fmt.Errorf("gas wanted %d is negative", - res.GasWanted) - } - if res.GasWanted > maxGas { - return fmt.Errorf("gas wanted %d is greater than max gas %d", - res.GasWanted, maxGas) - } - return nil - } -} diff --git a/mempool/metrics.go b/mempool/metrics.go index 5e4eaf5ed..39459de69 100644 --- a/mempool/metrics.go +++ b/mempool/metrics.go @@ -23,7 +23,9 @@ type Metrics struct { // Number of failed transactions. FailedTxs metrics.Counter // Number of times transactions are rechecked in the mempool. - RecheckTimes metrics.Counter + RecheckCount metrics.Counter + // Time of recheck transactions in the mempool. + RecheckTime metrics.Gauge } // PrometheusMetrics returns Metrics build using Prometheus client library. @@ -54,12 +56,18 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "failed_txs", Help: "Number of failed transactions.", }, labels).With(labelsAndValues...), - RecheckTimes: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + RecheckCount: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, - Name: "recheck_times", + Name: "recheck_count", Help: "Number of times transactions are rechecked in the mempool.", }, labels).With(labelsAndValues...), + RecheckTime: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "recheck_time", + Help: "Time of recheck transactions in the mempool in ms.", + }, labels).With(labelsAndValues...), } } @@ -69,6 +77,7 @@ func NopMetrics() *Metrics { Size: discard.NewGauge(), TxSizeBytes: discard.NewHistogram(), FailedTxs: discard.NewCounter(), - RecheckTimes: discard.NewCounter(), + RecheckCount: discard.NewCounter(), + RecheckTime: discard.NewGauge(), } } diff --git a/mempool/mock/mempool.go b/mempool/mock/mempool.go index be690efaa..9f1b8b1f8 100644 --- a/mempool/mock/mempool.go +++ b/mempool/mock/mempool.go @@ -25,7 +25,6 @@ func (Mempool) Update( _ types.Txs, _ []*abci.ResponseDeliverTx, _ mempl.PreCheckFunc, - _ mempl.PostCheckFunc, ) error { return nil } diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index bc51bfd9b..68e1223bb 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -102,7 +102,7 @@ func TestReactorConcurrency(t *testing.T) { for i := range txs { deliverTxResponses[i] = &abci.ResponseDeliverTx{Code: 0} } - err := reactors[0].mempool.Update(1, txs, deliverTxResponses, nil, nil) + err := reactors[0].mempool.Update(1, txs, deliverTxResponses, nil) assert.NoError(t, err) }() @@ -114,7 +114,7 @@ func TestReactorConcurrency(t *testing.T) { reactors[1].mempool.Lock() defer reactors[1].mempool.Unlock() - err := reactors[1].mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil) + err := reactors[1].mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil) assert.NoError(t, err) }() diff --git a/node/node.go b/node/node.go index b309f995b..84e82c156 100644 --- a/node/node.go +++ b/node/node.go @@ -322,7 +322,6 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, state.LastBlockHeight, mempl.WithMetrics(memplMetrics), mempl.WithPreCheck(sm.TxPreCheck(state)), - mempl.WithPostCheck(sm.TxPostCheck(state)), ) mempoolLogger := logger.With("module", "mempool") mempoolReactor := mempl.NewReactor(config.Mempool, mempool) @@ -1011,6 +1010,9 @@ func (n *Node) startRPC() ([]net.Listener, error) { config.MaxBodyBytes = n.config.RPC.MaxBodyBytes config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes config.MaxOpenConnections = n.config.RPC.MaxOpenConnections + config.ReadTimeout = n.config.RPC.ReadTimeout + config.WriteTimeout = n.config.RPC.WriteTimeout + config.IdleTimeout = n.config.RPC.IdleTimeout // If necessary adjust global WriteTimeout to ensure it's greater than // TimeoutBroadcastTxCommit. // See https://github.com/tendermint/tendermint/issues/3435 diff --git a/node/node_test.go b/node/node_test.go index 8cbf38601..91986753b 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -248,7 +248,6 @@ func TestCreateProposalBlock(t *testing.T) { state.LastBlockHeight, mempl.WithMetrics(memplMetrics), mempl.WithPreCheck(sm.TxPreCheck(state)), - mempl.WithPostCheck(sm.TxPostCheck(state)), ) mempool.SetLogger(logger) @@ -340,7 +339,6 @@ func TestMaxProposalBlockSize(t *testing.T) { state.LastBlockHeight, mempl.WithMetrics(memplMetrics), mempl.WithPreCheck(sm.TxPreCheck(state)), - mempl.WithPostCheck(sm.TxPostCheck(state)), ) mempool.SetLogger(logger) diff --git a/rpc/jsonrpc/client/http_json_client.go b/rpc/jsonrpc/client/http_json_client.go index 59727390a..2e3822a3b 100644 --- a/rpc/jsonrpc/client/http_json_client.go +++ b/rpc/jsonrpc/client/http_json_client.go @@ -10,6 +10,7 @@ import ( "net/http" "net/url" "strings" + "time" tmsync "github.com/tendermint/tendermint/libs/sync" types "github.com/tendermint/tendermint/rpc/jsonrpc/types" @@ -21,6 +22,10 @@ const ( protoWSS = "wss" protoWS = "ws" protoTCP = "tcp" + + defaultMaxIdleConns = 10000 + defaultIdleConnTimeout = 60 // sec + defaultExpectContinueTimeout = 1 // sec ) //------------------------------------------------------------- @@ -369,8 +374,12 @@ func DefaultHTTPClient(remoteAddr string) (*http.Client, error) { client := &http.Client{ Transport: &http.Transport{ // Set to true to prevent GZIP-bomb DoS attacks - DisableCompression: true, - Dial: dialFn, + DisableCompression: true, + Dial: dialFn, + MaxIdleConns: defaultMaxIdleConns, + MaxIdleConnsPerHost: defaultMaxIdleConns, + IdleConnTimeout: defaultIdleConnTimeout * time.Second, + ExpectContinueTimeout: defaultExpectContinueTimeout * time.Second, }, } diff --git a/rpc/jsonrpc/server/http_server.go b/rpc/jsonrpc/server/http_server.go index 6799d3665..8b0f247ff 100644 --- a/rpc/jsonrpc/server/http_server.go +++ b/rpc/jsonrpc/server/http_server.go @@ -27,6 +27,8 @@ type Config struct { ReadTimeout time.Duration // mirrors http.Server#WriteTimeout WriteTimeout time.Duration + // mirrors http.Server#IdleTimeout + IdleTimeout time.Duration // MaxBodyBytes controls the maximum number of bytes the // server will read parsing the request body. MaxBodyBytes int64 @@ -40,6 +42,7 @@ func DefaultConfig() *Config { MaxOpenConnections: 0, // unlimited ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, + IdleTimeout: 60 * time.Second, MaxBodyBytes: int64(1000000), // 1MB MaxHeaderBytes: 1 << 20, // same as the net/http default } @@ -56,6 +59,7 @@ func Serve(listener net.Listener, handler http.Handler, logger log.Logger, confi Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: config.MaxBodyBytes}, logger), ReadTimeout: config.ReadTimeout, WriteTimeout: config.WriteTimeout, + IdleTimeout: config.IdleTimeout, MaxHeaderBytes: config.MaxHeaderBytes, } err := s.Serve(listener) @@ -81,6 +85,7 @@ func ServeTLS( Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: config.MaxBodyBytes}, logger), ReadTimeout: config.ReadTimeout, WriteTimeout: config.WriteTimeout, + IdleTimeout: config.IdleTimeout, MaxHeaderBytes: config.MaxHeaderBytes, } err := s.ServeTLS(listener, certFile, keyFile) diff --git a/state/execution.go b/state/execution.go index 2cc3db6e7..57a920f0a 100644 --- a/state/execution.go +++ b/state/execution.go @@ -136,12 +136,16 @@ func (blockExec *BlockExecutor) ApplyBlock( return state, 0, ErrInvalidBlock(err) } - startTime := time.Now().UnixNano() + execStartTime := time.Now().UnixNano() abciResponses, err := execBlockOnProxyApp( blockExec.logger, blockExec.proxyApp, block, blockExec.store, state.InitialHeight, ) - endTime := time.Now().UnixNano() - blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000) + execEndTime := time.Now().UnixNano() + + execTimeMs := float64(execEndTime-execStartTime) / 1000000 + blockExec.metrics.BlockProcessingTime.Observe(execTimeMs) + blockExec.metrics.BlockExecutionTime.Set(execTimeMs) + if err != nil { return state, 0, ErrProxyAppConn(err) } @@ -177,7 +181,13 @@ func (blockExec *BlockExecutor) ApplyBlock( } // Lock mempool, commit app state, update mempoool. + commitStartTime := time.Now().UnixNano() appHash, retainHeight, err := blockExec.Commit(state, block, abciResponses.DeliverTxs) + commitEndTime := time.Now().UnixNano() + + commitTimeMs := float64(commitEndTime-commitStartTime) / 1000000 + blockExec.metrics.BlockCommitTime.Set(commitTimeMs) + if err != nil { return state, 0, fmt.Errorf("commit failed for application: %v", err) } @@ -225,7 +235,13 @@ func (blockExec *BlockExecutor) Commit( } // Commit block, get hash back + appCommitStartTime := time.Now().UnixNano() res, err := blockExec.proxyApp.CommitSync() + appCommitEndTime := time.Now().UnixNano() + + appCommitTimeMs := float64(appCommitEndTime-appCommitStartTime) / 1000000 + blockExec.metrics.BlockAppCommitTime.Set(appCommitTimeMs) + if err != nil { blockExec.logger.Error("client error during proxyAppConn.CommitSync", "err", err) return nil, 0, err @@ -240,13 +256,12 @@ func (blockExec *BlockExecutor) Commit( ) // Update mempool. - err = blockExec.mempool.Update( - block.Height, - block.Txs, - deliverTxResponses, - TxPreCheck(state), - TxPostCheck(state), - ) + updateMempoolStartTime := time.Now().UnixNano() + err = blockExec.mempool.Update(block.Height, block.Txs, deliverTxResponses, TxPreCheck(state)) + updateMempoolEndTime := time.Now().UnixNano() + + updateMempoolTimeMs := float64(updateMempoolEndTime-updateMempoolStartTime) / 1000000 + blockExec.metrics.BlockUpdateMempoolTime.Set(updateMempoolTimeMs) return res.Data, res.RetainHeight, err } diff --git a/state/metrics.go b/state/metrics.go index bcd713f5f..730de6169 100644 --- a/state/metrics.go +++ b/state/metrics.go @@ -17,6 +17,14 @@ const ( type Metrics struct { // Time between BeginBlock and EndBlock. BlockProcessingTime metrics.Histogram + // Time gauge between BeginBlock and EndBlock. + BlockExecutionTime metrics.Gauge + // Time of commit + BlockCommitTime metrics.Gauge + // Time of app commit + BlockAppCommitTime metrics.Gauge + // Time of update mempool + BlockUpdateMempoolTime metrics.Gauge } // PrometheusMetrics returns Metrics build using Prometheus client library. @@ -35,12 +43,40 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Help: "Time between BeginBlock and EndBlock in ms.", Buckets: stdprometheus.LinearBuckets(1, 10, 10), }, labels).With(labelsAndValues...), + BlockExecutionTime: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "block_execution_time", + Help: "Time between BeginBlock and EndBlock in ms.", + }, labels).With(labelsAndValues...), + BlockCommitTime: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "block_commit_time", + Help: "Time of commit in ms.", + }, labels).With(labelsAndValues...), + BlockAppCommitTime: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "block_app_commit_time", + Help: "Time of app commit in ms.", + }, labels).With(labelsAndValues...), + BlockUpdateMempoolTime: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "block_update_mempool_time", + Help: "Time of update mempool in ms.", + }, labels).With(labelsAndValues...), } } // NopMetrics returns no-op Metrics. func NopMetrics() *Metrics { return &Metrics{ - BlockProcessingTime: discard.NewHistogram(), + BlockProcessingTime: discard.NewHistogram(), + BlockExecutionTime: discard.NewGauge(), + BlockCommitTime: discard.NewGauge(), + BlockAppCommitTime: discard.NewGauge(), + BlockUpdateMempoolTime: discard.NewGauge(), } } diff --git a/state/tx_filter.go b/state/tx_filter.go index 52d055966..0a8e08d3c 100644 --- a/state/tx_filter.go +++ b/state/tx_filter.go @@ -14,9 +14,3 @@ func TxPreCheck(state State) mempl.PreCheckFunc { ) return mempl.PreCheckMaxBytes(maxDataBytes) } - -// TxPostCheck returns a function to filter transactions after processing. -// The function limits the gas wanted by a transaction to the block's maximum total gas. -func TxPostCheck(state State) mempl.PostCheckFunc { - return mempl.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas) -} diff --git a/test/maverick/consensus/replay_stubs.go b/test/maverick/consensus/replay_stubs.go index 08974a67e..087727fed 100644 --- a/test/maverick/consensus/replay_stubs.go +++ b/test/maverick/consensus/replay_stubs.go @@ -28,7 +28,6 @@ func (emptyMempool) Update( _ types.Txs, _ []*abci.ResponseDeliverTx, _ mempl.PreCheckFunc, - _ mempl.PostCheckFunc, ) error { return nil } diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go index 22853de5f..dcb66484b 100644 --- a/test/maverick/node/node.go +++ b/test/maverick/node/node.go @@ -369,7 +369,6 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, state.LastBlockHeight, mempl.WithMetrics(memplMetrics), mempl.WithPreCheck(sm.TxPreCheck(state)), - mempl.WithPostCheck(sm.TxPostCheck(state)), ) mempoolLogger := logger.With("module", "mempool") mempoolReactor := mempl.NewReactor(config.Mempool, mempool)