Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(nodebuilder/state): Provide stubbed state module if a core endpoint not provided #2577

Merged
merged 3 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions nodebuilder/core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ type Config struct {
// node's connection to a Celestia-Core endpoint.
func DefaultConfig() Config {
return Config{
IP: "0.0.0.0",
RPCPort: "0",
GRPCPort: "0",
IP: "",
RPCPort: "",
GRPCPort: "",
}
}

// Validate performs basic validation of the config.
func (cfg *Config) Validate() error {
if !cfg.IsEndpointConfigured() {
return nil
}

ip, err := utils.ValidateAddr(cfg.IP)
if err != nil {
return err
Expand All @@ -41,3 +45,9 @@ func (cfg *Config) Validate() error {
}
return nil
}

// IsEndpointConfigured returns whether a core endpoint has been set
// on the config (true if set).
func (cfg *Config) IsEndpointConfigured() bool {
return cfg.IP != ""
}
8 changes: 8 additions & 0 deletions nodebuilder/fraud/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type ServiceBreaker[S service] struct {
// Start starts the inner service if there are no fraud proofs stored.
// Subscribes for fraud and stops the service whenever necessary.
func (breaker *ServiceBreaker[S]) Start(ctx context.Context) error {
if breaker == nil {
return nil
}

proofs, err := breaker.FraudServ.Get(ctx, breaker.FraudType)
switch err {
default:
Expand All @@ -57,6 +61,10 @@ func (breaker *ServiceBreaker[S]) Start(ctx context.Context) error {

// Stop stops the service and cancels subscription.
func (breaker *ServiceBreaker[S]) Stop(ctx context.Context) error {
if breaker == nil {
return nil
}

if breaker.ctx.Err() != nil {
// short circuit if the service was already stopped
return nil
Expand Down
2 changes: 1 addition & 1 deletion nodebuilder/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func ConstructModule(tp node.Type, network p2p.Network, cfg *Config, store Store
fx.Supply(signer),
// modules provided by the node
p2p.ConstructModule(tp, &cfg.P2P),
state.ConstructModule(tp, &cfg.State),
state.ConstructModule(tp, &cfg.State, &cfg.Core),
header.ConstructModule(tp, &cfg.Header),
share.ConstructModule(tp, &cfg.Share),
rpc.ConstructModule(tp, &cfg.RPC),
Expand Down
10 changes: 10 additions & 0 deletions nodebuilder/node_light_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nodebuilder

import (
"context"
"crypto/rand"
"testing"

Expand All @@ -11,6 +12,7 @@ import (

nodebuilder "github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/state"
)

func TestNewLightWithP2PKey(t *testing.T) {
Expand Down Expand Up @@ -44,3 +46,11 @@ func TestLight_WithNetwork(t *testing.T) {
require.NotNil(t, node)
assert.Equal(t, p2p.Private, node.Network)
}

// TestLight_WithStubbedCoreAccessor ensures that a node started without
// a core connection will return a stubbed StateModule.
func TestLight_WithStubbedCoreAccessor(t *testing.T) {
node := TestNode(t, nodebuilder.Light)
_, err := node.StateServ.Balance(context.Background())
assert.ErrorIs(t, state.ErrNoStateAccess, err)
}
12 changes: 0 additions & 12 deletions nodebuilder/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,8 @@ func TestLifecycle(t *testing.T) {
err := node.Start(ctx)
require.NoError(t, err)

// ensure the state service is running
require.False(t, node.StateServ.IsStopped(ctx))

err = node.Stop(ctx)
require.NoError(t, err)

// ensure the state service is stopped
require.True(t, node.StateServ.IsStopped(ctx))
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
})
}
}
Expand Down Expand Up @@ -96,14 +90,8 @@ func TestLifecycle_WithMetrics(t *testing.T) {
err := node.Start(ctx)
require.NoError(t, err)

// ensure the state service is running
require.False(t, node.StateServ.IsStopped(ctx))

err = node.Stop(ctx)
require.NoError(t, err)

// ensure the state service is stopped
require.True(t, node.StateServ.IsStopped(ctx))
})
}
}
Expand Down
7 changes: 6 additions & 1 deletion nodebuilder/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti
baseComponents := fx.Options(
fx.Supply(metricOpts),
fx.Invoke(initializeMetrics),
fx.Invoke(state.WithMetrics),
fx.Invoke(func(ca *state.CoreAccessor) {
if ca == nil {
return
}
state.WithMetrics(ca)
}),
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
fx.Invoke(fraud.WithMetrics),
fx.Invoke(node.WithMetrics),
fx.Invoke(modheader.WithMetrics),
Expand Down
4 changes: 2 additions & 2 deletions nodebuilder/state/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ func coreAccessor(
signer *apptypes.KeyringSigner,
sync *sync.Syncer[*header.ExtendedHeader],
fraudServ libfraud.Service,
) (*state.CoreAccessor, *modfraud.ServiceBreaker[*state.CoreAccessor]) {
) (*state.CoreAccessor, Module, *modfraud.ServiceBreaker[*state.CoreAccessor]) {
ca := state.NewCoreAccessor(signer, sync, corecfg.IP, corecfg.RPCPort, corecfg.GRPCPort)

return ca, &modfraud.ServiceBreaker[*state.CoreAccessor]{
return ca, ca, &modfraud.ServiceBreaker[*state.CoreAccessor]{
Service: ca,
FraudType: byzantine.BadEncoding,
FraudServ: fraudServ,
Expand Down
11 changes: 6 additions & 5 deletions nodebuilder/state/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/libs/fxutil"
"github.com/celestiaorg/celestia-node/nodebuilder/core"
modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/state"
Expand All @@ -15,14 +17,14 @@ var log = logging.Logger("module/state")

// ConstructModule provides all components necessary to construct the
// state service.
func ConstructModule(tp node.Type, cfg *Config) fx.Option {
func ConstructModule(tp node.Type, cfg *Config, coreCfg *core.Config) fx.Option {
// sanitize config values before constructing module
cfgErr := cfg.Validate()

baseComponents := fx.Options(
fx.Supply(*cfg),
fx.Error(cfgErr),
fx.Provide(fx.Annotate(
fxutil.ProvideIf(coreCfg.IsEndpointConfigured(), fx.Annotate(
coreAccessor,
fx.OnStart(func(ctx context.Context, breaker *modfraud.ServiceBreaker[*state.CoreAccessor]) error {
return breaker.Start(ctx)
Expand All @@ -31,9 +33,8 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
return breaker.Stop(ctx)
}),
)),
// the module is needed for the handler
fx.Provide(func(ca *state.CoreAccessor) Module {
return ca
fxutil.ProvideIf(!coreCfg.IsEndpointConfigured(), func() (*state.CoreAccessor, Module) {
return nil, &stubbedStateModule{}
}),
)

Expand Down
116 changes: 116 additions & 0 deletions nodebuilder/state/stub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package state

import (
"context"
"errors"

"github.com/cosmos/cosmos-sdk/x/staking/types"

"github.com/celestiaorg/celestia-node/blob"
"github.com/celestiaorg/celestia-node/state"
)

var ErrNoStateAccess = errors.New("node is running without state access")

// stubbedStateModule provides a stub for the state module to return
// errors when state endpoints are accessed without a running connection
// to a core endpoint.
type stubbedStateModule struct{}

func (s stubbedStateModule) IsStopped(context.Context) bool {
return true
}

func (s stubbedStateModule) AccountAddress(context.Context) (state.Address, error) {
return state.Address{}, ErrNoStateAccess
}

func (s stubbedStateModule) Balance(context.Context) (*state.Balance, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) BalanceForAddress(
context.Context,
state.Address,
) (*state.Balance, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) Transfer(
_ context.Context,
_ state.AccAddress,
_, _ state.Int,
_ uint64,
) (*state.TxResponse, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) SubmitTx(context.Context, state.Tx) (*state.TxResponse, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) SubmitPayForBlob(
context.Context,
state.Int,
uint64,
[]*blob.Blob,
) (*state.TxResponse, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) CancelUnbondingDelegation(
_ context.Context,
_ state.ValAddress,
_, _, _ state.Int,
_ uint64,
) (*state.TxResponse, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) BeginRedelegate(
_ context.Context,
_, _ state.ValAddress,
_, _ state.Int,
_ uint64,
) (*state.TxResponse, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) Undelegate(
_ context.Context,
_ state.ValAddress,
_, _ state.Int,
_ uint64,
) (*state.TxResponse, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) Delegate(
_ context.Context,
_ state.ValAddress,
_, _ state.Int,
_ uint64,
) (*state.TxResponse, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) QueryDelegation(
context.Context,
state.ValAddress,
) (*types.QueryDelegationResponse, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) QueryUnbonding(
context.Context,
state.ValAddress,
) (*types.QueryUnbondingDelegationResponse, error) {
return nil, ErrNoStateAccess
}

func (s stubbedStateModule) QueryRedelegations(
_ context.Context,
_, _ state.ValAddress,
) (*types.QueryRedelegationsResponse, error) {
return nil, ErrNoStateAccess
}
6 changes: 5 additions & 1 deletion state/core_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ func (ca *CoreAccessor) Start(ctx context.Context) error {

// dial given celestia-core endpoint
endpoint := fmt.Sprintf("%s:%s", ca.coreIP, ca.grpcPort)
client, err := grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
client, err := grpc.DialContext(
ctx,
endpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return err
}
Expand Down