Skip to content

Commit

Permalink
feat: concurrent recheckTx (#163) (#221)
Browse files Browse the repository at this point in the history
  • Loading branch information
egonspace committed Jul 8, 2021
1 parent 05becb8 commit 1221fe7
Show file tree
Hide file tree
Showing 27 changed files with 301 additions and 188 deletions.
27 changes: 20 additions & 7 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,24 @@ func (app *localClient) DeliverTxAsync(params types.RequestDeliverTx) *ReqRes {
)
}

func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes {
res := app.Application.CheckTx(req)
return app.callback(
types.ToRequestCheckTx(req),
types.ToResponseCheckTx(res),
)
func (app *localClient) CheckTxAsync(params types.RequestCheckTx) *ReqRes {
req := types.ToRequestCheckTx(params)
reqRes := NewReqRes(req)

app.Application.CheckTxAsync(params, func(r types.ResponseCheckTx) {
res := types.ToResponseCheckTx(r)
app.Callback(req, res)
reqRes.Response = res
reqRes.Done()
reqRes.SetDone()

// Notify reqRes listener if set
if cb := reqRes.GetCallback(); cb != nil {
cb(res)
}
})

return reqRes
}

func (app *localClient) BeginRecheckTxAsync(req types.RequestBeginRecheckTx) *ReqRes {
Expand Down Expand Up @@ -251,7 +263,7 @@ func (app *localClient) DeliverTxSync(req types.RequestDeliverTx) (*types.Respon
}

func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
res := app.Application.CheckTx(req)
res := app.Application.CheckTxSync(req)
return &res, nil
}

Expand Down Expand Up @@ -355,6 +367,7 @@ func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRe
func newLocalReqRes(req *types.Request, res *types.Response) *ReqRes {
reqRes := NewReqRes(req)
reqRes.Response = res
reqRes.Done()
reqRes.SetDone()
return reqRes
}
10 changes: 9 additions & 1 deletion abci/example/counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,15 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli
return types.ResponseDeliverTx{Code: code.CodeTypeOK}
}

func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
func (app *Application) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx {
return app.checkTx(req)
}

func (app *Application) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) {
callback(app.checkTx(req))
}

func (app *Application) checkTx(req types.RequestCheckTx) types.ResponseCheckTx {
if app.serial {
if len(req.Tx) > 8 {
return types.ResponseCheckTx{
Expand Down
10 changes: 9 additions & 1 deletion abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,15 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli
return types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: events}
}

func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
func (app *Application) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx {
return app.checkTx(req)
}

func (app *Application) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) {
callback(app.checkTx(req))
}

func (app *Application) checkTx(req types.RequestCheckTx) types.ResponseCheckTx {
return types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}
}

Expand Down
8 changes: 6 additions & 2 deletions abci/example/kvstore/persistent_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,12 @@ func (app *PersistentKVStoreApplication) DeliverTx(req types.RequestDeliverTx) t
return app.app.DeliverTx(req)
}

func (app *PersistentKVStoreApplication) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
return app.app.CheckTx(req)
func (app *PersistentKVStoreApplication) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx {
return app.app.CheckTxSync(req)
}

func (app *PersistentKVStoreApplication) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) {
app.app.CheckTxAsync(req, callback)
}

func (app *PersistentKVStoreApplication) BeginRecheckTx(req types.RequestBeginRecheckTx) types.ResponseBeginRecheckTx {
Expand Down
2 changes: 1 addition & 1 deletion abci/server/socket_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types
res := s.app.DeliverTx(*r.DeliverTx)
responses <- types.ToResponseDeliverTx(res)
case *types.Request_CheckTx:
res := s.app.CheckTx(*r.CheckTx)
res := s.app.CheckTxSync(*r.CheckTx)
responses <- types.ToResponseCheckTx(res)
case *types.Request_Commit:
res := s.app.Commit()
Expand Down
51 changes: 29 additions & 22 deletions abci/types/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
context "golang.org/x/net/context"
)

type CheckTxCallback func(ResponseCheckTx)

// Application is an interface that enables any finite, deterministic state machine
// to be driven by a blockchain-based replication engine via the ABCI.
// All methods take a RequestXxx argument and return a ResponseXxx argument,
Expand All @@ -15,7 +17,8 @@ type Application interface {
Query(RequestQuery) ResponseQuery // Query for state

// Mempool Connection
CheckTx(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool
CheckTxSync(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool
CheckTxAsync(RequestCheckTx, CheckTxCallback) // Asynchronously validate a tx for the mempool
BeginRecheckTx(RequestBeginRecheckTx) ResponseBeginRecheckTx // Signals the beginning of rechecking
EndRecheckTx(RequestEndRecheckTx) ResponseEndRecheckTx // Signals the end of rechecking

Expand Down Expand Up @@ -57,10 +60,22 @@ func (BaseApplication) DeliverTx(req RequestDeliverTx) ResponseDeliverTx {
return ResponseDeliverTx{Code: CodeTypeOK}
}

func (BaseApplication) CheckTx(req RequestCheckTx) ResponseCheckTx {
func (BaseApplication) CheckTxSync(req RequestCheckTx) ResponseCheckTx {
return ResponseCheckTx{Code: CodeTypeOK}
}

func (BaseApplication) CheckTxAsync(req RequestCheckTx, callback CheckTxCallback) {
callback(ResponseCheckTx{Code: CodeTypeOK})
}

func (BaseApplication) BeginRecheckTx(req RequestBeginRecheckTx) ResponseBeginRecheckTx {
return ResponseBeginRecheckTx{Code: CodeTypeOK}
}

func (BaseApplication) EndRecheckTx(req RequestEndRecheckTx) ResponseEndRecheckTx {
return ResponseEndRecheckTx{Code: CodeTypeOK}
}

func (BaseApplication) Commit() ResponseCommit {
return ResponseCommit{}
}
Expand All @@ -81,14 +96,6 @@ func (BaseApplication) EndBlock(req RequestEndBlock) ResponseEndBlock {
return ResponseEndBlock{}
}

func (BaseApplication) BeginRecheckTx(req RequestBeginRecheckTx) ResponseBeginRecheckTx {
return ResponseBeginRecheckTx{Code: CodeTypeOK}
}

func (BaseApplication) EndRecheckTx(req RequestEndRecheckTx) ResponseEndRecheckTx {
return ResponseEndRecheckTx{Code: CodeTypeOK}
}

func (BaseApplication) ListSnapshots(req RequestListSnapshots) ResponseListSnapshots {
return ResponseListSnapshots{}
}
Expand Down Expand Up @@ -140,7 +147,18 @@ func (app *GRPCApplication) DeliverTx(ctx context.Context, req *RequestDeliverTx
}

func (app *GRPCApplication) CheckTx(ctx context.Context, req *RequestCheckTx) (*ResponseCheckTx, error) {
res := app.app.CheckTx(*req)
res := app.app.CheckTxSync(*req)
return &res, nil
}

func (app *GRPCApplication) BeginRecheckTx(ctx context.Context, req *RequestBeginRecheckTx) (
*ResponseBeginRecheckTx, error) {
res := app.app.BeginRecheckTx(*req)
return &res, nil
}

func (app *GRPCApplication) EndRecheckTx(ctx context.Context, req *RequestEndRecheckTx) (*ResponseEndRecheckTx, error) {
res := app.app.EndRecheckTx(*req)
return &res, nil
}

Expand Down Expand Up @@ -169,17 +187,6 @@ func (app *GRPCApplication) EndBlock(ctx context.Context, req *RequestEndBlock)
return &res, nil
}

func (app *GRPCApplication) BeginRecheckTx(ctx context.Context, req *RequestBeginRecheckTx) (
*ResponseBeginRecheckTx, error) {
res := app.app.BeginRecheckTx(*req)
return &res, nil
}

func (app *GRPCApplication) EndRecheckTx(ctx context.Context, req *RequestEndRecheckTx) (*ResponseEndRecheckTx, error) {
res := app.app.EndRecheckTx(*req)
return &res, nil
}

func (app *GRPCApplication) ListSnapshots(
ctx context.Context, req *RequestListSnapshots) (*ResponseListSnapshots, error) {
res := app.app.ListSnapshots(*req)
Expand Down
6 changes: 5 additions & 1 deletion blockchain/v0/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,14 @@ func (app *testApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx
return abci.ResponseDeliverTx{Events: []abci.Event{}}
}

func (app *testApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
func (app *testApp) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx {
return abci.ResponseCheckTx{}
}

func (app *testApp) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) {
callback(abci.ResponseCheckTx{})
}

func (app *testApp) Commit() abci.ResponseCommit {
return abci.ResponseCommit{}
}
Expand Down
16 changes: 12 additions & 4 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func deliverTxsRange(cs *State, start, end int) {
for i := start; i < end; i++ {
txBytes := make([]byte, 8)
binary.BigEndian.PutUint64(txBytes, uint64(i))
err := assertMempool(cs.txNotifier).CheckTx(txBytes, nil, mempl.TxInfo{})
_, err := assertMempool(cs.txNotifier).CheckTxSync(txBytes, mempl.TxInfo{})
if err != nil {
panic(fmt.Sprintf("Error after CheckTx: %v", err))
}
Expand Down Expand Up @@ -167,13 +167,13 @@ func TestMempoolRmBadTx(t *testing.T) {
// Try to send the tx through the mempool.
// CheckTx should not err, but the app should return a bad abci code
// and the tx should get removed from the pool
err := assertMempool(cs.txNotifier).CheckTx(txBytes, func(r *abci.Response) {
err := assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(r *abci.Response) {
if r.GetCheckTx().Code != code.CodeTypeBadNonce {
t.Errorf("expected checktx to return bad nonce, got %v", r)
return
}
checkTxRespCh <- struct{}{}
}, mempl.TxInfo{})
})
if err != nil {
t.Errorf("error after CheckTx: %v", err)
return
Expand Down Expand Up @@ -239,7 +239,15 @@ func (app *CounterApplication) DeliverTx(req abci.RequestDeliverTx) abci.Respons
return abci.ResponseDeliverTx{Code: code.CodeTypeOK}
}

func (app *CounterApplication) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
func (app *CounterApplication) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx {
return app.checkTx(req)
}

func (app *CounterApplication) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) {
callback(app.checkTx(req))
}

func (app *CounterApplication) checkTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
txValue := txAsUint64(req.Tx)
app.mempoolTxCountMtx.Lock()
defer app.mempoolTxCountMtx.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)

// send a tx
if err := assertMempool(css[3].txNotifier).CheckTx([]byte{1, 2, 3}, nil, mempl.TxInfo{}); err != nil {
if _, err := assertMempool(css[3].txNotifier).CheckTxSync([]byte{1, 2, 3}, mempl.TxInfo{}); err != nil {
t.Error(err)
}

Expand Down Expand Up @@ -548,7 +548,7 @@ func waitForAndValidateBlock(
err := validateBlock(newBlock, activeVals)
assert.Nil(t, err)
for _, tx := range txs {
err := assertMempool(css[j].txNotifier).CheckTx(tx, nil, mempl.TxInfo{})
_, err := assertMempool(css[j].txNotifier).CheckTxSync(tx, mempl.TxInfo{})
assert.Nil(t, err)
}
}, css)
Expand Down
5 changes: 4 additions & 1 deletion consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ var _ mempl.Mempool = emptyMempool{}
func (emptyMempool) Lock() {}
func (emptyMempool) Unlock() {}
func (emptyMempool) Size() int { return 0 }
func (emptyMempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error {
func (emptyMempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, error) {
return nil, nil
}
func (emptyMempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(*abci.Response)) error {
return nil
}
func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
Expand Down
14 changes: 6 additions & 8 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func sendTxs(ctx context.Context, cs *State) {
return
default:
tx := []byte{byte(i)}
if err := assertMempool(cs.txNotifier).CheckTx(tx, nil, mempl.TxInfo{}); err != nil {
if _, err := assertMempool(cs.txNotifier).CheckTxSync(tx, mempl.TxInfo{}); err != nil {
panic(err)
}
i++
Expand Down Expand Up @@ -452,7 +452,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
valPubKey1ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey1)
assert.Nil(t, err)
newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx1, mempl.TxInfo{})
assert.Nil(t, err)
})

Expand All @@ -466,15 +466,13 @@ func TestSimulateValidatorsChange(t *testing.T) {
updatePubKey1ABCI, err := cryptoenc.PubKeyToProto(updateValidatorPubKey1)
require.NoError(t, err)
updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25)
err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(updateValidatorTx1, mempl.TxInfo{})
assert.Nil(t, err)
})

// height 4
height++
incrementHeight(vss...)

// re-calculate vss
newVss := make([]*validatorStub, nVals+1)
copy(newVss, vss[:nVals+1])
sort.Sort(ValidatorStubsByPower(newVss))
Expand Down Expand Up @@ -504,14 +502,14 @@ func TestSimulateValidatorsChange(t *testing.T) {
newVal2ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey2)
require.NoError(t, err)
newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx2, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx2, mempl.TxInfo{})
assert.Nil(t, err)
newValidatorPubKey3, err := css[nVals+2].privValidator.GetPubKey()
require.NoError(t, err)
newVal3ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey3)
require.NoError(t, err)
newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx3, mempl.TxInfo{})
assert.Nil(t, err)
})

Expand Down Expand Up @@ -545,7 +543,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
newVal3ABCI, err := cryptoenc.PubKeyToProto(newValidatorPubKey3)
require.NoError(t, err)
removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(removeValidatorTx3, mempl.TxInfo{})
assert.Nil(t, err)
})

Expand Down
2 changes: 1 addition & 1 deletion consensus/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2347,7 +2347,7 @@ func addValidator(cs *State, vssMap map[string]*validatorStub, height int64) {
panic("failed to convert newVal to protobuf")
}
newValidatorTx := kvstore.MakeValSetChangeTx(val.PubKey, 10)
_ = assertMempool(cs.txNotifier).CheckTx(newValidatorTx, nil, mempl.TxInfo{})
_, _ = assertMempool(cs.txNotifier).CheckTxSync(newValidatorTx, mempl.TxInfo{})
vssMap[newVal.PubKey.Address().String()] = newValidatorStub(privVal, int32(len(vssMap)+1))
vssMap[newVal.PubKey.Address().String()].Height = height
}
Expand Down
Loading

0 comments on commit 1221fe7

Please sign in to comment.