Skip to content

Commit

Permalink
Use proxyApp instead of abciclient.Client in node clients (cosmos#779)
Browse files Browse the repository at this point in the history
Closes: cosmos#744

---------

Co-authored-by: Ganesha Upadhyaya <[email protected]>
  • Loading branch information
Ferret-san and Ganesha Upadhyaya authored Apr 10, 2023
1 parent 42e7dcf commit a473a04
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 44 deletions.
27 changes: 17 additions & 10 deletions node/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"go.uber.org/multierr"

abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
llcfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
corep2p "github.com/tendermint/tendermint/p2p"
proxy "github.com/tendermint/tendermint/proxy"
tmtypes "github.com/tendermint/tendermint/types"

"github.com/rollkit/rollkit/block"
Expand Down Expand Up @@ -53,8 +53,8 @@ var _ Node = &FullNode{}
// It connects all the components and orchestrates their work.
type FullNode struct {
service.BaseService
eventBus *tmtypes.EventBus
appClient abciclient.Client
eventBus *tmtypes.EventBus
proxyApp proxy.AppConns

genesis *tmtypes.GenesisDoc
// cache of chunked genesis data.
Expand Down Expand Up @@ -94,18 +94,24 @@ func newFullNode(
conf config.NodeConfig,
p2pKey crypto.PrivKey,
signingKey crypto.PrivKey,
appClient abciclient.Client,
clientCreator proxy.ClientCreator,
genesis *tmtypes.GenesisDoc,
logger log.Logger,
) (*FullNode, error) {
proxyApp := proxy.NewAppConns(clientCreator)
proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil {
return nil, fmt.Errorf("error starting proxy app connections: %v", err)
}

eventBus := tmtypes.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))
if err := eventBus.Start(); err != nil {
return nil, err
}

var baseKV ds.TxnDatastore
var err error
var baseKV ds.TxnDatastore
if conf.RootDir == "" && conf.DBPath == "" { // this is used for testing
logger.Info("WARNING: working in in-memory mode")
baseKV, err = store.NewDefaultInMemoryKVStore()
Expand Down Expand Up @@ -140,12 +146,12 @@ func newFullNode(
return nil, err
}

mp := mempoolv1.NewTxMempool(logger, llcfg.DefaultMempoolConfig(), appClient, 0)
mp := mempoolv1.NewTxMempool(logger, llcfg.DefaultMempoolConfig(), proxyApp.Mempool(), 0)
mpIDs := newMempoolIDs()
mp.EnableTxsAvailable()

doneBuildingChannel := make(chan struct{})
blockManager, err := block.NewManager(signingKey, conf.BlockManagerConfig, genesis, s, mp, appClient, dalc, eventBus, logger.With("module", "BlockManager"), doneBuildingChannel)
blockManager, err := block.NewManager(signingKey, conf.BlockManagerConfig, genesis, s, mp, proxyApp.Consensus(), dalc, eventBus, logger.With("module", "BlockManager"), doneBuildingChannel)
if err != nil {
return nil, fmt.Errorf("BlockManager initialization error: %w", err)
}
Expand All @@ -158,7 +164,7 @@ func newFullNode(
ctx, cancel := context.WithCancel(ctx)

node := &FullNode{
appClient: appClient,
proxyApp: proxyApp,
eventBus: eventBus,
genesis: genesis,
conf: conf,
Expand Down Expand Up @@ -249,6 +255,7 @@ func (n *FullNode) fraudProofPublishLoop(ctx context.Context) {

// OnStart is a part of Service interface.
func (n *FullNode) OnStart() error {

n.Logger.Info("starting P2P client")
err := n.P2P.Start(n.ctx)
if err != nil {
Expand Down Expand Up @@ -319,8 +326,8 @@ func (n *FullNode) EventBus() *tmtypes.EventBus {
}

// AppClient returns ABCI proxy connections to communicate with application.
func (n *FullNode) AppClient() abciclient.Client {
return n.appClient
func (n *FullNode) AppClient() proxy.AppConns {
return n.proxyApp
}

// newTxValidator creates a pubsub validator that uses the node's mempool to check the
Expand Down
9 changes: 4 additions & 5 deletions node/full_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sort"
"time"

abcicli "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
tmbytes "github.com/tendermint/tendermint/libs/bytes"
Expand Down Expand Up @@ -67,7 +66,7 @@ func (n *FullNode) GetClient() rpcclient.Client {

// ABCIInfo returns basic information about application state.
func (c *FullClient) ABCIInfo(ctx context.Context) (*ctypes.ResultABCIInfo, error) {
resInfo, err := c.appClient().InfoSync(proxy.RequestInfo)
resInfo, err := c.appClient().Query().InfoSync(proxy.RequestInfo)
if err != nil {
return nil, err
}
Expand All @@ -81,7 +80,7 @@ func (c *FullClient) ABCIQuery(ctx context.Context, path string, data tmbytes.He

// ABCIQueryWithOptions queries for data from application.
func (c *FullClient) ABCIQueryWithOptions(ctx context.Context, path string, data tmbytes.HexBytes, opts rpcclient.ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) {
resQuery, err := c.appClient().QuerySync(abci.RequestQuery{
resQuery, err := c.appClient().Query().QuerySync(abci.RequestQuery{
Path: path,
Data: data,
Height: opts.Height,
Expand Down Expand Up @@ -788,7 +787,7 @@ func (c *FullClient) UnconfirmedTxs(ctx context.Context, limitPtr *int) (*ctypes
//
// If valid, the tx is automatically added to the mempool.
func (c *FullClient) CheckTx(ctx context.Context, tx tmtypes.Tx) (*ctypes.ResultCheckTx, error) {
res, err := c.appClient().CheckTxSync(abci.RequestCheckTx{Tx: tx})
res, err := c.appClient().Mempool().CheckTxSync(abci.RequestCheckTx{Tx: tx})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -844,7 +843,7 @@ func (c *FullClient) resubscribe(subscriber string, q tmpubsub.Query) tmtypes.Su
}
}

func (c *FullClient) appClient() abcicli.Client {
func (c *FullClient) appClient() proxy.AppConns {
return c.node.AppClient()
}

Expand Down
18 changes: 9 additions & 9 deletions node/full_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
abcicli "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
tconfig "github.com/tendermint/tendermint/config"
tmcrypto "github.com/tendermint/tendermint/crypto"
Expand All @@ -25,6 +24,7 @@ import (
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/proxy"
tmtypes "github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version"

Expand Down Expand Up @@ -119,7 +119,7 @@ func TestGenesisChunked(t *testing.T) {
mockApp.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{})
privKey, _, _ := crypto.GenerateEd25519Key(crand.Reader)
signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader)
n, _ := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, privKey, signingKey, abcicli.NewLocalClient(nil, mockApp), genDoc, log.TestingLogger())
n, _ := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, privKey, signingKey, proxy.NewLocalClientCreator(mockApp), genDoc, log.TestingLogger())

rpc := NewFullClient(n)

Expand Down Expand Up @@ -435,7 +435,7 @@ func TestTx(t *testing.T) {
BlockManagerConfig: config.BlockManagerConfig{
BlockTime: 1 * time.Second, // blocks must be at least 1 sec apart for adjacent headers to get verified correctly
}},
key, signingKey, abcicli.NewLocalClient(nil, mockApp),
key, signingKey, proxy.NewLocalClientCreator(mockApp),
&tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators},
log.TestingLogger())
require.NoError(err)
Expand Down Expand Up @@ -692,7 +692,7 @@ func TestValidatorSetHandling(t *testing.T) {
},
signingKey,
signingKey,
abcicli.NewLocalClient(nil, apps[i]),
proxy.NewLocalClientCreator(apps[i]),
&tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators},
log.TestingLogger(),
)
Expand Down Expand Up @@ -858,7 +858,7 @@ func getRPC(t *testing.T) (*mocks.Application, *FullClient) {
app.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{})
key, _, _ := crypto.GenerateEd25519Key(crand.Reader)
signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader)
node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger())
node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, proxy.NewLocalClientCreator(app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger())
require.NoError(err)
require.NotNil(node)

Expand Down Expand Up @@ -939,7 +939,7 @@ func TestMempool2Nodes(t *testing.T) {
BlockManagerConfig: config.BlockManagerConfig{
BlockTime: 1 * time.Second,
},
}, key1, signingKey1, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger())
}, key1, signingKey1, proxy.NewLocalClientCreator(app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger())
require.NoError(err)
require.NotNil(node1)

Expand All @@ -949,7 +949,7 @@ func TestMempool2Nodes(t *testing.T) {
ListenAddress: "/ip4/127.0.0.1/tcp/9002",
Seeds: "/ip4/127.0.0.1/tcp/9001/p2p/" + id1.Pretty(),
},
}, key2, signingKey2, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger())
}, key2, signingKey2, proxy.NewLocalClientCreator(app), &tmtypes.GenesisDoc{ChainID: "test"}, log.TestingLogger())
require.NoError(err)
require.NotNil(node1)

Expand Down Expand Up @@ -1034,7 +1034,7 @@ func TestStatus(t *testing.T) {
},
key,
signingKey,
abcicli.NewLocalClient(nil, app),
proxy.NewLocalClientCreator(app),
&tmtypes.GenesisDoc{
ChainID: "test",
Validators: genesisValidators,
Expand Down Expand Up @@ -1134,7 +1134,7 @@ func TestFutureGenesisTime(t *testing.T) {
BlockTime: 200 * time.Millisecond,
}},
key, signingKey,
abcicli.NewLocalClient(nil, mockApp),
proxy.NewLocalClientCreator(mockApp),
&tmtypes.GenesisDoc{
ChainID: "test",
InitialHeight: 1,
Expand Down
8 changes: 4 additions & 4 deletions node/full_node_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
abcicli "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/proxy"
tmtypes "github.com/tendermint/tendermint/types"

"github.com/rollkit/rollkit/config"
Expand Down Expand Up @@ -51,7 +51,7 @@ func TestAggregatorMode(t *testing.T) {
BlockTime: 1 * time.Second,
NamespaceID: types.NamespaceID{1, 2, 3, 4, 5, 6, 7, 8},
}
node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: blockManagerConfig}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger())
node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: blockManagerConfig}, key, signingKey, proxy.NewLocalClientCreator(app), &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger())
require.NoError(err)
require.NotNil(node)

Expand Down Expand Up @@ -157,7 +157,7 @@ func TestLazyAggregator(t *testing.T) {
Aggregator: true,
BlockManagerConfig: blockManagerConfig,
LazyAggregator: true,
}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger())
}, key, signingKey, proxy.NewLocalClientCreator(app), &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger())
assert.False(node.IsRunning())
assert.NoError(err)
err = node.Start()
Expand Down Expand Up @@ -545,7 +545,7 @@ func createNode(ctx context.Context, n int, isMalicious bool, aggregator bool, d
},
keys[n],
signingKey,
abcicli.NewLocalClient(nil, app),
proxy.NewLocalClientCreator(app),
&tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators},
log.TestingLogger().With("node", n))
require.NoError(err)
Expand Down
6 changes: 3 additions & 3 deletions node/full_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
abcicli "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"

"github.com/rollkit/rollkit/config"
Expand All @@ -31,7 +31,7 @@ func TestStartup(t *testing.T) {
app.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{})
key, _, _ := crypto.GenerateEd25519Key(rand.Reader)
signingKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger())
node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, proxy.NewLocalClientCreator(app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger())
require.NoError(err)
require.NotNil(node)

Expand All @@ -57,7 +57,7 @@ func TestMempoolDirectly(t *testing.T) {
signingKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
anotherKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)

node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, abcicli.NewLocalClient(nil, app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger())
node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, signingKey, proxy.NewLocalClientCreator(app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger())
require.NoError(err)
require.NotNil(node)

Expand Down
7 changes: 3 additions & 4 deletions node/header_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import (
"github.com/celestiaorg/go-header"
goheaderp2p "github.com/celestiaorg/go-header/p2p"
goheaderstore "github.com/celestiaorg/go-header/store"
"github.com/celestiaorg/go-header/sync"
goheadersync "github.com/celestiaorg/go-header/sync"
sync "github.com/celestiaorg/go-header/sync"
ds "github.com/ipfs/go-datastore"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -145,7 +144,7 @@ func (hExService *HeaderExchangeService) Start() error {
return fmt.Errorf("error while starting exchange: %w", err)
}

if hExService.syncer, err = newSyncer(hExService.ex, hExService.headerStore, hExService.sub, goheadersync.WithBlockTime(hExService.conf.BlockTime)); err != nil {
if hExService.syncer, err = newSyncer(hExService.ex, hExService.headerStore, hExService.sub, sync.WithBlockTime(hExService.conf.BlockTime)); err != nil {
return err
}

Expand Down Expand Up @@ -229,7 +228,7 @@ func newSyncer(
ex header.Exchange[*types.SignedHeader],
store header.Store[*types.SignedHeader],
sub header.Subscriber[*types.SignedHeader],
opt goheadersync.Options,
opt sync.Options,
) (*sync.Syncer[*types.SignedHeader], error) {
return sync.NewSyncer[*types.SignedHeader](ex, store, sub, opt)
}
17 changes: 12 additions & 5 deletions node/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (

ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/crypto"
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
proxy "github.com/tendermint/tendermint/proxy"
rpcclient "github.com/tendermint/tendermint/rpc/client"
tmtypes "github.com/tendermint/tendermint/types"
"go.uber.org/multierr"
Expand All @@ -26,7 +26,7 @@ type LightNode struct {

P2P *p2p.Client

app abciclient.Client
proxyApp proxy.AppConns

hExService *HeaderExchangeService

Expand All @@ -42,10 +42,17 @@ func newLightNode(
ctx context.Context,
conf config.NodeConfig,
p2pKey crypto.PrivKey,
appClient abciclient.Client,
clientCreator proxy.ClientCreator,
genesis *tmtypes.GenesisDoc,
logger log.Logger,
) (*LightNode, error) {
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
proxyApp := proxy.NewAppConns(clientCreator)
proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil {
return nil, fmt.Errorf("error starting proxy app connections: %v", err)
}

datastore, err := openDatastore(conf, logger)
if err != nil {
return nil, err
Expand All @@ -64,7 +71,7 @@ func newLightNode(

node := &LightNode{
P2P: client,
app: appClient,
proxyApp: proxyApp,
hExService: headerExchangeService,
cancel: cancel,
ctx: ctx,
Expand Down Expand Up @@ -124,7 +131,7 @@ func (ln *LightNode) newFraudProofValidator() p2p.GossipValidator {
return false
}

resp, err := ln.app.VerifyFraudProofSync(abci.RequestVerifyFraudProof{
resp, err := ln.proxyApp.Consensus().VerifyFraudProofSync(abci.RequestVerifyFraudProof{
FraudProof: &fraudProof,
ExpectedValidAppHash: fraudProof.ExpectedValidAppHash,
})
Expand Down
Loading

0 comments on commit a473a04

Please sign in to comment.