Skip to content

Commit

Permalink
Reload CustomResourceState Config File on Change
Browse files Browse the repository at this point in the history
This change adds hot reloading support for the customresourcestate
config file.

It also resolves a bug in which the customresourcestate config file was
not detected when it was specified in the ksm config file.

It also resolves a bug in which customresourcestate metrics were not
added when an explicit (non-default) set of resources was configured.

The file change detection code was reused from https://github.com/spf13/viper,
which is licensed under the MIT license.

Fixes: #1892
  • Loading branch information
mrueg committed Dec 8, 2022
1 parent 448becb commit 03f8459
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 41 deletions.
94 changes: 68 additions & 26 deletions internal/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,28 @@ package internal
import (
"context"
"errors"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/fsnotify/fsnotify"
"github.com/spf13/viper"
"gopkg.in/yaml.v3"
"k8s.io/klog/v2"

"k8s.io/kube-state-metrics/v2/pkg/app"
"k8s.io/kube-state-metrics/v2/pkg/customresource"
"k8s.io/kube-state-metrics/v2/pkg/customresourcestate"
"k8s.io/kube-state-metrics/v2/pkg/options"
)

// RunKubeStateMetricsWrapper is a wrapper around KSM, delegated to the root command.
func RunKubeStateMetricsWrapper(opts *options.Options) {
var factories []customresource.RegistryFactory
if config, set := resolveCustomResourceConfig(opts); set {
crf, err := customresourcestate.FromConfig(config)
if err != nil {
klog.ErrorS(err, "Parsing from Custom Resource State Metrics file failed")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
factories = append(factories, crf...)
}

KSMRunOrDie := func(ctx context.Context) {
if err := app.RunKubeStateMetricsWrapper(ctx, opts, factories...); err != nil {
if err := app.RunKubeStateMetricsWrapper(ctx, opts); err != nil {
klog.ErrorS(err, "Failed to run kube-state-metrics")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}

ctx, cancel := context.WithCancel(context.Background())
if file := options.GetConfigFile(*opts); file != "" {
viper.SetConfigType("yaml")
Expand All @@ -75,22 +63,76 @@ func RunKubeStateMetricsWrapper(opts *options.Options) {
})
viper.WatchConfig()
}

if opts.CustomResourceConfigFile != "" {
WatchConfig(opts.CustomResourceConfigFile, func(e fsnotify.Event) {
klog.Infof("Changes detected: %s\n", e.Name)
cancel()
// Wait for the ports to be released.
<-time.After(3 * time.Second)
ctx, cancel = context.WithCancel(context.Background())
go KSMRunOrDie(ctx)
})
}
klog.Infoln("Starting kube-state-metrics")
KSMRunOrDie(ctx)
select {}
}

func resolveCustomResourceConfig(opts *options.Options) (customresourcestate.ConfigDecoder, bool) {
if s := opts.CustomResourceConfig; s != "" {
return yaml.NewDecoder(strings.NewReader(s)), true
}
if file := opts.CustomResourceConfigFile; file != "" {
f, err := os.Open(filepath.Clean(file))
// WatchConfig is taken and adapted from https://github.com/spf13/viper/blob/b89e554a96abde447ad13a26dcc59fd00375e555/viper.go#L429 MIT-Licensed
func WatchConfig(filename string, onConfigChange func(fsnotify.Event)) {
initWG := sync.WaitGroup{}
initWG.Add(1)
go func() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
klog.ErrorS(err, "Custom Resource State Metrics file could not be opened")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
klog.ErrorS(err, "Failed to create new watcher")
}
return yaml.NewDecoder(f), true
}
return nil, false
defer watcher.Close()

// we have to watch the entire directory to pick up renames/atomic saves in a cross-platform way
configFile := filepath.Clean(filename)
configDir, _ := filepath.Split(configFile)
realConfigFile, _ := filepath.EvalSymlinks(filename)

eventsWG := sync.WaitGroup{}
eventsWG.Add(1)
go func() {
for {
select {
case event, ok := <-watcher.Events:
if !ok { // 'Events' channel is closed
eventsWG.Done()
return
}
currentConfigFile, _ := filepath.EvalSymlinks(filename)
// we only care about the config file with the following cases:
// 1 - if the config file was modified or created
// 2 - if the real path to the config file changed (eg: k8s ConfigMap replacement)
if (filepath.Clean(event.Name) == configFile &&
(event.Has(fsnotify.Write) || event.Has(fsnotify.Create))) ||
(currentConfigFile != "" && currentConfigFile != realConfigFile) {
realConfigFile = currentConfigFile
if onConfigChange != nil {
onConfigChange(event)
}
} else if filepath.Clean(event.Name) == configFile && event.Has(fsnotify.Remove) {
eventsWG.Done()
return
}

case err, ok := <-watcher.Errors:
if ok { // 'Errors' channel is not closed
klog.ErrorS(err, "Watcher received an error")
}
eventsWG.Done()
return
}
}
}()
watcher.Add(configDir)
initWG.Done() // done initializing the watch in this go routine, so the parent routine can move on...
eventsWG.Wait() // now, wait for event loop to end in this go-routine...
}()
initWG.Wait() // make sure that the go routine above fully ended before returning
}
57 changes: 43 additions & 14 deletions pkg/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"time"

"gopkg.in/yaml.v3"
Expand All @@ -47,6 +48,7 @@ import (
"k8s.io/kube-state-metrics/v2/internal/store"
"k8s.io/kube-state-metrics/v2/pkg/allowdenylist"
"k8s.io/kube-state-metrics/v2/pkg/customresource"
"k8s.io/kube-state-metrics/v2/pkg/customresourcestate"
generator "k8s.io/kube-state-metrics/v2/pkg/metric_generator"
"k8s.io/kube-state-metrics/v2/pkg/metricshandler"
"k8s.io/kube-state-metrics/v2/pkg/optin"
Expand All @@ -73,8 +75,8 @@ func (pl promLogger) Log(v ...interface{}) error {
}

// RunKubeStateMetricsWrapper runs KSM with context cancellation.
func RunKubeStateMetricsWrapper(ctx context.Context, opts *options.Options, factories ...customresource.RegistryFactory) error {
err := RunKubeStateMetrics(ctx, opts, factories...)
func RunKubeStateMetricsWrapper(ctx context.Context, opts *options.Options) error {
err := RunKubeStateMetrics(ctx, opts)
if ctx.Err() == context.Canceled {
klog.Infoln("Restarting: kube-state-metrics, metrics will be reset")
return nil
Expand All @@ -85,11 +87,11 @@ func RunKubeStateMetricsWrapper(ctx context.Context, opts *options.Options, fact
// RunKubeStateMetrics will build and run the kube-state-metrics.
// Any out-of-tree custom resource metrics could be registered by newing a registry factory
// which implements customresource.RegistryFactory and pass all factories into this function.
func RunKubeStateMetrics(ctx context.Context, opts *options.Options, factories ...customresource.RegistryFactory) error {
func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error {
var factories []customresource.RegistryFactory
promLogger := promLogger{}

storeBuilder := store.NewBuilder()
storeBuilder.WithCustomResourceStoreFactories(factories...)

ksmMetricsRegistry := prometheus.NewRegistry()
ksmMetricsRegistry.MustRegister(version.NewCollector("kube_state_metrics"))
Expand Down Expand Up @@ -144,6 +146,21 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options, factories .
}
}

// Loading custom resource state configuration from cli argument or config file
config, err := resolveCustomResourceConfig(opts)
if err != nil {
return err
}

if config != nil {
crf, err := customresourcestate.FromConfig(config)
if err != nil {
return fmt.Errorf("Parsing from Custom Resource State Metrics file failed: %v", err)
}
factories = append(factories, crf...)
}
storeBuilder.WithCustomResourceStoreFactories(factories...)

if opts.CustomResourceConfigFile != "" {
crcFile, err := os.ReadFile(filepath.Clean(opts.CustomResourceConfigFile))
if err != nil {
Expand All @@ -157,23 +174,21 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options, factories .
}

var resources []string

for _, factory := range factories {
resources = append(resources, factory.Name())
}

switch {
case len(opts.Resources) == 0 && !opts.CustomResourcesOnly:
resources = append(resources, options.DefaultResources.AsSlice()...)
klog.InfoS("Used default resources")
resources = options.DefaultResources.AsSlice()
// enable custom resource
for _, factory := range factories {
resources = append(resources, factory.Name())
}
case opts.CustomResourcesOnly:
// enable custom resource only
for _, factory := range factories {
resources = append(resources, factory.Name())
}
klog.InfoS("Used CRD resources only", "resources", resources)
default:
klog.InfoS("Used resources", "resources", opts.Resources.String())
resources = opts.Resources.AsSlice()
resources = append(resources, opts.Resources.AsSlice()...)
klog.InfoS("Used resources", "resources", resources)
}

if err := storeBuilder.WithEnabledResources(resources); err != nil {
Expand Down Expand Up @@ -419,3 +434,17 @@ func md5HashAsMetricValue(data []byte) float64 {
copy(bytes, smallSum)
return float64(binary.LittleEndian.Uint64(bytes))
}

// resolveCustomResourceConfig returns a decoder for the custom resource state
// configuration selected by opts.
//
// An inline configuration (opts.CustomResourceConfig) takes precedence over a
// configuration file (opts.CustomResourceConfigFile). When neither is set, it
// returns a nil decoder and a nil error, which callers must treat as "no
// custom resource state configuration".
//
// The file is read into memory in full rather than handed to the decoder as an
// open *os.File: the decoder is returned to the caller without any way to
// close the underlying handle, so an open file would leak a descriptor on
// every call (and this function runs again on each configuration reload).
func resolveCustomResourceConfig(opts *options.Options) (customresourcestate.ConfigDecoder, error) {
	if s := opts.CustomResourceConfig; s != "" {
		return yaml.NewDecoder(strings.NewReader(s)), nil
	}
	if file := opts.CustomResourceConfigFile; file != "" {
		// os.ReadFile opens, reads and closes the file in one step.
		data, err := os.ReadFile(filepath.Clean(file))
		if err != nil {
			// %w keeps the underlying *fs.PathError reachable via errors.Is/As.
			return nil, fmt.Errorf("Custom Resource State Metrics file could not be opened: %w", err)
		}
		return yaml.NewDecoder(strings.NewReader(string(data))), nil
	}
	return nil, nil
}
2 changes: 1 addition & 1 deletion tests/e2e/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func TestDefaultCollectorMetricsAvailable(t *testing.T) {

files, err := os.ReadDir("../../internal/store/")
if err != nil {
t.Fatalf("failed to read dir to get all resouces name: %v", err)
t.Fatalf("failed to read dir to get all resources name: %v", err)
}

re := regexp.MustCompile(`^([a-z]+).go$`)
Expand Down

0 comments on commit 03f8459

Please sign in to comment.