Skip to content

Commit

Permalink
Streamline start.go
Browse files Browse the repository at this point in the history
  • Loading branch information
swift1337 committed Dec 12, 2024
1 parent 7d4d99b commit 19c089f
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 214 deletions.
219 changes: 76 additions & 143 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@ import (
_ "net/http/pprof" // #nosec G108 -- pprof enablement is intentional
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"

"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

"github.com/zeta-chain/node/pkg/authz"
"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/constant"
zetaos "github.com/zeta-chain/node/pkg/os"
Expand All @@ -27,7 +24,6 @@ import (
"github.com/zeta-chain/node/zetaclient/metrics"
"github.com/zeta-chain/node/zetaclient/orchestrator"
zetatss "github.com/zeta-chain/node/zetaclient/tss"
"github.com/zeta-chain/node/zetaclient/zetacore"
)

const (
Expand All @@ -36,119 +32,80 @@ const (
envPprofAddr = "PPROF_ADDR"
)

// Start starts zetaclientd process todo revamp
// https://github.com/zeta-chain/node/issues/3112
// Start starts zetaclientd process
func Start(_ *cobra.Command, _ []string) error {
// Prompt for Hotkey, TSS key-share and relayer key passwords
titles := []string{"HotKey", "TSS", "Solana Relayer Key"}
passwords, err := zetaos.PromptPasswords(titles)
// Load Config file given path
cfg, err := config.Load(globalOpts.ZetacoreHome)
if err != nil {
return errors.Wrap(err, "unable to get passwords")
}
hotkeyPass, tssKeyPass, solanaKeyPass := passwords[0], passwords[1], passwords[2]
relayerKeyPasswords := map[string]string{
chains.Network_solana.String(): solanaKeyPass,
return errors.Wrap(err, "unable to load config")
}

// Load Config file given path
cfg, err := config.Load(globalOpts.ZetacoreHome)
dbPath, err := config.ResolveDBPath()
if err != nil {
return err
return errors.Wrap(err, "unable to resolve db path")
}

logger, err := base.InitLogger(cfg)
// Configure logger (also overrides the default log level)
logger, err := base.NewLogger(cfg)
if err != nil {
return errors.Wrap(err, "initLogger failed")
return errors.Wrap(err, "unable to create logger")
}

masterLogger := logger.Std
startLogger := logger.Std.With().Str("module", "startup").Logger()
passes, err := promptPasswords()
if err != nil {
return errors.Wrap(err, "unable to prompt for passwords")
}

appContext := zctx.New(cfg, relayerKeyPasswords, masterLogger)
appContext := zctx.New(cfg, passes.relayerKeys(), logger.Std)
ctx := zctx.WithAppContext(context.Background(), appContext)

// Wait until zetacore is up
waitForZetaCore(cfg, startLogger)
startLogger.Info().Msgf("Zetacore is ready, trying to connect to %s", cfg.Peer)

// TODO graceful
telemetryServer := metrics.NewTelemetryServer()
go func() {
err := telemetryServer.Start()
if err != nil {
startLogger.Error().Err(err).Msg("telemetryServer error")
panic("telemetryServer error")
log.Fatal().Err(err).Msg("telemetryServer error")
}
}()

go runPprof(startLogger)

// CreateZetacoreClient: zetacore client is used for all communication to zetacore , which this client connects to.
// Zetacore accumulates votes , and provides a centralized source of truth for all clients
zetacoreClient, err := createZetacoreClient(cfg, hotkeyPass, masterLogger)
m, err := metrics.NewMetrics()
if err != nil {
return errors.Wrap(err, "unable to create zetacore client")
return errors.Wrap(err, "unable to create metrics")
}
m.Start()

// Wait until zetacore is ready to create blocks
if err = waitForZetacoreToCreateBlocks(ctx, zetacoreClient, startLogger); err != nil {
startLogger.Error().Err(err).Msg("WaitForZetacoreToCreateBlocks error")
return err
}
startLogger.Info().Msgf("Zetacore client is ready")
metrics.Info.WithLabelValues(constant.Version).Set(1)
metrics.LastStartTime.SetToCurrentTime()

// Set grantee account number and sequence number
err = zetacoreClient.SetAccountNumber(authz.ZetaClientGranteeKey)
if err != nil {
startLogger.Error().Err(err).Msg("SetAccountNumber error")
return err
}
telemetryServer.SetIPAddress(cfg.PublicIP)

// cross-check chainid
res, err := zetacoreClient.GetNodeInfo(ctx)
// TODO graceful
go runPprof(logger.Std)

// zetacore client is used for all communication to zeta node.
// it accumulates votes, and provides a source of truth for all clients
zetacoreClient, err := createZetacoreClient(cfg, passes.hotkey, logger.Std)
if err != nil {
startLogger.Error().Err(err).Msg("GetNodeInfo error")
return err
return errors.Wrap(err, "unable to create zetacore client")
}

if strings.Compare(res.GetDefaultNodeInfo().Network, cfg.ChainID) != 0 {
startLogger.Warn().
Msgf("chain id mismatch, zetacore chain id %s, zetaclient configured chain id %s; reset zetaclient chain id", res.GetDefaultNodeInfo().Network, cfg.ChainID)
cfg.ChainID = res.GetDefaultNodeInfo().Network
err := zetacoreClient.UpdateChainID(cfg.ChainID)
if err != nil {
return err
}
// Wait until zetacore is ready to produce blocks
if err = waitForBlocks(ctx, zetacoreClient, logger.Std); err != nil {
return errors.Wrap(err, "zetacore unavailable")
}

// CreateAuthzSigner : which is used to sign all authz messages . All votes broadcast to zetacore are wrapped in authz exec .
// This is to ensure that the user does not need to keep their operator key online , and can use a cold key to sign votes
signerAddress, err := zetacoreClient.GetKeys().GetAddress()
if err != nil {
return errors.Wrap(err, "error getting signer address")
if err = prepareZetacoreClient(ctx, zetacoreClient, &cfg, logger.Std); err != nil {
return errors.Wrap(err, "unable to prepare zetacore client")
}

createAuthzSigner(zetacoreClient.GetKeys().GetOperatorAddress().String(), signerAddress)
startLogger.Debug().Msgf("createAuthzSigner is ready")

// Initialize core parameters from zetacore
if err = orchestrator.UpdateAppContext(ctx, appContext, zetacoreClient, startLogger); err != nil {
if err = orchestrator.UpdateAppContext(ctx, appContext, zetacoreClient, logger.Std); err != nil {
return errors.Wrap(err, "unable to update app context")
}

startLogger.Info().Msgf("Config is updated from zetacore\n %s", cfg.StringMasked())
log.Info().Msgf("Config is updated from zetacore\n %s", cfg.StringMasked())

m, err := metrics.NewMetrics()
if err != nil {
return errors.Wrap(err, "unable to create metrics")
}
m.Start()

metrics.Info.WithLabelValues(constant.Version).Set(1)
metrics.LastStartTime.SetToCurrentTime()

telemetryServer.SetIPAddress(cfg.PublicIP)

granteePubKeyBech32, err := resolveObserverPubKeyBech32(cfg, hotkeyPass)
granteePubKeyBech32, err := resolveObserverPubKeyBech32(cfg, passes.hotkey)
if err != nil {
return errors.Wrap(err, "unable to resolve observer pub key bech32")
}
Expand All @@ -157,44 +114,30 @@ func Start(_ *cobra.Command, _ []string) error {
Config: cfg,
Zetacore: zetacoreClient,
GranteePubKeyBech32: granteePubKeyBech32,
HotKeyPassword: hotkeyPass,
TSSKeyPassword: tssKeyPass,
HotKeyPassword: passes.hotkey,
TSSKeyPassword: passes.tss,
BitcoinChainIDs: btcChainIDsFromContext(appContext),
PostBlame: isEnvFlagEnabled(envFlagPostBlame),
Telemetry: telemetryServer,
}

tss, err := zetatss.Setup(ctx, tssSetupProps, startLogger)
tss, err := zetatss.Setup(ctx, tssSetupProps, logger.Std)
if err != nil {
return errors.Wrap(err, "unable to setup TSS service")
}

// Creating a channel to listen for os signals (or other signals)
// TODO graceful
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)

// Starts various background TSS listeners.
// Shuts down zetaclientd if any is triggered.
maintenance.NewTSSListener(zetacoreClient, masterLogger).Listen(ctx, func() {
masterLogger.Info().Msg("TSS listener received an action to shutdown zetaclientd.")
maintenance.NewTSSListener(zetacoreClient, logger.Std).Listen(ctx, func() {
logger.Std.Info().Msg("TSS listener received an action to shutdown zetaclientd.")
signalChannel <- syscall.SIGTERM
})

if len(appContext.ListChainIDs()) == 0 {
startLogger.Error().Interface("config", cfg).Msgf("No chains in updated config")
}

isObserver, err := isObserverNode(ctx, zetacoreClient)
switch {
case err != nil:
startLogger.Error().Msgf("Unable to determine if node is an observer")
return err
case !isObserver:
addr := zetacoreClient.GetKeys().GetOperatorAddress().String()
startLogger.Info().Str("operator_address", addr).Msg("This node is not an observer. Exit 0")
return nil
}

// CreateSignerMap: This creates a map of all signers for each chain.
// Each signer is responsible for signing transactions for a particular chain
signerMap, err := orchestrator.CreateSignerMap(ctx, tss, logger)
Expand All @@ -203,16 +146,9 @@ func Start(_ *cobra.Command, _ []string) error {
return err
}

userDir, err := os.UserHomeDir()
if err != nil {
log.Error().Err(err).Msg("os.UserHomeDir")
return err
}
dbpath := filepath.Join(userDir, ".zetaclient/chainobserver")

// Creates a map of all chain observers for each chain.
// Each chain observer is responsible for observing events on the chain and processing them.
observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbpath, logger, telemetryServer)
observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbPath, logger, telemetryServer)
if err != nil {
return errors.Wrap(err, "unable to create chain observer map")
}
Expand All @@ -226,7 +162,7 @@ func Start(_ *cobra.Command, _ []string) error {
signerMap,
observerMap,
tss,
dbpath,
dbPath,
logger,
telemetryServer,
)
Expand All @@ -239,48 +175,17 @@ func Start(_ *cobra.Command, _ []string) error {
return errors.Wrap(err, "unable to start orchestrator")
}

// start zeta supply checker
// TODO: enable
// https://github.com/zeta-chain/node/issues/1354
// NOTE: this is disabled for now because we need to determine the frequency on how to handle invalid check
// The method uses GRPC query to the node we might need to improve for performance
//zetaSupplyChecker, err := mc.NewZetaSupplyChecker(cfg, zetacoreClient, masterLogger)
//if err != nil {
// startLogger.Err(err).Msg("NewZetaSupplyChecker")
//}
//if err == nil {
// zetaSupplyChecker.Start()
// defer zetaSupplyChecker.Stop()
//}

startLogger.Info().Msg("zetaclientd is running")
log.Info().Msg("zetaclientd is running")

// todo graceful
sig := <-signalChannel
startLogger.Info().Msgf("Stop signal received: %q. Stopping zetaclientd", sig)
log.Info().Msgf("Stop signal received: %q. Stopping zetaclientd", sig)

maestro.Stop()

return nil
}

// isObserverNode checks whether THIS node is an observer node.
func isObserverNode(ctx context.Context, client *zetacore.Client) (bool, error) {
observers, err := client.GetObserverList(ctx)
if err != nil {
return false, errors.Wrap(err, "unable to get observers list")
}

operatorAddress := client.GetKeys().GetOperatorAddress().String()

for _, observer := range observers {
if observer == operatorAddress {
return true, nil
}
}

return false, nil
}

func resolveObserverPubKeyBech32(cfg config.Config, hotKeyPassword string) (string, error) {
// Get observer's public key ("grantee pub key")
_, granteePubKeyBech32, err := keys.GetKeyringKeybase(cfg, hotKeyPassword)
Expand All @@ -307,3 +212,31 @@ func runPprof(logger zerolog.Logger) {
logger.Error().Err(err).Msg("pprof http server error")
}
}

type passwords struct {
hotkey string
tss string
solanaRelayerKey string
}

// promptPasswords prompts for Hotkey, TSS key-share and relayer key passwords
func promptPasswords() (passwords, error) {
titles := []string{"HotKey", "TSS", "Solana Relayer Key"}

res, err := zetaos.PromptPasswords(titles)
if err != nil {
return passwords{}, errors.Wrap(err, "unable to get passwords")
}

return passwords{
hotkey: res[0],
tss: res[1],
solanaRelayerKey: res[2],
}, nil
}

func (p passwords) relayerKeys() map[string]string {
return map[string]string{
chains.Network_solana.String(): p.solanaRelayerKey,
}
}
Loading

0 comments on commit 19c089f

Please sign in to comment.