Skip to content

Commit

Permalink
make state recorders Runnable and remove unstructured listing
Browse files Browse the repository at this point in the history
Signed-off-by: ezgidemirel <[email protected]>
  • Loading branch information
ezgidemirel committed Apr 16, 2024
1 parent 4b6c2de commit aa7264b
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 196 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ require (
k8s.io/api v0.29.1
k8s.io/apiextensions-apiserver v0.29.1
k8s.io/apimachinery v0.29.1
k8s.io/apiserver v0.29.1
k8s.io/client-go v0.29.1
k8s.io/component-base v0.29.1
k8s.io/klog/v2 v2.110.1
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,6 @@ k8s.io/apiextensions-apiserver v0.29.1 h1:S9xOtyk9M3Sk1tIpQMu9wXHm5O2MX6Y1kIpPMi
k8s.io/apiextensions-apiserver v0.29.1/go.mod h1:zZECpujY5yTW58co8V2EQR4BD6A9pktVgHhvc0uLfeU=
k8s.io/apimachinery v0.29.1 h1:KY4/E6km/wLBguvCZv8cKTeOwwOBqFNjwJIdMkMbbRc=
k8s.io/apimachinery v0.29.1/go.mod h1:6HVkd1FwxIagpYrHSwJlQqZI3G9LfYWRPAkUvLnXTKU=
k8s.io/apiserver v0.29.1 h1:e2wwHUfEmMsa8+cuft8MT56+16EONIEK8A/gpBSco+g=
k8s.io/apiserver v0.29.1/go.mod h1:V0EpkTRrJymyVT3M49we8uh2RvXf7fWC5XLB0P3SwRw=
k8s.io/client-go v0.29.1 h1:19B/+2NGEwnFLzt0uB5kNJnfTsbV8w6TgQRz9l7ti7A=
k8s.io/client-go v0.29.1/go.mod h1:TDG/psL9hdet0TI9mGyHJSgRkW3H9JZk2dNEUS7bRks=
k8s.io/component-base v0.29.1 h1:MUimqJPCRnnHsskTTjKD+IC1EHBbRCVyi37IoFBrkYw=
Expand Down
9 changes: 7 additions & 2 deletions pkg/controller/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type Options struct {
// determine whether it has work to do.
PollInterval time.Duration

// PollStateMetricInterval at which each controller should record state
PollStateMetricInterval time.Duration

// MaxConcurrentReconciles for each controller.
MaxConcurrentReconciles int

Expand All @@ -64,9 +67,11 @@ type Options struct {
// ESSOptions for External Secret Stores.
ESSOptions *ESSOptions

MetricRecorder managed.MetricRecorder
// MetricsRecorder to use for recording metrics.
MRMetrics managed.MetricRecorder

StateRecorder statemetrics.StateRecorder
// StateMetrics to use for recording state metrics.
StateMetrics *statemetrics.MRStateMetrics
}

// ForControllerRuntime extracts options for controller-runtime.
Expand Down
13 changes: 0 additions & 13 deletions pkg/reconciler/managed/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/crossplane-runtime/pkg/meta"
"github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/crossplane/crossplane-runtime/pkg/statemetrics"
)

const (
Expand Down Expand Up @@ -489,7 +488,6 @@ type Reconciler struct {
log logging.Logger
record event.Recorder
metricRecorder MetricRecorder
stateRecorder statemetrics.StateRecorder
}

type mrManaged struct {
Expand Down Expand Up @@ -554,13 +552,6 @@ func WithMetricRecorder(recorder MetricRecorder) ReconcilerOption {
}
}

// WithStateRecorder configures the Reconciler to use the supplied StateRecorder.
func WithStateRecorder(recorder statemetrics.StateRecorder) ReconcilerOption {
return func(r *Reconciler) {
r.stateRecorder = recorder
}
}

// PollIntervalHook represents the function type passed to the
// WithPollIntervalHook option to support dynamic computation of the poll
// interval.
Expand Down Expand Up @@ -719,16 +710,12 @@ func NewReconciler(m manager.Manager, of resource.ManagedKind, o ...ReconcilerOp
log: logging.NewNopLogger(),
record: event.NewNopRecorder(),
metricRecorder: NewNopMetricRecorder(),
stateRecorder: statemetrics.NewNopStateRecorder(),
}

for _, ro := range o {
ro(r)
}

// State recorder is started in the background to record MR states.
go r.stateRecorder.Run(context.Background(), schema.GroupVersionKind(of))

return r
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/resource/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,14 @@ type Composite interface { //nolint:interfacebloat // This interface has to be b
ConnectionDetailsPublishedTimer
}

// A CompositeList is a list of composite resources.
type CompositeList interface {
client.ObjectList

// GetItems returns the list of composite resources.
GetItems() []Composite
}

// Composed resources can be a composed into a Composite resource.
type Composed interface {
Object
Expand Down
140 changes: 71 additions & 69 deletions pkg/statemetrics/mr_state_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,47 +22,34 @@ import (

"github.com/prometheus/client_golang/prometheus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"

xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/fieldpath"
"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/crossplane-runtime/pkg/resource"
)

// A MRStateRecorderOption configures a MRStateRecorder.
type MRStateRecorderOption func(*MRStateRecorder)

// A MRStateRecorder records the state of managed resources.
type MRStateRecorder struct {
client client.Client
log logging.Logger
interval time.Duration

mrExists *prometheus.GaugeVec
mrReady *prometheus.GaugeVec
mrSynced *prometheus.GaugeVec
// MRStateMetrics holds Prometheus metrics for managed resources.
type MRStateMetrics struct {
Exists *prometheus.GaugeVec
Ready *prometheus.GaugeVec
Synced *prometheus.GaugeVec
}

// NewMRStateRecorder returns a new MRStateRecorder which records the state of managed resources.
func NewMRStateRecorder(client client.Client, log logging.Logger, interval time.Duration) *MRStateRecorder {
return &MRStateRecorder{
client: client,
log: log,
interval: interval,

mrExists: prometheus.NewGaugeVec(prometheus.GaugeOpts{
// NewMRStateMetrics returns a new MRStateMetrics.
func NewMRStateMetrics() *MRStateMetrics {
return &MRStateMetrics{
Exists: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: subSystem,
Name: "managed_resource_exists",
Help: "The number of managed resources that exist",
}, []string{"gvk"}),
mrReady: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Ready: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: subSystem,
Name: "managed_resource_ready",
Help: "The number of managed resources in Ready=True state",
}, []string{"gvk"}),
mrSynced: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Synced: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: subSystem,
Name: "managed_resource_synced",
Help: "The number of managed resources in Synced=True state",
Expand All @@ -73,71 +60,86 @@ func NewMRStateRecorder(client client.Client, log logging.Logger, interval time.
// Describe sends the super-set of all possible descriptors of metrics
// collected by this Collector to the provided channel and returns once
// the last descriptor has been sent.
func (r *MRStateRecorder) Describe(ch chan<- *prometheus.Desc) {
r.mrExists.Describe(ch)
r.mrReady.Describe(ch)
r.mrSynced.Describe(ch)
func (r *MRStateMetrics) Describe(ch chan<- *prometheus.Desc) {
r.Exists.Describe(ch)
r.Ready.Describe(ch)
r.Synced.Describe(ch)
}

// Collect is called by the Prometheus registry when collecting
// metrics. The implementation sends each collected metric via the
// provided channel and returns once the last metric has been sent.
func (r *MRStateRecorder) Collect(ch chan<- prometheus.Metric) {
r.mrExists.Collect(ch)
r.mrReady.Collect(ch)
r.mrSynced.Collect(ch)
func (r *MRStateMetrics) Collect(ch chan<- prometheus.Metric) {
r.Exists.Collect(ch)
r.Ready.Collect(ch)
r.Synced.Collect(ch)
}

// A MRStateRecorder records the state of managed resources.
type MRStateRecorder struct {
client client.Client
log logging.Logger
interval time.Duration
managedList resource.ManagedList

metrics *MRStateMetrics
}

// NewMRStateRecorder returns a new MRStateRecorder which records the state of managed resources.
func NewMRStateRecorder(client client.Client, log logging.Logger, m *MRStateMetrics, managedList resource.ManagedList, interval time.Duration) *MRStateRecorder {
return &MRStateRecorder{
client: client,
log: log,
metrics: m,
managedList: managedList,
interval: interval,
}
}

// Record records the state of managed resources.
func (r *MRStateRecorder) Record(ctx context.Context, gvk schema.GroupVersionKind) {
l := &unstructured.UnstructuredList{}
l.SetGroupVersionKind(gvk)
err := r.client.List(ctx, l)
if err != nil {
func (r *MRStateRecorder) Record(ctx context.Context, mrList resource.ManagedList) error {
if err := r.client.List(ctx, mrList); err != nil {
r.log.Info("Failed to list managed resources", "error", err)
return
return err
}

label := gvk.String()
r.mrExists.WithLabelValues(label).Set(float64(len(l.Items)))
mrs := mrList.GetItems()
if len(mrs) == 0 {
return nil
}

label := mrs[0].GetObjectKind().GroupVersionKind().String()
r.metrics.Exists.WithLabelValues(label).Set(float64(len(mrs)))

var numReady, numSynced float64 = 0, 0
for _, o := range l.Items {
conditioned := xpv1.ConditionedStatus{}
err := fieldpath.Pave(o.Object).GetValueInto("status", &conditioned)
if err != nil {
r.log.Info("Failed to get conditions of managed resource", "error", err)
continue
for _, o := range mrs {
if o.GetCondition(xpv1.TypeReady).Status == corev1.ConditionTrue {
numReady++
}

for _, condition := range conditioned.Conditions {
if condition.Status == corev1.ConditionTrue {
switch condition.Type {
case xpv1.TypeReady:
numReady++
case xpv1.TypeSynced:
numSynced++
}
}
if o.GetCondition(xpv1.TypeSynced).Status == corev1.ConditionTrue {
numSynced++
}
}

r.mrReady.WithLabelValues(label).Set(numReady)
r.mrSynced.WithLabelValues(label).Set(numSynced)
r.metrics.Ready.WithLabelValues(label).Set(numReady)
r.metrics.Synced.WithLabelValues(label).Set(numSynced)

return nil
}

// Run records state of managed resources with given interval.
func (r *MRStateRecorder) Run(ctx context.Context, gvk schema.GroupVersionKind) {
// Start records state of managed resources with given interval.
func (r *MRStateRecorder) Start(ctx context.Context) error {
ticker := time.NewTicker(r.interval)
go func() {
for {
select {
case <-ticker.C:
r.Record(ctx, gvk)
case <-ctx.Done():
ticker.Stop()
return
for {
select {
case <-ticker.C:
if err := r.Record(ctx, r.managedList); err != nil {
return err
}
case <-ctx.Done():
ticker.Stop()
return nil
}
}()
}
}
16 changes: 3 additions & 13 deletions pkg/statemetrics/state_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,15 @@ package statemetrics
import (
"context"

"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/runtime/schema"
)

const subSystem = "crossplane"

// A StateRecorder records the state of given GroupVersionKind.
type StateRecorder interface {
Describe(ch chan<- *prometheus.Desc)
Collect(ch chan<- prometheus.Metric)

Record(ctx context.Context, gvk schema.GroupVersionKind)
Run(ctx context.Context, gvk schema.GroupVersionKind)
Start(ctx context.Context) error
}

// A NopStateRecorder does nothing.
Expand All @@ -43,14 +39,8 @@ func NewNopStateRecorder() *NopStateRecorder {
return &NopStateRecorder{}
}

// Describe does nothing.
func (r *NopStateRecorder) Describe(_ chan<- *prometheus.Desc) {}

// Collect does nothing.
func (r *NopStateRecorder) Collect(_ chan<- prometheus.Metric) {}

// Record does nothing.
func (r *NopStateRecorder) Record(_ context.Context, _ schema.GroupVersionKind) {}

// Run does nothing.
func (r *NopStateRecorder) Run(_ context.Context, _ schema.GroupVersionKind) {}
// Start does nothing.
func (r *NopStateRecorder) Start(_ context.Context) error { return nil }
Loading

0 comments on commit aa7264b

Please sign in to comment.