From 99cb9ac7912e6ef5eafb7403d4ee767da2f368a7 Mon Sep 17 00:00:00 2001
From: Artur Troian
Date: Sat, 17 Aug 2024 15:28:55 -0500
Subject: [PATCH] feat: make monitor params configurable via flags (#246)

--monitor-max-retries - maximum number of status retries before closing the lease; defaults to 40
--monitor-retry-period - monitor status retry period; defaults to 4s (minimum value)
--monitor-retry-period-jitter - monitor status retry jitter window; defaults to 15s
--monitor-healthcheck-period - monitor healthcheck period; defaults to 10s
--monitor-healthcheck-period-jitter - monitor healthcheck jitter window; defaults to 5s

Signed-off-by: Artur Troian
---
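Note on how each period flag interacts with its jitter flag: every retry or
healthcheck wait is the configured period plus a random offset inside the
jitter window. The sketch below mirrors the schedule(min, jitter) helper whose
signature and call sites appear in cluster/monitor.go further down; the helper
body itself is not part of this diff, so the implementation shown here is an
assumption, not the exact upstream code.

    package sketch

    import (
        "math/rand"
        "time"
    )

    // schedule waits for the base period plus a random offset within the jitter
    // window. With the defaults, --monitor-retry-period=4s combined with
    // --monitor-retry-period-jitter=15s retries a failed status check somewhere
    // between 4s and 19s later; the healthcheck pair behaves the same way.
    func schedule(min, jitter time.Duration) <-chan time.Time {
        period := min
        if jitter > 0 {
            period += time.Duration(rand.Int63n(int64(jitter)))
        }
        return time.After(period)
    }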
 cluster/config.go                | 10 +++++++
 cluster/monitor.go               | 35 +++++++++--------------
 cluster/monitor_test.go          |  3 ++
 cmd/provider-services/cmd/run.go | 48 ++++++++++++++++++++++++++++++++
 config.go                        | 36 ++++++++++--------------
 service.go                       | 14 +--------
 6 files changed, 90 insertions(+), 56 deletions(-)

diff --git a/cluster/config.go b/cluster/config.go
index bfef7171..5d76159f 100644
--- a/cluster/config.go
+++ b/cluster/config.go
@@ -13,6 +13,11 @@ type Config struct {
     BlockedHostnames                []string
     DeploymentIngressStaticHosts    bool
     DeploymentIngressDomain         string
+    MonitorMaxRetries               uint
+    MonitorRetryPeriod              time.Duration
+    MonitorRetryPeriodJitter        time.Duration
+    MonitorHealthcheckPeriod        time.Duration
+    MonitorHealthcheckPeriodJitter  time.Duration
     ClusterSettings                 map[interface{}]interface{}
 }
 
@@ -20,5 +25,10 @@ func NewDefaultConfig() Config {
     return Config{
         InventoryResourcePollPeriod:     time.Second * 5,
         InventoryResourceDebugFrequency: 10,
+        MonitorMaxRetries:               40,
+        MonitorRetryPeriod:              time.Second * 4, // nolint revive
+        MonitorRetryPeriodJitter:        time.Second * 15,
+        MonitorHealthcheckPeriod:        time.Second * 10, // nolint revive
+        MonitorHealthcheckPeriodJitter:  time.Second * 5,
     }
 }
diff --git a/cluster/monitor.go b/cluster/monitor.go
index c2a47317..4123babd 100644
--- a/cluster/monitor.go
+++ b/cluster/monitor.go
@@ -23,15 +23,6 @@ import (
     "github.com/akash-network/provider/tools/fromctx"
 )
 
-const (
-    monitorMaxRetries              = 40
-    monitorRetryPeriodMin          = time.Second * 4 // nolint revive
-    monitorRetryPeriodJitter       = time.Second * 15
-
-    monitorHealthcheckPeriodMin    = time.Second * 10 // nolint revive
-    monitorHealthcheckPeriodJitter = time.Second * 5
-)
-
 var (
     deploymentHealthCheckCounter = promauto.NewCounterVec(prometheus.CounterOpts{
         Name: "provider_deployment_monitor_health",
@@ -45,22 +36,22 @@ type deploymentMonitor struct {
 
     deployment ctypes.IDeployment
 
-    attempts int
+    attempts uint
     log      log.Logger
     lc       lifecycle.Lifecycle
 
-    clusterSettings map[interface{}]interface{}
+    config Config
 }
 
 func newDeploymentMonitor(dm *deploymentManager) *deploymentMonitor {
     m := &deploymentMonitor{
-        bus:             dm.bus,
-        session:         dm.session,
-        client:          dm.client,
-        deployment:      dm.deployment,
-        log:             dm.log.With("cmp", "deployment-monitor"),
-        lc:              lifecycle.New(),
-        clusterSettings: dm.config.ClusterSettings,
+        bus:        dm.bus,
+        session:    dm.session,
+        client:     dm.client,
+        deployment: dm.deployment,
+        log:        dm.log.With("cmp", "deployment-monitor"),
+        lc:         lifecycle.New(),
+        config:     dm.config,
     }
 
     go m.lc.WatchChannel(dm.lc.ShuttingDown())
@@ -126,7 +117,7 @@ loop:
 
             m.publishStatus(event.ClusterDeploymentPending)
 
-            if m.attempts <= monitorMaxRetries {
+            if m.attempts <= m.config.MonitorMaxRetries {
                 // unhealthy. retry
                 tickch = m.scheduleRetry()
                 break
@@ -166,7 +157,7 @@ func (m *deploymentMonitor) runCheck(ctx context.Context) <-chan runner.Result {
 }
 
 func (m *deploymentMonitor) doCheck(ctx context.Context) (bool, error) {
-    ctx = fromctx.ApplyToContext(ctx, m.clusterSettings)
+    ctx = fromctx.ApplyToContext(ctx, m.config.ClusterSettings)
 
     status, err := m.client.LeaseStatus(ctx, m.deployment.LeaseID())
 
@@ -226,11 +217,11 @@ func (m *deploymentMonitor) publishStatus(status event.ClusterDeploymentStatus)
 }
 
 func (m *deploymentMonitor) scheduleRetry() <-chan time.Time {
-    return m.schedule(monitorRetryPeriodMin, monitorRetryPeriodJitter)
+    return m.schedule(m.config.MonitorRetryPeriod, m.config.MonitorRetryPeriodJitter)
 }
 
 func (m *deploymentMonitor) scheduleHealthcheck() <-chan time.Time {
-    return m.schedule(monitorHealthcheckPeriodMin, monitorHealthcheckPeriodJitter)
+    return m.schedule(m.config.MonitorHealthcheckPeriod, m.config.MonitorHealthcheckPeriodJitter)
 }
 
 func (m *deploymentMonitor) schedule(min, jitter time.Duration) <-chan time.Time {
defaults to 40") + if err := viper.BindPFlag(FlagMonitorMaxRetries, cmd.Flags().Lookup(FlagMonitorMaxRetries)); err != nil { + panic(err) + } + + cmd.Flags().Duration(FlagMonitorRetryPeriod, 4*time.Second, "monitor status retry period. defaults to 4s (min value)") + if err := viper.BindPFlag(FlagMonitorRetryPeriod, cmd.Flags().Lookup(FlagMonitorRetryPeriod)); err != nil { + panic(err) + } + + cmd.Flags().Duration(FlagMonitorRetryPeriodJitter, 15*time.Second, "monitor status retry window. defaults to 15s") + if err := viper.BindPFlag(FlagMonitorRetryPeriodJitter, cmd.Flags().Lookup(FlagMonitorRetryPeriodJitter)); err != nil { + panic(err) + } + + cmd.Flags().Duration(FlagMonitorHealthcheckPeriod, 10*time.Second, "monitor healthcheck period. defaults to 10s") + if err := viper.BindPFlag(FlagMonitorHealthcheckPeriod, cmd.Flags().Lookup(FlagMonitorHealthcheckPeriod)); err != nil { + panic(err) + } + + cmd.Flags().Duration(FlagMonitorHealthcheckPeriodJitter, 5*time.Second, "monitor healthcheck window. defaults to 5s") + if err := viper.BindPFlag(FlagMonitorHealthcheckPeriodJitter, cmd.Flags().Lookup(FlagMonitorHealthcheckPeriodJitter)); err != nil { + panic(err) + } + if err := providerflags.AddServiceEndpointFlag(cmd, serviceHostnameOperator); err != nil { panic(err) } @@ -522,6 +560,11 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { cachedResultMaxAge := viper.GetDuration(FlagCachedResultMaxAge) rpcQueryTimeout := viper.GetDuration(FlagRPCQueryTimeout) enableIPOperator := viper.GetBool(FlagEnableIPOperator) + monitorMaxRetries := viper.GetUint(FlagMonitorMaxRetries) + monitorRetryPeriod := viper.GetDuration(FlagMonitorRetryPeriod) + monitorRetryPeriodJitter := viper.GetDuration(FlagMonitorRetryPeriodJitter) + monitorHealthcheckPeriod := viper.GetDuration(FlagMonitorHealthcheckPeriod) + monitorHealthcheckPeriodJitter := viper.GetDuration(FlagMonitorHealthcheckPeriodJitter) pricing, err := createBidPricingStrategy(strategy) if err != nil { @@ -656,6 +699,11 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { config.DeploymentIngressDomain = deploymentIngressDomain config.BidTimeout = bidTimeout config.ManifestTimeout = manifestTimeout + config.MonitorMaxRetries = monitorMaxRetries + config.MonitorRetryPeriod = monitorRetryPeriod + config.MonitorRetryPeriodJitter = monitorRetryPeriodJitter + config.MonitorHealthcheckPeriod = monitorHealthcheckPeriod + config.MonitorHealthcheckPeriodJitter = monitorHealthcheckPeriodJitter if len(providerConfig) != 0 { pConf, err := config2.ReadConfigPath(providerConfig) diff --git a/config.go b/config.go index 808a2e19..28dc015b 100644 --- a/config.go +++ b/config.go @@ -10,30 +10,23 @@ import ( types "github.com/akash-network/akash-api/go/node/types/v1beta3" "github.com/akash-network/provider/bidengine" + "github.com/akash-network/provider/cluster" ) type Config struct { - ClusterWaitReadyDuration time.Duration - ClusterPublicHostname string - ClusterExternalPortQuantity uint - InventoryResourcePollPeriod time.Duration - InventoryResourceDebugFrequency uint - BidPricingStrategy bidengine.BidPricingStrategy - BidDeposit sdk.Coin - CPUCommitLevel float64 - MemoryCommitLevel float64 - StorageCommitLevel float64 - MaxGroupVolumes int - BlockedHostnames []string - BidTimeout time.Duration - ManifestTimeout time.Duration - BalanceCheckerCfg BalanceCheckerConfig - Attributes types.Attributes - DeploymentIngressStaticHosts bool - DeploymentIngressDomain string - ClusterSettings 
diff --git a/config.go b/config.go
index 808a2e19..28dc015b 100644
--- a/config.go
+++ b/config.go
@@ -10,30 +10,23 @@ import (
     types "github.com/akash-network/akash-api/go/node/types/v1beta3"
 
     "github.com/akash-network/provider/bidengine"
+    "github.com/akash-network/provider/cluster"
 )
 
 type Config struct {
-    ClusterWaitReadyDuration        time.Duration
-    ClusterPublicHostname           string
-    ClusterExternalPortQuantity     uint
-    InventoryResourcePollPeriod     time.Duration
-    InventoryResourceDebugFrequency uint
-    BidPricingStrategy              bidengine.BidPricingStrategy
-    BidDeposit                      sdk.Coin
-    CPUCommitLevel                  float64
-    MemoryCommitLevel               float64
-    StorageCommitLevel              float64
-    MaxGroupVolumes                 int
-    BlockedHostnames                []string
-    BidTimeout                      time.Duration
-    ManifestTimeout                 time.Duration
-    BalanceCheckerCfg               BalanceCheckerConfig
-    Attributes                      types.Attributes
-    DeploymentIngressStaticHosts    bool
-    DeploymentIngressDomain         string
-    ClusterSettings                 map[interface{}]interface{}
-    RPCQueryTimeout                 time.Duration
-    CachedResultMaxAge              time.Duration
+    ClusterWaitReadyDuration    time.Duration
+    ClusterPublicHostname       string
+    ClusterExternalPortQuantity uint
+    BidPricingStrategy          bidengine.BidPricingStrategy
+    BidDeposit                  sdk.Coin
+    BidTimeout                  time.Duration
+    ManifestTimeout             time.Duration
+    BalanceCheckerCfg           BalanceCheckerConfig
+    Attributes                  types.Attributes
+    MaxGroupVolumes             int
+    RPCQueryTimeout             time.Duration
+    CachedResultMaxAge          time.Duration
+    cluster.Config
 }
 
 func NewDefaultConfig() Config {
@@ -45,5 +38,6 @@ func NewDefaultConfig() Config {
             WithdrawalPeriod: 24 * time.Hour,
         },
         MaxGroupVolumes: constants.DefaultMaxGroupVolumes,
+        Config:          cluster.NewDefaultConfig(),
     }
 }
diff --git a/service.go b/service.go
index 20bbae2d..89c7eecd 100644
--- a/service.go
+++ b/service.go
@@ -74,18 +74,6 @@ func NewService(ctx context.Context,
 
     session = session.ForModule("provider-service")
 
-    clusterConfig := cluster.NewDefaultConfig()
-    clusterConfig.InventoryResourcePollPeriod = cfg.InventoryResourcePollPeriod
-    clusterConfig.InventoryResourceDebugFrequency = cfg.InventoryResourceDebugFrequency
-    clusterConfig.InventoryExternalPortQuantity = cfg.ClusterExternalPortQuantity
-    clusterConfig.CPUCommitLevel = cfg.CPUCommitLevel
-    clusterConfig.MemoryCommitLevel = cfg.MemoryCommitLevel
-    clusterConfig.StorageCommitLevel = cfg.StorageCommitLevel
-    clusterConfig.BlockedHostnames = cfg.BlockedHostnames
-    clusterConfig.DeploymentIngressStaticHosts = cfg.DeploymentIngressStaticHosts
-    clusterConfig.DeploymentIngressDomain = cfg.DeploymentIngressDomain
-    clusterConfig.ClusterSettings = cfg.ClusterSettings
-
     cl, err := aclient.DiscoverQueryClient(ctx, cctx)
     if err != nil {
         cancel()
@@ -99,7 +87,7 @@ func NewService(ctx context.Context,
         return nil, err
     }
 
-    cluster, err := cluster.NewService(ctx, session, bus, cclient, waiter, clusterConfig)
+    cluster, err := cluster.NewService(ctx, session, bus, cclient, waiter, cfg.Config)
     if err != nil {
         cancel()
         <-bc.lc.Done()
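Usage note: with this patch the root provider Config embeds cluster.Config, so
the new monitor fields are promoted onto it and handed to cluster.NewService as
cfg.Config (see service.go above). A minimal sketch of setting them
programmatically follows; the import path and root package name "provider" are
assumptions based on the module paths visible in this diff.

    package main

    import (
        "fmt"
        "time"

        provider "github.com/akash-network/provider"
    )

    func main() {
        // NewDefaultConfig seeds the embedded cluster.Config with the same
        // defaults the new flags advertise (40 retries, 4s/15s, 10s/5s).
        cfg := provider.NewDefaultConfig()

        // The embedded fields are promoted, so they can be overridden directly.
        cfg.MonitorMaxRetries = 20
        cfg.MonitorRetryPeriod = 6 * time.Second
        cfg.MonitorRetryPeriodJitter = 10 * time.Second

        fmt.Println(cfg.MonitorMaxRetries, cfg.MonitorRetryPeriod, cfg.MonitorRetryPeriodJitter)
    }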