Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(zetaclient): streamline process initialization #3291

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
## Refactor

* [3170](https://github.com/zeta-chain/node/pull/3170) - revamp TSS package in zetaclient
* [3291](https://github.com/zeta-chain/node/pull/3291) - revamp zetaclient initialization (+ graceful shutdown)

### Fixes

Expand Down
273 changes: 93 additions & 180 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,22 @@ import (
"net/http"
_ "net/http/pprof" // #nosec G108 -- pprof enablement is intentional
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"

"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

"github.com/zeta-chain/node/pkg/authz"
"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/constant"
"github.com/zeta-chain/node/pkg/graceful"
zetaos "github.com/zeta-chain/node/pkg/os"
"github.com/zeta-chain/node/zetaclient/chains/base"
"github.com/zeta-chain/node/zetaclient/config"
zctx "github.com/zeta-chain/node/zetaclient/context"
"github.com/zeta-chain/node/zetaclient/keys"
"github.com/zeta-chain/node/zetaclient/maintenance"
"github.com/zeta-chain/node/zetaclient/metrics"
"github.com/zeta-chain/node/zetaclient/orchestrator"
zetatss "github.com/zeta-chain/node/zetaclient/tss"
"github.com/zeta-chain/node/zetaclient/zetacore"
)

const (
Expand All @@ -36,119 +29,62 @@ const (
envPprofAddr = "PPROF_ADDR"
)

// Start starts zetaclientd process todo revamp
// https://github.com/zeta-chain/node/issues/3112
// Start starts zetaclientd process
func Start(_ *cobra.Command, _ []string) error {
// Prompt for Hotkey, TSS key-share and relayer key passwords
titles := []string{"HotKey", "TSS", "Solana Relayer Key"}
passwords, err := zetaos.PromptPasswords(titles)
if err != nil {
return errors.Wrap(err, "unable to get passwords")
}
hotkeyPass, tssKeyPass, solanaKeyPass := passwords[0], passwords[1], passwords[2]
relayerKeyPasswords := map[string]string{
chains.Network_solana.String(): solanaKeyPass,
}

// Load Config file given path
cfg, err := config.Load(globalOpts.ZetacoreHome)
if err != nil {
return err
return errors.Wrap(err, "unable to load config")
}

logger, err := base.InitLogger(cfg)
dbPath, err := config.ResolveDBPath()
if err != nil {
return errors.Wrap(err, "initLogger failed")
return errors.Wrap(err, "unable to resolve db path")
}

masterLogger := logger.Std
startLogger := logger.Std.With().Str("module", "startup").Logger()

appContext := zctx.New(cfg, relayerKeyPasswords, masterLogger)
ctx := zctx.WithAppContext(context.Background(), appContext)

// Wait until zetacore is up
waitForZetaCore(cfg, startLogger)
startLogger.Info().Msgf("Zetacore is ready, trying to connect to %s", cfg.Peer)

telemetryServer := metrics.NewTelemetryServer()
go func() {
err := telemetryServer.Start()
if err != nil {
startLogger.Error().Err(err).Msg("telemetryServer error")
panic("telemetryServer error")
}
}()

go runPprof(startLogger)

// CreateZetacoreClient: zetacore client is used for all communication to zetacore , which this client connects to.
// Zetacore accumulates votes , and provides a centralized source of truth for all clients
zetacoreClient, err := createZetacoreClient(cfg, hotkeyPass, masterLogger)
// Configure logger (also overrides the default log level)
logger, err := base.NewLogger(cfg)
if err != nil {
return errors.Wrap(err, "unable to create zetacore client")
return errors.Wrap(err, "unable to create logger")
}

// Wait until zetacore is ready to create blocks
if err = waitForZetacoreToCreateBlocks(ctx, zetacoreClient, startLogger); err != nil {
startLogger.Error().Err(err).Msg("WaitForZetacoreToCreateBlocks error")
return err
passes, err := promptPasswords()
if err != nil {
return errors.Wrap(err, "unable to prompt for passwords")
}
startLogger.Info().Msgf("Zetacore client is ready")

// Set grantee account number and sequence number
err = zetacoreClient.SetAccountNumber(authz.ZetaClientGranteeKey)
appContext := zctx.New(cfg, passes.relayerKeys(), logger.Std)
ctx := zctx.WithAppContext(context.Background(), appContext)

telemetry, err := startTelemetry(ctx, cfg)
if err != nil {
startLogger.Error().Err(err).Msg("SetAccountNumber error")
return err
return errors.Wrap(err, "unable to start telemetry")
}

// cross-check chainid
res, err := zetacoreClient.GetNodeInfo(ctx)
// The zetacore client is used for all communication with the zetacore node.
// Zetacore accumulates votes and provides a source of truth for all clients.
zetacoreClient, err := createZetacoreClient(cfg, passes.hotkey, logger.Std)
if err != nil {
startLogger.Error().Err(err).Msg("GetNodeInfo error")
return err
return errors.Wrap(err, "unable to create zetacore client")
}

if strings.Compare(res.GetDefaultNodeInfo().Network, cfg.ChainID) != 0 {
startLogger.Warn().
Msgf("chain id mismatch, zetacore chain id %s, zetaclient configured chain id %s; reset zetaclient chain id", res.GetDefaultNodeInfo().Network, cfg.ChainID)
cfg.ChainID = res.GetDefaultNodeInfo().Network
err := zetacoreClient.UpdateChainID(cfg.ChainID)
if err != nil {
return err
}
// Wait until zetacore is ready to produce blocks
if err = waitForBlocks(ctx, zetacoreClient, logger.Std); err != nil {
return errors.Wrap(err, "zetacore unavailable")
}

// CreateAuthzSigner : which is used to sign all authz messages . All votes broadcast to zetacore are wrapped in authz exec .
// This is to ensure that the user does not need to keep their operator key online , and can use a cold key to sign votes
signerAddress, err := zetacoreClient.GetKeys().GetAddress()
if err != nil {
return errors.Wrap(err, "error getting signer address")
if err = prepareZetacoreClient(ctx, zetacoreClient, &cfg, logger.Std); err != nil {
return errors.Wrap(err, "unable to prepare zetacore client")
}

createAuthzSigner(zetacoreClient.GetKeys().GetOperatorAddress().String(), signerAddress)
startLogger.Debug().Msgf("createAuthzSigner is ready")

// Initialize core parameters from zetacore
if err = orchestrator.UpdateAppContext(ctx, appContext, zetacoreClient, startLogger); err != nil {
if err = orchestrator.UpdateAppContext(ctx, appContext, zetacoreClient, logger.Std); err != nil {
return errors.Wrap(err, "unable to update app context")
}

startLogger.Info().Msgf("Config is updated from zetacore\n %s", cfg.StringMasked())
log.Info().Msgf("Config is updated from zetacore\n %s", cfg.StringMasked())

m, err := metrics.NewMetrics()
if err != nil {
return errors.Wrap(err, "unable to create metrics")
}
m.Start()

metrics.Info.WithLabelValues(constant.Version).Set(1)
metrics.LastStartTime.SetToCurrentTime()

telemetryServer.SetIPAddress(cfg.PublicIP)

granteePubKeyBech32, err := resolveObserverPubKeyBech32(cfg, hotkeyPass)
granteePubKeyBech32, err := resolveObserverPubKeyBech32(cfg, passes.hotkey)
if err != nil {
return errors.Wrap(err, "unable to resolve observer pub key bech32")
}
Expand All @@ -157,44 +93,25 @@ func Start(_ *cobra.Command, _ []string) error {
Config: cfg,
Zetacore: zetacoreClient,
GranteePubKeyBech32: granteePubKeyBech32,
HotKeyPassword: hotkeyPass,
TSSKeyPassword: tssKeyPass,
HotKeyPassword: passes.hotkey,
TSSKeyPassword: passes.tss,
BitcoinChainIDs: btcChainIDsFromContext(appContext),
PostBlame: isEnvFlagEnabled(envFlagPostBlame),
Telemetry: telemetryServer,
Telemetry: telemetry,
}

tss, err := zetatss.Setup(ctx, tssSetupProps, startLogger)
tss, err := zetatss.Setup(ctx, tssSetupProps, logger.Std)
if err != nil {
return errors.Wrap(err, "unable to setup TSS service")
}

// Creating a channel to listen for os signals (or other signals)
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)

// Starts various background TSS listeners.
// Shuts down zetaclientd if any is triggered.
maintenance.NewTSSListener(zetacoreClient, masterLogger).Listen(ctx, func() {
masterLogger.Info().Msg("TSS listener received an action to shutdown zetaclientd.")
signalChannel <- syscall.SIGTERM
maintenance.NewTSSListener(zetacoreClient, logger.Std).Listen(ctx, func() {
logger.Std.Info().Msg("TSS listener received an action to shutdown zetaclientd.")
graceful.ShutdownNow()
})

if len(appContext.ListChainIDs()) == 0 {
startLogger.Error().Interface("config", cfg).Msgf("No chains in updated config")
}

isObserver, err := isObserverNode(ctx, zetacoreClient)
switch {
case err != nil:
startLogger.Error().Msgf("Unable to determine if node is an observer")
return err
case !isObserver:
addr := zetacoreClient.GetKeys().GetOperatorAddress().String()
startLogger.Info().Str("operator_address", addr).Msg("This node is not an observer. Exit 0")
return nil
}

// CreateSignerMap: This creates a map of all signers for each chain.
// Each signer is responsible for signing transactions for a particular chain
signerMap, err := orchestrator.CreateSignerMap(ctx, tss, logger)
Expand All @@ -203,16 +120,9 @@ func Start(_ *cobra.Command, _ []string) error {
return err
}

userDir, err := os.UserHomeDir()
if err != nil {
log.Error().Err(err).Msg("os.UserHomeDir")
return err
}
dbpath := filepath.Join(userDir, ".zetaclient/chainobserver")

// Creates a map of all chain observers for each chain.
// Each chain observer is responsible for observing events on the chain and processing them.
observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbpath, logger, telemetryServer)
observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbPath, logger, telemetry)
if err != nil {
return errors.Wrap(err, "unable to create chain observer map")
}
Expand All @@ -226,84 +136,87 @@ func Start(_ *cobra.Command, _ []string) error {
signerMap,
observerMap,
tss,
dbpath,
dbPath,
logger,
telemetryServer,
telemetry,
)
if err != nil {
return errors.Wrap(err, "unable to create orchestrator")
}

// Start orchestrator with all observers and signers
if err = maestro.Start(ctx); err != nil {
return errors.Wrap(err, "unable to start orchestrator")
}
graceful.AddService(ctx, maestro)

// start zeta supply checker
// TODO: enable
// https://github.com/zeta-chain/node/issues/1354
// NOTE: this is disabled for now because we need to determine the frequency on how to handle invalid check
// The method uses GRPC query to the node we might need to improve for performance
//zetaSupplyChecker, err := mc.NewZetaSupplyChecker(cfg, zetacoreClient, masterLogger)
//if err != nil {
// startLogger.Err(err).Msg("NewZetaSupplyChecker")
//}
//if err == nil {
// zetaSupplyChecker.Start()
// defer zetaSupplyChecker.Stop()
//}
// Block current routine until a shutdown signal is received
graceful.WaitForShutdown()

startLogger.Info().Msg("zetaclientd is running")
return nil
}

sig := <-signalChannel
startLogger.Info().Msgf("Stop signal received: %q. Stopping zetaclientd", sig)
// passwords bundles the secrets collected interactively at startup by
// promptPasswords, in the order they are prompted for:
//   - hotkey: the "HotKey" password, handed to the zetacore client and to the TSS setup;
//   - tss: the "TSS" key-share password, handed to the TSS setup;
//   - solanaRelayerKey: the "Solana Relayer Key" password, exposed via relayerKeys.
type passwords struct {
	hotkey, tss, solanaRelayerKey string
}

maestro.Stop()
// promptPasswords prompts for Hotkey, TSS key-share and relayer key passwords
func promptPasswords() (passwords, error) {
titles := []string{"HotKey", "TSS", "Solana Relayer Key"}

return nil
res, err := zetaos.PromptPasswords(titles)
if err != nil {
return passwords{}, errors.Wrap(err, "unable to get passwords")
}

return passwords{
hotkey: res[0],
tss: res[1],
solanaRelayerKey: res[2],
}, nil
}

// isObserverNode checks whether THIS node is an observer node.
func isObserverNode(ctx context.Context, client *zetacore.Client) (bool, error) {
observers, err := client.GetObserverList(ctx)
if err != nil {
return false, errors.Wrap(err, "unable to get observers list")
// relayerKeys returns the relayer key passwords indexed by network name.
// Only the Solana network has an entry.
func (p passwords) relayerKeys() map[string]string {
	byNetwork := make(map[string]string, 1)
	byNetwork[chains.Network_solana.String()] = p.solanaRelayerKey

	return byNetwork
}

operatorAddress := client.GetKeys().GetOperatorAddress().String()
func startTelemetry(ctx context.Context, cfg config.Config) (*metrics.TelemetryServer, error) {
// 1. Init pprof http server
pprofServer := func(_ context.Context) error {
addr := os.Getenv(envPprofAddr)
if addr == "" {
addr = "localhost:6061"
}

for _, observer := range observers {
if observer == operatorAddress {
return true, nil
log.Info().Str("addr", addr).Msg("starting pprof http server")

// #nosec G114 -- timeouts unneeded
err := http.ListenAndServe(addr, nil)
if err != nil {
log.Error().Err(err).Msg("pprof http server error")
}
}

return false, nil
}
return nil
}
gartnera marked this conversation as resolved.
Show resolved Hide resolved

func resolveObserverPubKeyBech32(cfg config.Config, hotKeyPassword string) (string, error) {
// Get observer's public key ("grantee pub key")
_, granteePubKeyBech32, err := keys.GetKeyringKeybase(cfg, hotKeyPassword)
// 2. Init metrics server
metricsServer, err := metrics.NewMetrics()
if err != nil {
return "", errors.Wrap(err, "unable to get keyring key base")
return nil, errors.Wrap(err, "unable to create metrics")
}

return granteePubKeyBech32, nil
}
metrics.Info.WithLabelValues(constant.Version).Set(1)
metrics.LastStartTime.SetToCurrentTime()

// runPprof run pprof http server
// zetacored/cometbft is already listening for runPprof on 6060 (by default)
func runPprof(logger zerolog.Logger) {
addr := os.Getenv(envPprofAddr)
if addr == "" {
addr = "localhost:6061"
}
// 3. Init telemetry server
telemetry := metrics.NewTelemetryServer()
telemetry.SetIPAddress(cfg.PublicIP)

logger.Info().Str("addr", addr).Msg("starting pprof http server")
// 4. Add services to the process
graceful.AddStarter(ctx, pprofServer)
graceful.AddService(ctx, metricsServer)
graceful.AddService(ctx, telemetry)

// #nosec G114 -- timeouts unneeded
err := http.ListenAndServe(addr, nil)
if err != nil {
logger.Error().Err(err).Msg("pprof http server error")
}
return telemetry, nil
}
Loading
Loading