diff --git a/docs/monitoring.md b/docs/monitoring.md index 34b7af862..e2dfe3c26 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -92,6 +92,9 @@ NGINX Gateway Fabric exports the following metrics: - nginx_stale_config. 1 means NGF failed to configure NGINX with the latest version of the configuration, which means NGINX is running with a stale version. - nginx_last_reload_milliseconds. Duration in milliseconds of NGINX reloads (histogram). + - event_batch_processing_milliseconds: Duration in milliseconds of event batch processing (histogram), which is the + time it takes NGF to process batches of Kubernetes events (changes to cluster resources). Note that NGF processes + events in batches, and while processing the current batch, it accumulates events for the next batch. - These metrics have the namespace `nginx_gateway_fabric`, and include the label `class` which is set to the Gateway class of NGF. For example, `nginx_gateway_fabric_nginx_reloads_total{class="nginx"}`. diff --git a/internal/framework/events/eventsfakes/fake_event_handler.go b/internal/framework/events/eventsfakes/fake_event_handler.go index 0b0df5943..0f17b6a39 100644 --- a/internal/framework/events/eventsfakes/fake_event_handler.go +++ b/internal/framework/events/eventsfakes/fake_event_handler.go @@ -5,31 +5,34 @@ import ( "context" "sync" + "github.com/go-logr/logr" "github.com/nginxinc/nginx-gateway-fabric/internal/framework/events" ) type FakeEventHandler struct { - HandleEventBatchStub func(context.Context, events.EventBatch) + HandleEventBatchStub func(context.Context, logr.Logger, events.EventBatch) handleEventBatchMutex sync.RWMutex handleEventBatchArgsForCall []struct { arg1 context.Context - arg2 events.EventBatch + arg2 logr.Logger + arg3 events.EventBatch } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } -func (fake *FakeEventHandler) HandleEventBatch(arg1 context.Context, arg2 events.EventBatch) { +func (fake *FakeEventHandler) HandleEventBatch(arg1 context.Context, arg2 logr.Logger, arg3 events.EventBatch) { fake.handleEventBatchMutex.Lock() fake.handleEventBatchArgsForCall = append(fake.handleEventBatchArgsForCall, struct { arg1 context.Context - arg2 events.EventBatch - }{arg1, arg2}) + arg2 logr.Logger + arg3 events.EventBatch + }{arg1, arg2, arg3}) stub := fake.HandleEventBatchStub - fake.recordInvocation("HandleEventBatch", []interface{}{arg1, arg2}) + fake.recordInvocation("HandleEventBatch", []interface{}{arg1, arg2, arg3}) fake.handleEventBatchMutex.Unlock() if stub != nil { - fake.HandleEventBatchStub(arg1, arg2) + fake.HandleEventBatchStub(arg1, arg2, arg3) } } @@ -39,17 +42,17 @@ func (fake *FakeEventHandler) HandleEventBatchCallCount() int { return len(fake.handleEventBatchArgsForCall) } -func (fake *FakeEventHandler) HandleEventBatchCalls(stub func(context.Context, events.EventBatch)) { +func (fake *FakeEventHandler) HandleEventBatchCalls(stub func(context.Context, logr.Logger, events.EventBatch)) { fake.handleEventBatchMutex.Lock() defer fake.handleEventBatchMutex.Unlock() fake.HandleEventBatchStub = stub } -func (fake *FakeEventHandler) HandleEventBatchArgsForCall(i int) (context.Context, events.EventBatch) { +func (fake *FakeEventHandler) HandleEventBatchArgsForCall(i int) (context.Context, logr.Logger, events.EventBatch) { fake.handleEventBatchMutex.RLock() defer fake.handleEventBatchMutex.RUnlock() argsForCall := fake.handleEventBatchArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeEventHandler) Invocations() map[string][][]interface{} { diff --git a/internal/framework/events/handler.go b/internal/framework/events/handler.go index 8c2cff413..2d512c7ba 100644 --- a/internal/framework/events/handler.go +++ b/internal/framework/events/handler.go @@ -2,6 +2,8 @@ package events import ( "context" + + "github.com/go-logr/logr" ) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . EventHandler @@ -10,5 +12,5 @@ import ( type EventHandler interface { // HandleEventBatch handles a batch of events. // EventBatch can include duplicated events. - HandleEventBatch(ctx context.Context, batch EventBatch) + HandleEventBatch(ctx context.Context, logger logr.Logger, batch EventBatch) } diff --git a/internal/framework/events/loop.go b/internal/framework/events/loop.go index a7e805504..3acda1c46 100644 --- a/internal/framework/events/loop.go +++ b/internal/framework/events/loop.go @@ -34,6 +34,9 @@ type EventLoop struct { // The batches are swapped before starting the handler goroutine. currentBatch EventBatch nextBatch EventBatch + + // the ID of the current batch + currentBatchID int } // NewEventLoop creates a new EventLoop. @@ -63,11 +66,14 @@ func (el *EventLoop) Start(ctx context.Context) error { handleBatch := func() { go func(batch EventBatch) { - el.logger.Info("Handling events from the batch", "total", len(batch)) + el.currentBatchID++ + batchLogger := el.logger.WithName("eventHandler").WithValues("batchID", el.currentBatchID) + + batchLogger.Info("Handling events from the batch", "total", len(batch)) - el.handler.HandleEventBatch(ctx, batch) + el.handler.HandleEventBatch(ctx, batchLogger, batch) - el.logger.Info("Finished handling the batch") + batchLogger.Info("Finished handling the batch") handlingDone <- struct{}{} }(el.currentBatch) } diff --git a/internal/framework/events/loop_test.go b/internal/framework/events/loop_test.go index ea53d9ade..19a6f3744 100644 --- a/internal/framework/events/loop_test.go +++ b/internal/framework/events/loop_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" + "github.com/go-logr/logr" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -47,7 +48,7 @@ var _ = Describe("EventLoop", func() { // Ensure the first batch is handled Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(1)) - _, batch = fakeHandler.HandleEventBatchArgsForCall(0) + _, _, batch = fakeHandler.HandleEventBatchArgsForCall(0) var expectedBatch events.EventBatch = []interface{}{"event0"} Expect(batch).Should(Equal(expectedBatch)) @@ -70,7 +71,7 @@ var _ = Describe("EventLoop", func() { eventCh <- e Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(2)) - _, batch := fakeHandler.HandleEventBatchArgsForCall(1) + _, _, batch := fakeHandler.HandleEventBatchArgsForCall(1) var expectedBatch events.EventBatch = []interface{}{e} Expect(batch).Should(Equal(expectedBatch)) @@ -82,7 +83,7 @@ var _ = Describe("EventLoop", func() { // The func below will pause the handler goroutine while it is processing the batch with e1 until // sentSecondAndThirdEvents is closed. This way we can add e2 and e3 to the current batch in the meantime. - fakeHandler.HandleEventBatchCalls(func(ctx context.Context, batch events.EventBatch) { + fakeHandler.HandleEventBatchCalls(func(ctx context.Context, logger logr.Logger, batch events.EventBatch) { close(firstHandleEventBatchCallInProgress) <-sentSecondAndThirdEvents }) @@ -106,14 +107,14 @@ var _ = Describe("EventLoop", func() { close(sentSecondAndThirdEvents) Eventually(fakeHandler.HandleEventBatchCallCount).Should(Equal(3)) - _, batch := fakeHandler.HandleEventBatchArgsForCall(1) + _, _, batch := fakeHandler.HandleEventBatchArgsForCall(1) var expectedBatch events.EventBatch = []interface{}{e1} // the first HandleEventBatch() call must have handled a batch with e1 Expect(batch).Should(Equal(expectedBatch)) - _, batch = fakeHandler.HandleEventBatchArgsForCall(2) + _, _, batch = fakeHandler.HandleEventBatchArgsForCall(2) expectedBatch = []interface{}{e2, e3} // the second HandleEventBatch() call must have handled a batch with e2 and e3 diff --git a/internal/mode/provisioner/handler.go b/internal/mode/provisioner/handler.go index 14d3cdd77..0495e176a 100644 --- a/internal/mode/provisioner/handler.go +++ b/internal/mode/provisioner/handler.go @@ -27,7 +27,6 @@ type eventHandler struct { statusUpdater status.Updater k8sClient client.Client - logger logr.Logger staticModeDeploymentYAML []byte @@ -38,7 +37,6 @@ func newEventHandler( gcName string, statusUpdater status.Updater, k8sClient client.Client, - logger logr.Logger, staticModeDeploymentYAML []byte, ) *eventHandler { return &eventHandler{ @@ -47,7 +45,6 @@ func newEventHandler( statusUpdater: statusUpdater, gcName: gcName, k8sClient: k8sClient, - logger: logger, staticModeDeploymentYAML: staticModeDeploymentYAML, gatewayNextID: 1, } @@ -80,7 +77,7 @@ func (h *eventHandler) setGatewayClassStatuses(ctx context.Context) { h.statusUpdater.Update(ctx, statuses) } -func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context) { +func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context, logger logr.Logger) { var gwsWithoutDeps, removedGwsWithDeps []types.NamespacedName for nsname, gw := range h.store.gateways { @@ -116,7 +113,7 @@ func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context) { h.provisions[nsname] = deployment - h.logger.Info( + logger.Info( "Created deployment", "deployment", client.ObjectKeyFromObject(deployment), "gateway", nsname, @@ -134,7 +131,7 @@ func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context) { delete(h.provisions, nsname) - h.logger.Info( + logger.Info( "Deleted deployment", "deployment", client.ObjectKeyFromObject(deployment), "gateway", nsname, @@ -142,10 +139,10 @@ func (h *eventHandler) ensureDeploymentsMatchGateways(ctx context.Context) { } } -func (h *eventHandler) HandleEventBatch(ctx context.Context, batch events.EventBatch) { +func (h *eventHandler) HandleEventBatch(ctx context.Context, logger logr.Logger, batch events.EventBatch) { h.store.update(batch) h.setGatewayClassStatuses(ctx) - h.ensureDeploymentsMatchGateways(ctx) + h.ensureDeploymentsMatchGateways(ctx, logger) } func (h *eventHandler) generateDeploymentID() string { diff --git a/internal/mode/provisioner/handler_test.go b/internal/mode/provisioner/handler_test.go index 38d105c43..da6134841 100644 --- a/internal/mode/provisioner/handler_test.go +++ b/internal/mode/provisioner/handler_test.go @@ -96,7 +96,7 @@ var _ = Describe("handler", func() { Resource: gc, }, } - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), zap.New(), batch) // Ensure GatewayClass is accepted @@ -126,7 +126,7 @@ var _ = Describe("handler", func() { }, } - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), zap.New(), batch) depNsName := types.NamespacedName{ Namespace: "nginx-gateway", @@ -156,7 +156,7 @@ var _ = Describe("handler", func() { } handle := func() { - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), zap.New(), batch) } Expect(handle).Should(Panic()) @@ -179,7 +179,6 @@ var _ = Describe("handler", func() { gcName, statusUpdater, k8sclient, - zap.New(), embeddedfiles.StaticModeDeploymentYAML, ) }) @@ -217,7 +216,7 @@ var _ = Describe("handler", func() { }, } - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), zap.New(), batch) deps := &v1.DeploymentList{} err := k8sclient.List(context.Background(), deps) @@ -237,7 +236,7 @@ var _ = Describe("handler", func() { }, } - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), zap.New(), batch) deps := &v1.DeploymentList{} @@ -266,7 +265,7 @@ var _ = Describe("handler", func() { }, } - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), zap.New(), batch) deps := &v1.DeploymentList{} err := k8sclient.List(context.Background(), deps) @@ -295,7 +294,7 @@ var _ = Describe("handler", func() { }, } - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), zap.New(), batch) unknownGC := &v1beta1.GatewayClass{} err = k8sclient.Get(context.Background(), client.ObjectKeyFromObject(gc), unknownGC) @@ -330,7 +329,6 @@ var _ = Describe("handler", func() { gcName, statusUpdater, k8sclient, - zap.New(), embeddedfiles.StaticModeDeploymentYAML, ) }) @@ -340,7 +338,7 @@ var _ = Describe("handler", func() { batch := []interface{}{e} handle := func() { - handler.HandleEventBatch(context.TODO(), batch) + handler.HandleEventBatch(context.Background(), zap.New(), batch) } Expect(handle).Should(Panic()) @@ -408,7 +406,7 @@ var _ = Describe("handler", func() { } handle := func() { - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), zap.New(), batch) } Expect(handle).Should(Panic()) @@ -429,7 +427,7 @@ var _ = Describe("handler", func() { } handle := func() { - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), zap.New(), batch) } Expect(handle).Should(Panic()) @@ -442,7 +440,6 @@ var _ = Describe("handler", func() { gcName, statusUpdater, k8sclient, - zap.New(), []byte("broken YAML"), ) diff --git a/internal/mode/provisioner/manager.go b/internal/mode/provisioner/manager.go index 2a1b4a09b..0639a7ec6 100644 --- a/internal/mode/provisioner/manager.go +++ b/internal/mode/provisioner/manager.go @@ -107,7 +107,6 @@ func StartManager(cfg Config) error { cfg.GatewayClassName, statusUpdater, mgr.GetClient(), - cfg.Logger.WithName("eventHandler"), embeddedfiles.StaticModeDeploymentYAML, ) diff --git a/internal/mode/static/handler.go b/internal/mode/static/handler.go index eededfa5f..e99cf2ae1 100644 --- a/internal/mode/static/handler.go +++ b/internal/mode/static/handler.go @@ -3,6 +3,7 @@ package static import ( "context" "fmt" + "time" "github.com/go-logr/logr" apiv1 "k8s.io/api/core/v1" @@ -23,6 +24,10 @@ import ( "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/resolver" ) +type handlerMetricsCollector interface { + ObserveLastEventBatchProcessTime(time.Duration) +} + // eventHandlerConfig holds configuration parameters for eventHandlerImpl. type eventHandlerConfig struct { // processor is the state ChangeProcessor. @@ -45,8 +50,8 @@ type eventHandlerConfig struct { healthChecker *healthChecker // controlConfigNSName is the NamespacedName of the NginxGateway config for this controller. controlConfigNSName types.NamespacedName - // logger is the logger to be used by the EventHandler. - logger logr.Logger + // metricsCollector collects metrics for this controller. + metricsCollector handlerMetricsCollector // version is the current version number of the nginx config. version int } @@ -67,18 +72,30 @@ func newEventHandlerImpl(cfg eventHandlerConfig) *eventHandlerImpl { } } -func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, batch events.EventBatch) { +func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Logger, batch events.EventBatch) { + start := time.Now() + logger.V(1).Info("Started processing event batch") + + defer func() { + duration := time.Since(start) + logger.V(1).Info( + "Finished processing event batch", + "duration", duration.String(), + ) + h.cfg.metricsCollector.ObserveLastEventBatchProcessTime(duration) + }() + for _, event := range batch { switch e := event.(type) { case *events.UpsertEvent: if cfg, ok := e.Resource.(*ngfAPI.NginxGateway); ok { - h.updateControlPlaneAndSetStatus(ctx, cfg) + h.updateControlPlaneAndSetStatus(ctx, logger, cfg) } else { h.cfg.processor.CaptureUpsertChange(e.Resource) } case *events.DeleteEvent: if _, ok := e.Type.(*ngfAPI.NginxGateway); ok { - h.updateControlPlaneAndSetStatus(ctx, nil) + h.updateControlPlaneAndSetStatus(ctx, logger, nil) } else { h.cfg.processor.CaptureDeleteChange(e.Type, e.NamespacedName) } @@ -89,7 +106,7 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, batch events.Ev changed, graph := h.cfg.processor.Process() if !changed { - h.cfg.logger.Info("Handling events didn't result into NGINX configuration changes") + logger.Info("Handling events didn't result into NGINX configuration changes") if !h.cfg.healthChecker.ready && h.cfg.healthChecker.firstBatchError == nil { h.cfg.healthChecker.setAsReady() } @@ -102,13 +119,13 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, batch events.Ev ctx, dataplane.BuildConfiguration(ctx, graph, h.cfg.serviceResolver, h.cfg.version), ); err != nil { - h.cfg.logger.Error(err, "Failed to update NGINX configuration") + logger.Error(err, "Failed to update NGINX configuration") nginxReloadRes.error = err if !h.cfg.healthChecker.ready { h.cfg.healthChecker.firstBatchError = err } } else { - h.cfg.logger.Info("NGINX configuration was successfully updated") + logger.Info("NGINX configuration was successfully updated") if !h.cfg.healthChecker.ready { h.cfg.healthChecker.setAsReady() } @@ -133,17 +150,21 @@ func (h *eventHandlerImpl) updateNginx(ctx context.Context, conf dataplane.Confi // updateControlPlaneAndSetStatus updates the control plane configuration and then sets the status // based on the outcome -func (h *eventHandlerImpl) updateControlPlaneAndSetStatus(ctx context.Context, cfg *ngfAPI.NginxGateway) { +func (h *eventHandlerImpl) updateControlPlaneAndSetStatus( + ctx context.Context, + logger logr.Logger, + cfg *ngfAPI.NginxGateway, +) { var cond []conditions.Condition if err := updateControlPlane( cfg, - h.cfg.logger, + logger, h.cfg.eventRecorder, h.cfg.controlConfigNSName, h.cfg.logLevelSetter, ); err != nil { msg := "Failed to update control plane configuration" - h.cfg.logger.Error(err, msg) + logger.Error(err, msg) h.cfg.eventRecorder.Eventf( cfg, apiv1.EventTypeWarning, @@ -164,6 +185,6 @@ func (h *eventHandlerImpl) updateControlPlaneAndSetStatus(ctx context.Context, c } h.cfg.statusUpdater.Update(ctx, nginxGatewayStatus) - h.cfg.logger.Info("Reconfigured control plane.") + logger.Info("Reconfigured control plane.") } } diff --git a/internal/mode/static/handler_test.go b/internal/mode/static/handler_test.go index 799e04198..94aad2fb4 100644 --- a/internal/mode/static/handler_test.go +++ b/internal/mode/static/handler_test.go @@ -19,6 +19,7 @@ import ( "github.com/nginxinc/nginx-gateway-fabric/internal/framework/helpers" "github.com/nginxinc/nginx-gateway-fabric/internal/framework/status" "github.com/nginxinc/nginx-gateway-fabric/internal/framework/status/statusfakes" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics/collectors" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/configfakes" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/file" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/file/filefakes" @@ -68,7 +69,6 @@ var _ = Describe("eventHandler", func() { handler = newEventHandlerImpl(eventHandlerConfig{ processor: fakeProcessor, generator: fakeGenerator, - logger: ctlrZap.New(), logLevelSetter: newZapLogLevelSetter(zap.NewAtomicLevel()), nginxFileMgr: fakeNginxFileMgr, nginxRuntimeMgr: fakeNginxRuntimeMgr, @@ -76,6 +76,7 @@ var _ = Describe("eventHandler", func() { eventRecorder: fakeEventRecorder, healthChecker: &healthChecker{}, controlConfigNSName: types.NamespacedName{Namespace: namespace, Name: configName}, + metricsCollector: collectors.NewControllerNoopCollector(), }) Expect(handler.cfg.healthChecker.ready).To(BeFalse()) }) @@ -115,7 +116,7 @@ var _ = Describe("eventHandler", func() { e := &events.UpsertEvent{Resource: &v1beta1.HTTPRoute{}} batch := []interface{}{e} - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) checkUpsertEventExpectations(e) expectReconfig(dataplane.Configuration{Version: 1}, fakeCfgFiles) @@ -128,7 +129,7 @@ var _ = Describe("eventHandler", func() { } batch := []interface{}{e} - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) checkDeleteEventExpectations(e) expectReconfig(dataplane.Configuration{Version: 1}, fakeCfgFiles) @@ -144,12 +145,12 @@ var _ = Describe("eventHandler", func() { } batch := []interface{}{upsertEvent, deleteEvent} - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) checkUpsertEventExpectations(upsertEvent) checkDeleteEventExpectations(deleteEvent) - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) }) }) }) @@ -182,7 +183,7 @@ var _ = Describe("eventHandler", func() { It("handles a valid config", func() { batch := []interface{}{&events.UpsertEvent{Resource: cfg(ngfAPI.ControllerLogLevelError)}} - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) Expect(fakeStatusUpdater.UpdateCallCount()).Should(Equal(1)) _, statuses := fakeStatusUpdater.UpdateArgsForCall(0) @@ -193,7 +194,7 @@ var _ = Describe("eventHandler", func() { It("handles an invalid config", func() { batch := []interface{}{&events.UpsertEvent{Resource: cfg(ngfAPI.ControllerLogLevel("invalid"))}} - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) Expect(fakeStatusUpdater.UpdateCallCount()).Should(Equal(1)) _, statuses := fakeStatusUpdater.UpdateArgsForCall(0) @@ -212,7 +213,7 @@ var _ = Describe("eventHandler", func() { It("handles a deleted config", func() { batch := []interface{}{&events.DeleteEvent{Type: &ngfAPI.NginxGateway{}}} - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) Expect(len(fakeEventRecorder.Events)).To(Equal(1)) event := <-fakeEventRecorder.Events Expect(event).To(Equal("Warning ResourceDeleted NginxGateway configuration was deleted; using defaults")) @@ -227,7 +228,7 @@ var _ = Describe("eventHandler", func() { fakeProcessor.ProcessReturns(true, &graph.Graph{}) Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed()) - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) Expect(handler.cfg.healthChecker.readyCheck(nil)).To(Succeed()) }) @@ -236,7 +237,7 @@ var _ = Describe("eventHandler", func() { batch := []interface{}{e} Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed()) - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) Expect(handler.cfg.healthChecker.readyCheck(nil)).To(Succeed()) }) @@ -247,14 +248,14 @@ var _ = Describe("eventHandler", func() { fakeProcessor.ProcessReturns(true, &graph.Graph{}) fakeNginxRuntimeMgr.ReloadReturns(errors.New("reload error")) - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed()) // now send an update with no changes; should still return an error fakeProcessor.ProcessReturns(false, &graph.Graph{}) - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) Expect(handler.cfg.healthChecker.readyCheck(nil)).ToNot(Succeed()) @@ -262,7 +263,7 @@ var _ = Describe("eventHandler", func() { fakeProcessor.ProcessReturns(true, &graph.Graph{}) fakeNginxRuntimeMgr.ReloadReturns(nil) - handler.HandleEventBatch(context.Background(), batch) + handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) Expect(handler.cfg.healthChecker.readyCheck(nil)).To(Succeed()) }) @@ -272,7 +273,7 @@ var _ = Describe("eventHandler", func() { handle := func() { batch := []interface{}{e} - handler.HandleEventBatch(context.TODO(), batch) + handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) } Expect(handle).Should(Panic()) diff --git a/internal/mode/static/manager.go b/internal/mode/static/manager.go index bcde121ee..90ec48275 100644 --- a/internal/mode/static/manager.go +++ b/internal/mode/static/manager.go @@ -6,6 +6,7 @@ import ( "time" "github.com/go-logr/logr" + "github.com/prometheus/client_golang/prometheus" apiv1 "k8s.io/api/core/v1" discoveryV1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -29,7 +30,7 @@ import ( "github.com/nginxinc/nginx-gateway-fabric/internal/framework/events" "github.com/nginxinc/nginx-gateway-fabric/internal/framework/status" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/config" - ngfmetrics "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics/collectors" ngxcfg "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config" ngxvalidation "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/config/validation" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/file" @@ -130,9 +131,26 @@ func StartManager(cfg config.Config) error { return fmt.Errorf("NGINX is not running: %w", err) } - mgrCollector, err := createAndRegisterMetricsCollectors(cfg.MetricsConfig.Enabled, cfg.GatewayClassName) - if err != nil { - return fmt.Errorf("cannot create and register metrics collectors: %w", err) + var ( + ngxruntimeCollector ngxruntime.MetricsCollector = collectors.NewManagerNoopCollector() + // nolint:ineffassign // not an ineffectual assignment. Will be used if metrics are disabled. + handlerCollector handlerMetricsCollector = collectors.NewControllerNoopCollector() + ) + + if cfg.MetricsConfig.Enabled { + constLabels := map[string]string{"class": cfg.GatewayClassName} + ngxCollector, err := collectors.NewNginxMetricsCollector(constLabels) + if err != nil { + return fmt.Errorf("cannot create nginx metrics collector: %w", err) + } + + ngxruntimeCollector = collectors.NewManagerMetricsCollector(constLabels) + handlerCollector = collectors.NewControllerCollector(constLabels) + metrics.Registry.MustRegister( + ngxCollector, + ngxruntimeCollector.(prometheus.Collector), + handlerCollector.(prometheus.Collector), + ) } statusUpdater := status.NewUpdater(status.UpdaterConfig{ @@ -150,17 +168,17 @@ func StartManager(cfg config.Config) error { processor: processor, serviceResolver: resolver.NewServiceResolverImpl(mgr.GetClient()), generator: ngxcfg.NewGeneratorImpl(), - logger: cfg.Logger.WithName("eventHandler"), logLevelSetter: logLevelSetter, nginxFileMgr: file.NewManagerImpl( cfg.Logger.WithName("nginxFileManager"), file.NewStdLibOSFileManager(), ), - nginxRuntimeMgr: ngxruntime.NewManagerImpl(mgrCollector), + nginxRuntimeMgr: ngxruntime.NewManagerImpl(ngxruntimeCollector), statusUpdater: statusUpdater, eventRecorder: recorder, healthChecker: hc, controlConfigNSName: controlConfigNSName, + metricsCollector: handlerCollector, }) objects, objectLists := prepareFirstEventBatchPreparerArgs(cfg.GatewayClassName, cfg.GatewayNsName) @@ -352,31 +370,6 @@ func setInitialConfig( return updateControlPlane(&config, logger, eventRecorder, configName, logLevelSetter) } -// createAndRegisterMetricsCollectors creates the NGINX status and NGINX runtime manager collectors, registers them, -// and returns the runtime manager collector to be used in the nginxRuntimeMgr. -func createAndRegisterMetricsCollectors(metricsEnabled bool, gwClassName string) (ngxruntime.ManagerCollector, error) { - if !metricsEnabled { - // return a no-op collector to avoid nil pointer errors when metrics are disabled - return ngfmetrics.NewManagerNoopCollector(), nil - } - constLabels := map[string]string{"class": gwClassName} - - ngxCollector, err := ngfmetrics.NewNginxMetricsCollector(constLabels) - if err != nil { - return nil, fmt.Errorf("cannot create NGINX status metrics collector: %w", err) - } - if err := metrics.Registry.Register(ngxCollector); err != nil { - return nil, fmt.Errorf("failed to register NGINX status metrics collector: %w", err) - } - - mgrCollector := ngfmetrics.NewManagerMetricsCollector(constLabels) - if err := metrics.Registry.Register(mgrCollector); err != nil { - return nil, fmt.Errorf("failed to register NGINX manager runtime metrics collector: %w", err) - } - - return mgrCollector, nil -} - func getMetricsOptions(cfg config.MetricsConfig) metricsserver.Options { metricsOptions := metricsserver.Options{BindAddress: "0"} diff --git a/internal/mode/static/metrics/collectors/controller.go b/internal/mode/static/metrics/collectors/controller.go new file mode 100644 index 000000000..d57878660 --- /dev/null +++ b/internal/mode/static/metrics/collectors/controller.go @@ -0,0 +1,58 @@ +package collectors + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics" +) + +// ControllerCollector collects metrics for the NGF controller. +// Implements the prometheus.Collector interface. +type ControllerCollector struct { + // Metrics + eventBatchProcessDuration prometheus.Histogram +} + +// NewControllerCollector creates a new ControllerCollector +func NewControllerCollector(constLabels map[string]string) *ControllerCollector { + nc := &ControllerCollector{ + eventBatchProcessDuration: prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: "event_batch_processing_milliseconds", + Namespace: metrics.Namespace, + Help: "Duration in milliseconds of event batch processing", + ConstLabels: constLabels, + Buckets: []float64{500, 1000, 5000, 10000, 30000}, + }, + ), + } + return nc +} + +// ObserveLastEventBatchProcessTime adds the last event batch processing time to the histogram. +func (c *ControllerCollector) ObserveLastEventBatchProcessTime(duration time.Duration) { + c.eventBatchProcessDuration.Observe(float64(duration / time.Millisecond)) +} + +// Describe implements prometheus.Collector interface Describe method. +func (c *ControllerCollector) Describe(ch chan<- *prometheus.Desc) { + c.eventBatchProcessDuration.Describe(ch) +} + +// Collect implements the prometheus.Collector interface Collect method. +func (c *ControllerCollector) Collect(ch chan<- prometheus.Metric) { + c.eventBatchProcessDuration.Collect(ch) +} + +// ControllerNoopCollector used to initialize the ControllerCollector when metrics are disabled to avoid nil pointer +// errors. +type ControllerNoopCollector struct{} + +// NewControllerNoopCollector returns an instance of the ControllerNoopCollector. +func NewControllerNoopCollector() *ControllerNoopCollector { + return &ControllerNoopCollector{} +} + +func (c *ControllerNoopCollector) ObserveLastEventBatchProcessTime(_ time.Duration) {} diff --git a/internal/mode/static/metrics/nginx.go b/internal/mode/static/metrics/collectors/nginx.go similarity index 76% rename from internal/mode/static/metrics/nginx.go rename to internal/mode/static/metrics/collectors/nginx.go index c2eadd265..46beed2e5 100644 --- a/internal/mode/static/metrics/nginx.go +++ b/internal/mode/static/metrics/collectors/nginx.go @@ -1,20 +1,20 @@ -package metrics +package collectors import ( "context" "net" "net/http" - "time" prometheusClient "github.com/nginxinc/nginx-prometheus-exporter/client" nginxCollector "github.com/nginxinc/nginx-prometheus-exporter/collector" "github.com/prometheus/client_golang/prometheus" + + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics" ) const ( - nginxStatusSock = "/var/run/nginx/nginx-status.sock" - nginxStatusURI = "http://config-status/stub_status" - nginxStatusSockTimeout = 10 * time.Second + nginxStatusSock = "/var/run/nginx/nginx-status.sock" + nginxStatusURI = "http://config-status/stub_status" ) // NewNginxMetricsCollector creates an NginxCollector which fetches stats from NGINX over a unix socket @@ -24,7 +24,7 @@ func NewNginxMetricsCollector(constLabels map[string]string) (prometheus.Collect if err != nil { return nil, err } - return nginxCollector.NewNginxCollector(client, metricsNamespace, constLabels), nil + return nginxCollector.NewNginxCollector(client, metrics.Namespace, constLabels), nil } // getSocketClient gets an http.Client with a unix socket transport. diff --git a/internal/mode/static/metrics/collector.go b/internal/mode/static/metrics/collectors/nginx_runtime.go similarity index 51% rename from internal/mode/static/metrics/collector.go rename to internal/mode/static/metrics/collectors/nginx_runtime.go index daa454ed8..4f57cca61 100644 --- a/internal/mode/static/metrics/collector.go +++ b/internal/mode/static/metrics/collectors/nginx_runtime.go @@ -1,13 +1,15 @@ -package metrics +package collectors import ( "time" "github.com/prometheus/client_golang/prometheus" + + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics" ) -// ManagerMetricsCollector implements ManagerCollector interface and prometheus.Collector interface -type ManagerMetricsCollector struct { +// NginxRuntimeCollector implements runtime.Collector interface and prometheus.Collector interface +type NginxRuntimeCollector struct { // Metrics reloadsTotal prometheus.Counter reloadsError prometheus.Counter @@ -15,20 +17,20 @@ type ManagerMetricsCollector struct { reloadsDuration prometheus.Histogram } -// NewManagerMetricsCollector creates a new ManagerMetricsCollector -func NewManagerMetricsCollector(constLabels map[string]string) *ManagerMetricsCollector { - nc := &ManagerMetricsCollector{ +// NewManagerMetricsCollector creates a new NginxRuntimeCollector +func NewManagerMetricsCollector(constLabels map[string]string) *NginxRuntimeCollector { + nc := &NginxRuntimeCollector{ reloadsTotal: prometheus.NewCounter( prometheus.CounterOpts{ Name: "nginx_reloads_total", - Namespace: metricsNamespace, + Namespace: metrics.Namespace, Help: "Number of successful NGINX reloads", ConstLabels: constLabels, }), reloadsError: prometheus.NewCounter( prometheus.CounterOpts{ Name: "nginx_reload_errors_total", - Namespace: metricsNamespace, + Namespace: metrics.Namespace, Help: "Number of unsuccessful NGINX reloads", ConstLabels: constLabels, }, @@ -36,7 +38,7 @@ func NewManagerMetricsCollector(constLabels map[string]string) *ManagerMetricsCo configStale: prometheus.NewGauge( prometheus.GaugeOpts{ Name: "nginx_stale_config", - Namespace: metricsNamespace, + Namespace: metrics.Namespace, Help: "Indicates if NGINX is not serving the latest configuration.", ConstLabels: constLabels, }, @@ -44,7 +46,7 @@ func NewManagerMetricsCollector(constLabels map[string]string) *ManagerMetricsCo reloadsDuration: prometheus.NewHistogram( prometheus.HistogramOpts{ Name: "nginx_reloads_milliseconds", - Namespace: metricsNamespace, + Namespace: metrics.Namespace, Help: "Duration in milliseconds of NGINX reloads", ConstLabels: constLabels, Buckets: []float64{500, 1000, 5000, 10000, 30000}, @@ -54,50 +56,49 @@ func NewManagerMetricsCollector(constLabels map[string]string) *ManagerMetricsCo return nc } -// IncNginxReloadCount increments the counter of successful NGINX reloads and sets the stale config status to false. -func (mc *ManagerMetricsCollector) IncReloadCount() { - mc.reloadsTotal.Inc() - mc.updateConfigStaleStatus(false) +// IncReloadCount increments the counter of successful NGINX reloads and sets the stale config status to false. +func (c *NginxRuntimeCollector) IncReloadCount() { + c.reloadsTotal.Inc() + c.updateConfigStaleStatus(false) } -// IncNginxReloadErrors increments the counter of NGINX reload errors and sets the stale config status to true. -func (mc *ManagerMetricsCollector) IncReloadErrors() { - mc.reloadsError.Inc() - mc.updateConfigStaleStatus(true) +// IncReloadErrors increments the counter of NGINX reload errors and sets the stale config status to true. +func (c *NginxRuntimeCollector) IncReloadErrors() { + c.reloadsError.Inc() + c.updateConfigStaleStatus(true) } // updateConfigStaleStatus updates the last NGINX reload status metric. -func (mc *ManagerMetricsCollector) updateConfigStaleStatus(stale bool) { +func (c *NginxRuntimeCollector) updateConfigStaleStatus(stale bool) { var status float64 if stale { status = 1.0 } - mc.configStale.Set(status) + c.configStale.Set(status) } // ObserveLastReloadTime adds the last NGINX reload time to the histogram. -func (mc *ManagerMetricsCollector) ObserveLastReloadTime(duration time.Duration) { - mc.reloadsDuration.Observe(float64(duration / time.Millisecond)) +func (c *NginxRuntimeCollector) ObserveLastReloadTime(duration time.Duration) { + c.reloadsDuration.Observe(float64(duration / time.Millisecond)) } // Describe implements prometheus.Collector interface Describe method. -func (mc *ManagerMetricsCollector) Describe(ch chan<- *prometheus.Desc) { - mc.reloadsTotal.Describe(ch) - mc.reloadsError.Describe(ch) - mc.configStale.Describe(ch) - mc.reloadsDuration.Describe(ch) +func (c *NginxRuntimeCollector) Describe(ch chan<- *prometheus.Desc) { + c.reloadsTotal.Describe(ch) + c.reloadsError.Describe(ch) + c.configStale.Describe(ch) + c.reloadsDuration.Describe(ch) } // Collect implements the prometheus.Collector interface Collect method. -func (mc *ManagerMetricsCollector) Collect(ch chan<- prometheus.Metric) { - mc.reloadsTotal.Collect(ch) - mc.reloadsError.Collect(ch) - mc.configStale.Collect(ch) - mc.reloadsDuration.Collect(ch) +func (c *NginxRuntimeCollector) Collect(ch chan<- prometheus.Metric) { + c.reloadsTotal.Collect(ch) + c.reloadsError.Collect(ch) + c.configStale.Collect(ch) + c.reloadsDuration.Collect(ch) } -// ManagerNoopCollector is a no-op collector that will implement ManagerCollector interface. -// Used to initialize the ManagerCollector when metrics are disabled to avoid nil pointer errors. +// ManagerNoopCollector used to initialize the ManagerCollector when metrics are disabled to avoid nil pointer errors. type ManagerNoopCollector struct{} // NewManagerNoopCollector creates a no-op collector that implements ManagerCollector interface. @@ -106,10 +107,10 @@ func NewManagerNoopCollector() *ManagerNoopCollector { } // IncReloadCount implements a no-op IncReloadCount. -func (mc *ManagerNoopCollector) IncReloadCount() {} +func (c *ManagerNoopCollector) IncReloadCount() {} // IncReloadErrors implements a no-op IncReloadErrors. -func (mc *ManagerNoopCollector) IncReloadErrors() {} +func (c *ManagerNoopCollector) IncReloadErrors() {} // ObserveLastReloadTime implements a no-op ObserveLastReloadTime. -func (mc *ManagerNoopCollector) ObserveLastReloadTime(_ time.Duration) {} +func (c *ManagerNoopCollector) ObserveLastReloadTime(_ time.Duration) {} diff --git a/internal/mode/static/metrics/metrics.go b/internal/mode/static/metrics/metrics.go index 1f6b62b12..83cf1948e 100644 --- a/internal/mode/static/metrics/metrics.go +++ b/internal/mode/static/metrics/metrics.go @@ -1,4 +1,4 @@ package metrics // nolint:gosec // flagged as potential hardcoded credentials, but is not sensitive -const metricsNamespace = "nginx_gateway_fabric" +const Namespace = "nginx_gateway_fabric" diff --git a/internal/mode/static/nginx/runtime/manager.go b/internal/mode/static/nginx/runtime/manager.go index e206ab504..48e0e0def 100644 --- a/internal/mode/static/nginx/runtime/manager.go +++ b/internal/mode/static/nginx/runtime/manager.go @@ -35,8 +35,8 @@ type Manager interface { Reload(ctx context.Context, configVersion int) error } -// ManagerCollector is an interface for the metrics of the NGINX runtime manager. -type ManagerCollector interface { +// MetricsCollector is an interface for the metrics of the NGINX runtime manager. +type MetricsCollector interface { IncReloadCount() IncReloadErrors() ObserveLastReloadTime(ms time.Duration) @@ -45,14 +45,14 @@ type ManagerCollector interface { // ManagerImpl implements Manager. type ManagerImpl struct { verifyClient *verifyClient - managerCollector ManagerCollector + metricsCollector MetricsCollector } // NewManagerImpl creates a new ManagerImpl. -func NewManagerImpl(managerCollector ManagerCollector) *ManagerImpl { +func NewManagerImpl(collector MetricsCollector) *ManagerImpl { return &ManagerImpl{ verifyClient: newVerifyClient(nginxReloadTimeout), - managerCollector: managerCollector, + metricsCollector: collector, } } @@ -73,7 +73,7 @@ func (m *ManagerImpl) Reload(ctx context.Context, configVersion int) error { // send HUP signal to the NGINX main process reload configuration // See https://nginx.org/en/docs/control.html if err := syscall.Kill(pid, syscall.SIGHUP); err != nil { - m.managerCollector.IncReloadErrors() + m.metricsCollector.IncReloadErrors() return fmt.Errorf("failed to send the HUP signal to NGINX main: %w", err) } @@ -84,13 +84,13 @@ func (m *ManagerImpl) Reload(ctx context.Context, configVersion int) error { previousChildProcesses, os.ReadFile, ); err != nil { - m.managerCollector.IncReloadErrors() + m.metricsCollector.IncReloadErrors() return err } - m.managerCollector.IncReloadCount() + m.metricsCollector.IncReloadCount() finish := time.Now() - m.managerCollector.ObserveLastReloadTime(finish.Sub(start)) + m.metricsCollector.ObserveLastReloadTime(finish.Sub(start)) return nil } diff --git a/tests/scale/scale.md b/tests/scale/scale.md index 7ee324f86..54a052ccf 100644 --- a/tests/scale/scale.md +++ b/tests/scale/scale.md @@ -62,7 +62,7 @@ are listed in the [Scale Upstream Servers](#scale-upstream-servers) test steps. - Install Prometheus: ```console - kubectl apply -f manifets/prom-clusterrole.yaml + kubectl apply -f manifests/prom-clusterrole.yaml helm repo add prometheus-community https://prometheus-community.github.io/helm-charts helm repo update helm install prom prometheus-community/prometheus --set useExistingClusterRoleName=prometheus -n prom