Skip to content

Commit

Permalink
Reload CustomResourceState Config File on Change
Browse files Browse the repository at this point in the history
This change adds hot reloading support for the customresourcestate
config file.

It also resolves a bug in which the customresourcestate config file was
included in the ksm config file, in which it did not get detected.

It also resolves a bug in which customresourcestatemetrics were not
added when set resources were non-default resources.

Fixes: #1892
  • Loading branch information
mrueg committed Jan 9, 2023
1 parent ef627d6 commit 06268ab
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 48 deletions.
69 changes: 37 additions & 32 deletions internal/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"errors"
"os"
"path/filepath"
"strings"
"time"

"github.com/fsnotify/fsnotify"
Expand All @@ -30,67 +29,73 @@ import (
"k8s.io/klog/v2"

"k8s.io/kube-state-metrics/v2/pkg/app"
"k8s.io/kube-state-metrics/v2/pkg/customresource"
"k8s.io/kube-state-metrics/v2/pkg/customresourcestate"
"k8s.io/kube-state-metrics/v2/pkg/options"
)

// RunKubeStateMetricsWrapper is a wrapper around KSM, delegated to the root command.
func RunKubeStateMetricsWrapper(opts *options.Options) {
var factories []customresource.RegistryFactory
if config, set := resolveCustomResourceConfig(opts); set {
crf, err := customresourcestate.FromConfig(config)
if err != nil {
klog.ErrorS(err, "Parsing from Custom Resource State Metrics file failed")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
factories = append(factories, crf...)
}

KSMRunOrDie := func(ctx context.Context) {
if err := app.RunKubeStateMetricsWrapper(ctx, opts, factories...); err != nil {
if err := app.RunKubeStateMetricsWrapper(ctx, opts); err != nil {
klog.ErrorS(err, "Failed to run kube-state-metrics")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}

ctx, cancel := context.WithCancel(context.Background())
if file := options.GetConfigFile(*opts); file != "" {
viper.SetConfigType("yaml")
viper.SetConfigFile(file)
if err := viper.ReadInConfig(); err != nil {
cfgViper := viper.New()
cfgViper.SetConfigType("yaml")
cfgViper.SetConfigFile(file)
if err := cfgViper.ReadInConfig(); err != nil {
if errors.Is(err, viper.ConfigFileNotFoundError{}) {
klog.ErrorS(err, "Options configuration file not found", "file", file)
} else {
klog.ErrorS(err, "Error reading options configuration file", "file", file)
}
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
viper.OnConfigChange(func(e fsnotify.Event) {
cfgViper.OnConfigChange(func(e fsnotify.Event) {
klog.Infof("Changes detected: %s\n", e.Name)
cancel()
// Wait for the ports to be released.
<-time.After(3 * time.Second)
ctx, cancel = context.WithCancel(context.Background())
go KSMRunOrDie(ctx)
})
viper.WatchConfig()
}
klog.Infoln("Starting kube-state-metrics")
KSMRunOrDie(ctx)
select {}
}
cfgViper.WatchConfig()

func resolveCustomResourceConfig(opts *options.Options) (customresourcestate.ConfigDecoder, bool) {
if s := opts.CustomResourceConfig; s != "" {
return yaml.NewDecoder(strings.NewReader(s)), true
}
if file := opts.CustomResourceConfigFile; file != "" {
f, err := os.Open(filepath.Clean(file))
// Merge configFile values with opts so we get the CustomResourceConfigFile from config as well
configFile, err := os.ReadFile(filepath.Clean(file))
if err != nil {
klog.ErrorS(err, "Custom Resource State Metrics file could not be opened")
klog.ErrorS(err, "failed to read options configuration file", "file", file)
}

yaml.Unmarshal(configFile, opts)
}
if opts.CustomResourceConfigFile != "" {
crcViper := viper.New()
crcViper.SetConfigType("yaml")
crcViper.SetConfigFile(opts.CustomResourceConfigFile)
if err := crcViper.ReadInConfig(); err != nil {
if errors.Is(err, viper.ConfigFileNotFoundError{}) {
klog.ErrorS(err, "Custom resource configuration file not found", "file", opts.CustomResourceConfigFile)
} else {
klog.ErrorS(err, "Error reading Custom resource configuration file", "file", opts.CustomResourceConfigFile)
}
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
return yaml.NewDecoder(f), true
crcViper.OnConfigChange(func(e fsnotify.Event) {
klog.Infof("Changes detected: %s\n", e.Name)
cancel()
// Wait for the ports to be released.
<-time.After(3 * time.Second)
ctx, cancel = context.WithCancel(context.Background())
go KSMRunOrDie(ctx)
})
crcViper.WatchConfig()
}
return nil, false
klog.Infoln("Starting kube-state-metrics")
KSMRunOrDie(ctx)
select {}
}
59 changes: 44 additions & 15 deletions pkg/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"time"

"gopkg.in/yaml.v3"
Expand All @@ -47,6 +48,7 @@ import (
"k8s.io/kube-state-metrics/v2/internal/store"
"k8s.io/kube-state-metrics/v2/pkg/allowdenylist"
"k8s.io/kube-state-metrics/v2/pkg/customresource"
"k8s.io/kube-state-metrics/v2/pkg/customresourcestate"
generator "k8s.io/kube-state-metrics/v2/pkg/metric_generator"
"k8s.io/kube-state-metrics/v2/pkg/metricshandler"
"k8s.io/kube-state-metrics/v2/pkg/optin"
Expand All @@ -73,8 +75,8 @@ func (pl promLogger) Log(v ...interface{}) error {
}

// RunKubeStateMetricsWrapper runs KSM with context cancellation.
func RunKubeStateMetricsWrapper(ctx context.Context, opts *options.Options, factories ...customresource.RegistryFactory) error {
err := RunKubeStateMetrics(ctx, opts, factories...)
func RunKubeStateMetricsWrapper(ctx context.Context, opts *options.Options) error {
err := RunKubeStateMetrics(ctx, opts)
if ctx.Err() == context.Canceled {
klog.Infoln("Restarting: kube-state-metrics, metrics will be reset")
return nil
Expand All @@ -85,11 +87,10 @@ func RunKubeStateMetricsWrapper(ctx context.Context, opts *options.Options, fact
// RunKubeStateMetrics will build and run the kube-state-metrics.
// Any out-of-tree custom resource metrics could be registered by newing a registry factory
// which implements customresource.RegistryFactory and pass all factories into this function.
func RunKubeStateMetrics(ctx context.Context, opts *options.Options, factories ...customresource.RegistryFactory) error {
func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error {
promLogger := promLogger{}

storeBuilder := store.NewBuilder()
storeBuilder.WithCustomResourceStoreFactories(factories...)

ksmMetricsRegistry := prometheus.NewRegistry()
ksmMetricsRegistry.MustRegister(version.NewCollector("kube_state_metrics"))
Expand Down Expand Up @@ -144,6 +145,22 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options, factories .
}
}

// Loading custom resource state configuration from cli argument or config file
config, err := resolveCustomResourceConfig(opts)
if err != nil {
return err
}

var factories []customresource.RegistryFactory

if config != nil {
factories, err = customresourcestate.FromConfig(config)
if err != nil {
return fmt.Errorf("Parsing from Custom Resource State Metrics file failed: %v", err)
}
}
storeBuilder.WithCustomResourceStoreFactories(factories...)

if opts.CustomResourceConfigFile != "" {
crcFile, err := os.ReadFile(filepath.Clean(opts.CustomResourceConfigFile))
if err != nil {
Expand All @@ -156,24 +173,22 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options, factories .

}

var resources []string
resources := make([]string, len(factories))

for i, factory := range factories {
resources[i] = factory.Name()
}

switch {
case len(opts.Resources) == 0 && !opts.CustomResourcesOnly:
resources = append(resources, options.DefaultResources.AsSlice()...)
klog.InfoS("Used default resources")
resources = options.DefaultResources.AsSlice()
// enable custom resource
for _, factory := range factories {
resources = append(resources, factory.Name())
}
case opts.CustomResourcesOnly:
// enable custom resource only
for _, factory := range factories {
resources = append(resources, factory.Name())
}
klog.InfoS("Used CRD resources only", "resources", resources)
default:
klog.InfoS("Used resources", "resources", opts.Resources.String())
resources = opts.Resources.AsSlice()
resources = append(resources, opts.Resources.AsSlice()...)
klog.InfoS("Used resources", "resources", resources)
}

if err := storeBuilder.WithEnabledResources(resources); err != nil {
Expand Down Expand Up @@ -419,3 +434,17 @@ func md5HashAsMetricValue(data []byte) float64 {
copy(bytes, smallSum)
return float64(binary.LittleEndian.Uint64(bytes))
}

func resolveCustomResourceConfig(opts *options.Options) (customresourcestate.ConfigDecoder, error) {
if s := opts.CustomResourceConfig; s != "" {
return yaml.NewDecoder(strings.NewReader(s)), nil
}
if file := opts.CustomResourceConfigFile; file != "" {
f, err := os.Open(filepath.Clean(file))
if err != nil {
return nil, fmt.Errorf("Custom Resource State Metrics file could not be opened: %v", err)
}
return yaml.NewDecoder(f), nil
}
return nil, nil
}
2 changes: 1 addition & 1 deletion tests/e2e/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func TestDefaultCollectorMetricsAvailable(t *testing.T) {

files, err := os.ReadDir("../../internal/store/")
if err != nil {
t.Fatalf("failed to read dir to get all resouces name: %v", err)
t.Fatalf("failed to read dir to get all resources name: %v", err)
}

re := regexp.MustCompile(`^([a-z]+).go$`)
Expand Down

0 comments on commit 06268ab

Please sign in to comment.