diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 47b863f28..99f437ccd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -73,6 +73,8 @@ jobs: runs-on: ubuntu-22.04 steps: - uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 + with: + fetch-depth: 0 - uses: actions/setup-go@0c52d547c9bc32b1aa3301fd7a9cb496313a4491 # v5.0.0 with: go-version-file: 'go.mod' @@ -144,6 +146,8 @@ jobs: version: "bookworm-slim" steps: - uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 + with: + fetch-depth: 0 - uses: actions/setup-go@0c52d547c9bc32b1aa3301fd7a9cb496313a4491 # v5.0.0 with: go-version-file: 'go.mod' diff --git a/docs/swagger.json b/docs/swagger.json index b8dab3c80..0184ff35d 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -3,6 +3,23 @@ "info": {}, "host": "localhost:8081", "paths": { + "/health": { + "get": { + "tags": [ + "nginx-agent" + ], + "summary": "Check the health of the NGINX Agent", + "operationId": "health-check", + "responses": { + "200": { + "description": "HealthResponse", + "schema": { + "$ref": "#/definitions/HealthResponse" + } + } + } + } + }, "/metrics/": { "get": { "description": "# Returns prometheus metrics", @@ -127,6 +144,12 @@ "schema": { "$ref": "#/definitions/AgentAPIConfigApplyStatusResponse" } + }, + "500": { + "description": "AgentAPICommonResponse", + "schema": { + "$ref": "#/definitions/AgentAPICommonResponse" + } } } } @@ -195,6 +218,44 @@ }, "x-go-package": "github.com/nginx/agent/v2/src/plugins" }, + "HealthResponse": { + "type": "object", + "properties": { + "checks": { + "description": "Array of health checks", + "type": "array", + "items": { + "$ref": "#/definitions/HealthStatusCheck" + }, + "x-go-name": "Checks" + }, + "status": { + "description": "Overall health status", + "type": "string", + "x-go-name": "Status", + "example": "OK" + } + }, + "x-go-package": "github.com/nginx/agent/v2/src/plugins" + }, + "HealthStatusCheck": { + "type": "object", + "properties": { + "name": { + "description": "Health check name", + "type": "string", + "x-go-name": "Name", + "example": "commandConnection" + }, + "status": { + "description": "Health check status", + "type": "string", + "x-go-name": "Status", + "example": "OK" + } + }, + "x-go-package": "github.com/nginx/agent/v2/src/plugins" + }, "NginxDetails": { "type": "object", "properties": { diff --git a/main.go b/main.go index f8c0261b5..0fa995e5a 100644 --- a/main.go +++ b/main.go @@ -73,10 +73,7 @@ func main() { controller, commander, reporter := core.CreateGrpcClients(ctx, loadedConfig) if controller != nil { - if err := controller.Connect(); err != nil { - log.Warnf("Unable to connect to control plane: %v", err) - return - } + go controller.Connect() } binary := core.NewNginxBinary(env, loadedConfig) diff --git a/sdk/client/client.go b/sdk/client/client.go index 1aa31c0f4..29e7a26e8 100644 --- a/sdk/client/client.go +++ b/sdk/client/client.go @@ -78,7 +78,7 @@ type ( WithClient(Client) Controller Context() context.Context WithContext(context.Context) Controller - Connect() error + Connect() Close() error } ) diff --git a/sdk/client/commander.go b/sdk/client/commander.go index 7423abece..6383fed9a 100644 --- a/sdk/client/commander.go +++ b/sdk/client/commander.go @@ -10,6 +10,7 @@ package client import ( "context" "encoding/json" + "errors" "fmt" "io" "sync" @@ -74,11 +75,15 @@ func (c *commander) Connect(ctx context.Context) error { log.Debugf("Commander connecting to %s", c.server) c.ctx = ctx + + c.retryLock.Lock() err := backoff.WaitUntil( c.ctx, c.backoffSettings, c.createClient, ) + c.retryLock.Unlock() + if err != nil { return err } @@ -163,6 +168,11 @@ func (c *commander) Send(ctx context.Context, message Message) error { return err } + if c.channel == nil { + c.setIsRetrying(true) + return c.handleGrpcError("Commander Channel Send", errors.New("command channel client not created yet")) + } + if err := c.channel.Send(cmd); err != nil { c.setIsRetrying(true) return c.handleGrpcError("Commander Channel Send", err) diff --git a/sdk/client/controller.go b/sdk/client/controller.go index 7117269d0..c1d23ea70 100644 --- a/sdk/client/controller.go +++ b/sdk/client/controller.go @@ -10,6 +10,8 @@ package client import ( "context" "fmt" + + log "github.com/sirupsen/logrus" ) func NewClientController() Controller { @@ -33,19 +35,12 @@ func (c *ctrl) WithContext(ctx context.Context) Controller { return c } -func (c *ctrl) Connect() error { - var retErr error +func (c *ctrl) Connect() { for _, client := range c.clients { if err := client.Connect(c.ctx); err != nil { - if retErr == nil { - retErr = fmt.Errorf("%s failed to connect: %w", client.Server(), err) - } else { - retErr = fmt.Errorf("%v\n%s failed to connect: %w", retErr, client.Server(), err) - } + log.Warnf("%s failed to connect: %v", client.Server(), err) } } - - return retErr } func (c *ctrl) Close() error { diff --git a/sdk/client/controller_test.go b/sdk/client/controller_test.go index 4a445c044..46076a48f 100644 --- a/sdk/client/controller_test.go +++ b/sdk/client/controller_test.go @@ -45,8 +45,7 @@ func TestControllerConnect(t *testing.T) { controller.WithClient(metricsReportClient) controller.WithContext(ctx) - err := controller.Connect() - assert.Nil(t, err) + controller.Connect() commanderClient.AssertNumberOfCalls(t, "Connect", 1) metricsReportClient.AssertNumberOfCalls(t, "Connect", 1) @@ -75,8 +74,7 @@ func TestControllerConnect_error(t *testing.T) { controller.WithClient(metricsReportClient) controller.WithContext(ctx) - err := controller.Connect() - assert.NotNil(t, err) + controller.Connect() commanderClient.AssertNumberOfCalls(t, "Connect", 1) metricsReportClient.AssertNumberOfCalls(t, "Connect", 1) diff --git a/sdk/client/metric_reporter.go b/sdk/client/metric_reporter.go index 44158b4e9..bfb26365a 100644 --- a/sdk/client/metric_reporter.go +++ b/sdk/client/metric_reporter.go @@ -9,6 +9,7 @@ package client import ( "context" + "errors" "fmt" "io" "sync" @@ -28,6 +29,7 @@ func NewMetricReporterClient() MetricReporter { return &metricReporter{ connector: newConnector(), backoffSettings: DefaultBackoffSettings, + isRetrying: false, } } @@ -39,6 +41,8 @@ type metricReporter struct { ctx context.Context mu sync.Mutex backoffSettings backoff.BackoffSettings + isRetrying bool + retryLock sync.Mutex } func (r *metricReporter) WithInterceptor(interceptor interceptors.Interceptor) Client { @@ -57,11 +61,14 @@ func (r *metricReporter) Connect(ctx context.Context) error { log.Debugf("Metric Reporter connecting to %s", r.server) r.ctx = ctx + + r.retryLock.Lock() err := backoff.WaitUntil( r.ctx, r.backoffSettings, r.createClient, ) + r.retryLock.Unlock() if err != nil { return err } @@ -151,18 +158,19 @@ func (r *metricReporter) Send(ctx context.Context, message Message) error { return fmt.Errorf("MetricReporter expected a metrics report message, but received %T", message.Data()) } - isRetrying := false - err = backoff.WaitUntil(r.ctx, r.backoffSettings, func() error { - if isRetrying { - log.Infof("Metric Reporter Channel Send: retrying to connect to %s", r.grpc.Target()) - err := r.createClient() - if err != nil { - return err - } + err := r.checkClientConnection() + if err != nil { + return err + } + + if r.channel == nil { + r.isRetrying = true + return r.handleGrpcError("Metric Reporter Channel Send", errors.New("metric service stream client not created yet")) } + if err := r.channel.Send(report); err != nil { - isRetrying = true + r.isRetrying = true return r.handleGrpcError("Metric Reporter Channel Send", err) } @@ -176,18 +184,14 @@ func (r *metricReporter) Send(ctx context.Context, message Message) error { return fmt.Errorf("MetricReporter expected an events report message, but received %T", message.Data()) } - isRetrying := false - err = backoff.WaitUntil(r.ctx, r.backoffSettings, func() error { - if isRetrying { - log.Infof("Metric Reporter Channel Send: retrying to connect to %s", r.grpc.Target()) - err = r.createClient() - if err != nil { - return err - } + err := r.checkClientConnection() + if err != nil { + return err } + if err := r.eventsChannel.Send(report); err != nil { - isRetrying = true + r.isRetrying = true return r.handleGrpcError("Metric Reporter Events Channel Send", err) } @@ -202,6 +206,21 @@ func (r *metricReporter) Send(ctx context.Context, message Message) error { return err } +func (r *metricReporter) checkClientConnection() error { + r.retryLock.Lock() + defer r.retryLock.Unlock() + + if r.isRetrying { + log.Infof("Metric Reporter Channel Send: retrying to connect to %s", r.grpc.Target()) + err := r.createClient() + if err != nil { + return err + } + } + + return nil +} + func (r *metricReporter) closeConnection() error { var err error if r.channel != nil { diff --git a/src/core/topics.go b/src/core/topics.go index 6f6308c7c..7051eb03d 100644 --- a/src/core/topics.go +++ b/src/core/topics.go @@ -10,7 +10,6 @@ package core const ( UNKNOWN = "unknown" RegistrationPrefix = "registration." - RegistrationCompletedTopic = RegistrationPrefix + "completed" CommNginxConfig = "nginx.config" NginxConfigUpload = "nginx.config.upload" NginxReload = "nginx.reload" @@ -49,4 +48,6 @@ const ( EnableExtension = "enable.extension" EnableFeature = "enable.feature" AgentAPIConfigApplyResponse = "agent.api.config.apply.response" + CommandSent = "command.sent" + MetricReportSent = "metrics.report.sent" ) diff --git a/src/plugins/agent_api.go b/src/plugins/agent_api.go index 2fe5e86ef..2895c0529 100644 --- a/src/plugins/agent_api.go +++ b/src/plugins/agent_api.go @@ -45,9 +45,14 @@ const ( pendingStatus = "PENDING" errorStatus = "ERROR" unknownStatus = "UNKNOWN" + + registration string = "registration" + commandConnection string = "commandConnection" + metricsConnection string = "metricsConnection" ) var ( + healthRegex = regexp.MustCompile(`^\/health[\/]*$`) instancesRegex = regexp.MustCompile(`^\/nginx[\/]*$`) configRegex = regexp.MustCompile(`^\/nginx/config[\/]*$`) configStatusRegex = regexp.MustCompile(`^\/nginx/config/status[\/]*$`) @@ -60,10 +65,18 @@ type AgentAPI struct { server http.Server nginxBinary core.NginxBinary nginxHandler *NginxHandler + rootHandler *RootHandler exporter *prometheus_metrics.Exporter processes []*core.Process } +type RootHandler struct { + config *config.Config + isGrpcRegistered bool + lastCommandSent time.Time + lastMetricReportSent time.Time +} + type NginxHandler struct { config *config.Config env core.Environment @@ -132,6 +145,25 @@ type AgentAPIConfigApplyStatusResponse struct { Status string `json:"status"` } +// swagger:model HealthStatusCheck +type HealthStatusCheck struct { + // Health check name + // example: commandConnection + Name string `json:"name"` + // Health check status + // example: OK + Status string `json:"status"` +} + +// swagger:model HealthResponse +type HealthResponse struct { + // Overall health status + // example: OK + Status string `json:"status"` + // Array of health checks + Checks []HealthStatusCheck `json:"checks"` +} + const ( contentTypeHeader = "Content-Type" jsonMimeType = "application/json" @@ -191,6 +223,12 @@ func (a *AgentAPI) Process(message *core.Message) { if a.nginxHandler != nil { a.nginxHandler.syncProcessInfo(a.processes) } + case core.AgentConnected: + a.rootHandler.isGrpcRegistered = true + case core.CommandSent: + a.rootHandler.lastCommandSent = time.Now() + case core.MetricReportSent: + a.rootHandler.lastMetricReportSent = time.Now() } } @@ -206,10 +244,19 @@ func (a *AgentAPI) Subscriptions() []string { core.NginxConfigApplyFailed, core.NginxConfigApplySucceeded, core.NginxDetailProcUpdate, + core.AgentConnected, + core.CommandSent, + core.MetricReportSent, } } func (a *AgentAPI) createHttpServer() { + a.rootHandler = &RootHandler{ + config: a.config, + isGrpcRegistered: false, + lastMetricReportSent: time.Now(), + } + a.nginxHandler = &NginxHandler{ config: a.config, pipeline: a.pipeline, @@ -224,6 +271,7 @@ func (a *AgentAPI) createHttpServer() { mux.Handle("/metrics/", a.getPrometheusHandler()) mux.Handle("/nginx/", a.nginxHandler) + mux.Handle("/", a.rootHandler) handler := cors.New(cors.Options{AllowedMethods: []string{"OPTIONS", "GET", "PUT"}}).Handler(mux) a.server = http.Server{ @@ -546,7 +594,7 @@ func (h *NginxHandler) applyNginxConfig(nginxDetail *proto.NginxDetails, buf *by // 200: AgentAPIConfigApplyResponse // 400: AgentAPIConfigApplyStatusResponse // 404: AgentAPIConfigApplyStatusResponse -// 500 +// 500: AgentAPICommonResponse func (h *NginxHandler) getConfigStatus(w http.ResponseWriter, r *http.Request) error { correlationId := r.URL.Query().Get("correlation_id") @@ -605,6 +653,94 @@ func (h *NginxHandler) syncProcessInfo(processInfo []*core.Process) { h.processes = processInfo } +func (rh *RootHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.Header().Set(contentTypeHeader, jsonMimeType) + + switch { + case healthRegex.MatchString(r.URL.Path): + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + err := rh.healthCheck(w) + if err != nil { + log.Warnf("Failed to get agent health: %v", err) + } + default: + w.WriteHeader(http.StatusNotFound) + _, err := fmt.Fprint(w, []byte("not found")) + if err != nil { + log.Warnf("Failed to send api response: %v", err) + } + } +} + +// swagger:route GET /health nginx-agent health-check +// +// # Check the health of the NGINX Agent +// +// responses: +// +// 200: HealthResponse +func (rh *RootHandler) healthCheck(w http.ResponseWriter) error { + w.WriteHeader(http.StatusOK) + + overallStatus := okStatus + checks := []HealthStatusCheck{} + + registrationStatus := okStatus + commandServiceStatus := okStatus + metricsServiceStatus := okStatus + + if rh.config.IsGrpcServerConfigured() { + if !rh.isGrpcRegistered { + registrationStatus = errorStatus + overallStatus = errorStatus + } + + checks = append(checks, HealthStatusCheck{ + Name: registration, + Status: registrationStatus, + }) + + timeNow := time.Now() + + lastCommandSentDiff := timeNow.Sub(rh.lastCommandSent) + + if lastCommandSentDiff > (2 * rh.config.Dataplane.Status.PollInterval) { + commandServiceStatus = errorStatus + overallStatus = errorStatus + } + + checks = append(checks, HealthStatusCheck{ + Name: commandConnection, + Status: commandServiceStatus, + }) + + if rh.config.IsFeatureEnabled(agent_config.FeatureMetrics) || rh.config.IsFeatureEnabled(agent_config.FeatureMetricsSender) { + lastMetricReportSentDiff := timeNow.Sub(rh.lastMetricReportSent) + + if lastMetricReportSentDiff > (2 * rh.config.AgentMetrics.ReportInterval) { + metricsServiceStatus = errorStatus + overallStatus = errorStatus + } + + checks = append(checks, HealthStatusCheck{ + Name: metricsConnection, + Status: metricsServiceStatus, + }) + } + } + + healthResponse := &HealthResponse{ + Status: overallStatus, + Checks: checks, + } + + return writeObjectToResponseBody(w, healthResponse) +} + func writeObjectToResponseBody(w http.ResponseWriter, response any) error { respBody := new(bytes.Buffer) err := json.NewEncoder(respBody).Encode(response) diff --git a/src/plugins/agent_api_test.go b/src/plugins/agent_api_test.go index 542b8c8b0..4f0ffa357 100644 --- a/src/plugins/agent_api_test.go +++ b/src/plugins/agent_api_test.go @@ -56,6 +56,9 @@ func TestAgentAPI_Subscriptions(t *testing.T) { core.NginxConfigApplyFailed, core.NginxConfigApplySucceeded, core.NginxDetailProcUpdate, + core.AgentConnected, + core.CommandSent, + core.MetricReportSent, } agentAPI := AgentAPI{} @@ -525,6 +528,140 @@ func TestMtls_forApi(t *testing.T) { } } +func TestRootHandler_healthCheck(t *testing.T) { + agentConfig := tutils.GetMockAgentConfig() + agentConfig.Dataplane.Status.PollInterval = time.Minute + agentConfig.AgentMetrics.ReportInterval = time.Minute + + tests := []struct { + name string + rootHandler *RootHandler + expected *HealthResponse + }{ + { + name: "Test 1: Everything healthy", + rootHandler: &RootHandler{ + config: agentConfig, + isGrpcRegistered: true, + lastCommandSent: time.Now(), + lastMetricReportSent: time.Now(), + }, + expected: &HealthResponse{ + Status: okStatus, + Checks: []HealthStatusCheck{ + { + Name: registration, + Status: okStatus, + }, + { + Name: commandConnection, + Status: okStatus, + }, + { + Name: metricsConnection, + Status: okStatus, + }, + }, + }, + }, + { + name: "Test 2: Registration failed", + rootHandler: &RootHandler{ + config: agentConfig, + isGrpcRegistered: false, + lastCommandSent: time.Now(), + lastMetricReportSent: time.Now(), + }, + expected: &HealthResponse{ + Status: errorStatus, + Checks: []HealthStatusCheck{ + { + Name: registration, + Status: errorStatus, + }, + { + Name: commandConnection, + Status: okStatus, + }, + { + Name: metricsConnection, + Status: okStatus, + }, + }, + }, + }, + { + name: "Test 3: Command service connection failed", + rootHandler: &RootHandler{ + config: agentConfig, + isGrpcRegistered: true, + lastCommandSent: time.Now().AddDate(0, 0, -1), + lastMetricReportSent: time.Now(), + }, + expected: &HealthResponse{ + Status: errorStatus, + Checks: []HealthStatusCheck{ + { + Name: registration, + Status: okStatus, + }, + { + Name: commandConnection, + Status: errorStatus, + }, + { + Name: metricsConnection, + Status: okStatus, + }, + }, + }, + }, + { + name: "Test 4: Metrics service connection failed", + rootHandler: &RootHandler{ + config: agentConfig, + isGrpcRegistered: true, + lastCommandSent: time.Now(), + lastMetricReportSent: time.Now().AddDate(0, 0, -1), + }, + expected: &HealthResponse{ + Status: errorStatus, + Checks: []HealthStatusCheck{ + { + Name: registration, + Status: okStatus, + }, + { + Name: commandConnection, + Status: okStatus, + }, + { + Name: metricsConnection, + Status: errorStatus, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + responseRecorder := httptest.NewRecorder() + + err := tt.rootHandler.healthCheck(responseRecorder) + require.NoError(t, err) + + assert.Equal(t, http.StatusOK, responseRecorder.Result().StatusCode) + + actualBody := &HealthResponse{} + err = json.NewDecoder(responseRecorder.Result().Body).Decode(actualBody) + require.NoError(t, err) + + assert.Equal(t, tt.expected, actualBody) + }) + } +} + func getConfig(t *testing.T) *tls.Config { crt, err := os.ReadFile("../../build/certs/client.crt") assert.NoError(t, err) diff --git a/src/plugins/commander.go b/src/plugins/commander.go index 8ce701833..be1179283 100644 --- a/src/plugins/commander.go +++ b/src/plugins/commander.go @@ -120,6 +120,8 @@ func (c *Commander) sendCommand(ctx context.Context, cmd *proto.Command) { log.Debugf("Sending command (messageId=%s), %v", cmd.GetMeta().MessageId, cmd.GetData()) if err := c.cmdr.Send(ctx, client.MessageFromCommand(cmd)); err != nil { log.Errorf("Error sending to command channel %v", err) + } else { + c.pipeline.Process(core.NewMessage(core.CommandSent, nil)) } } diff --git a/src/plugins/commander_test.go b/src/plugins/commander_test.go index 06bf76e87..fecc4f404 100644 --- a/src/plugins/commander_test.go +++ b/src/plugins/commander_test.go @@ -136,12 +136,14 @@ func TestCommander_Process(t *testing.T) { }, }, }, - setMocks: true, - topic: core.CommRegister, - nginxId: "", - systemId: "", - config: nil, - msgTopics: []string{}, + setMocks: true, + topic: core.CommRegister, + nginxId: "", + systemId: "", + config: nil, + msgTopics: []string{ + core.CommandSent, + }, }, { name: "test agent register with no details and config", diff --git a/src/plugins/metrics.go b/src/plugins/metrics.go index eaf35d07d..ac493ddc9 100644 --- a/src/plugins/metrics.go +++ b/src/plugins/metrics.go @@ -152,7 +152,6 @@ func (m *Metrics) Info() *core.Info { func (m *Metrics) Subscriptions() []string { return []string{ - core.RegistrationCompletedTopic, core.AgentCollectorsUpdate, core.AgentConfigChanged, core.NginxPluginConfigured, diff --git a/src/plugins/metrics_sender.go b/src/plugins/metrics_sender.go index 38bb2af93..68e587ffd 100644 --- a/src/plugins/metrics_sender.go +++ b/src/plugins/metrics_sender.go @@ -59,7 +59,7 @@ func (r *MetricsSender) Info() *core.Info { } func (r *MetricsSender) Process(msg *core.Message) { - if msg.Exact(core.RegistrationCompletedTopic) { + if msg.Exact(core.AgentConnected) { r.readyToSend.Toggle() return } @@ -81,6 +81,8 @@ func (r *MetricsSender) Process(msg *core.Message) { err := r.reporter.Send(r.ctx, message) if err != nil { log.Errorf("Failed to send MetricsReport: %v", err) + } else { + r.pipeline.Process(core.NewMessage(core.MetricReportSent, nil)) } case *models.EventReport: err := r.reporter.Send(r.ctx, client.MessageFromEvents(report)) @@ -119,5 +121,5 @@ func (r *MetricsSender) metricSenderBackoff(agentConfig *proto.AgentConfig) { } func (r *MetricsSender) Subscriptions() []string { - return []string{core.CommMetrics, core.RegistrationCompletedTopic, core.AgentConfigChanged} + return []string{core.CommMetrics, core.AgentConnected, core.AgentConfigChanged} } diff --git a/src/plugins/metrics_sender_test.go b/src/plugins/metrics_sender_test.go index 3ecf51874..02a8d5e5b 100644 --- a/src/plugins/metrics_sender_test.go +++ b/src/plugins/metrics_sender_test.go @@ -53,7 +53,7 @@ func TestMetricsSenderSendMetrics(t *testing.T) { assert.True(t, pluginUnderTest.started.Load()) assert.False(t, pluginUnderTest.readyToSend.Load()) - pluginUnderTest.Process(core.NewMessage(core.RegistrationCompletedTopic, nil)) + pluginUnderTest.Process(core.NewMessage(core.AgentConnected, nil)) assert.True(t, pluginUnderTest.readyToSend.Load()) @@ -113,7 +113,7 @@ func TestMetricsSenderBackoff(t *testing.T) { pluginUnderTest := NewMetricsSender(mockMetricsReportClient) pluginUnderTest.Init(core.NewMockMessagePipe(ctx)) - pluginUnderTest.Process(core.NewMessage(core.RegistrationCompletedTopic, nil)) + pluginUnderTest.Process(core.NewMessage(core.AgentConnected, nil)) if !reflect.ValueOf(test.wantBackoff).IsZero() { mockMetricsReportClient.On("WithBackoffSettings", test.wantBackoff) @@ -131,5 +131,5 @@ func TestMetricsSenderBackoff(t *testing.T) { func TestMetricsSenderSubscriptions(t *testing.T) { pluginUnderTest := NewMetricsSender(tutils.NewMockMetricsReportClient()) - assert.Equal(t, []string{core.CommMetrics, core.RegistrationCompletedTopic, core.AgentConfigChanged}, pluginUnderTest.Subscriptions()) + assert.Equal(t, []string{core.CommMetrics, core.AgentConnected, core.AgentConfigChanged}, pluginUnderTest.Subscriptions()) } diff --git a/src/plugins/metrics_test.go b/src/plugins/metrics_test.go index f59dfa130..89bbed2ba 100644 --- a/src/plugins/metrics_test.go +++ b/src/plugins/metrics_test.go @@ -326,7 +326,6 @@ func TestMetrics_Info(t *testing.T) { func TestMetrics_Subscriptions(t *testing.T) { subs := []string{ - core.RegistrationCompletedTopic, core.AgentCollectorsUpdate, core.AgentConfigChanged, core.NginxPluginConfigured, diff --git a/src/plugins/registration.go b/src/plugins/registration.go index c7ef07cb8..e4d7d5ac3 100644 --- a/src/plugins/registration.go +++ b/src/plugins/registration.go @@ -86,7 +86,7 @@ func (r *OneTimeRegistration) Info() *core.Info { func (r *OneTimeRegistration) Process(msg *core.Message) { switch { - case msg.Exact(core.RegistrationCompletedTopic): + case msg.Exact(core.AgentConnected): log.Info("OneTimeRegistration completed") case msg.Exact(core.DataplaneSoftwareDetailsUpdated): switch data := msg.Data().(type) { @@ -104,7 +104,7 @@ func (r *OneTimeRegistration) Process(msg *core.Message) { func (r *OneTimeRegistration) Subscriptions() []string { return []string{ - core.RegistrationCompletedTopic, + core.AgentConnected, core.DataplaneSoftwareDetailsUpdated, core.NginxDetailProcUpdate, } @@ -237,10 +237,7 @@ func (r *OneTimeRegistration) registerAgent() { }, } - r.pipeline.Process( - core.NewMessage(core.CommRegister, agentConnectRequest), - core.NewMessage(core.RegistrationCompletedTopic, nil), - ) + r.pipeline.Process(core.NewMessage(core.CommRegister, agentConnectRequest)) } // dataplaneSoftwareDetails converts the map of dataplane software details into a diff --git a/src/plugins/registration_test.go b/src/plugins/registration_test.go index 64016e6df..8fc23fe97 100644 --- a/src/plugins/registration_test.go +++ b/src/plugins/registration_test.go @@ -30,7 +30,7 @@ func TestRegistration_startRegistration(t *testing.T) { }{ { name: "test registration", - expectedMessageCount: 2, + expectedMessageCount: 1, }, } @@ -65,9 +65,6 @@ func TestRegistration_startRegistration(t *testing.T) { assert.Equal(tt, messages[0].Topic(), core.CommRegister) assert.NotNil(tt, messages[0].Data()) - - assert.Equal(tt, messages[1].Topic(), core.RegistrationCompletedTopic) - assert.Nil(tt, messages[1].Data()) }) } } @@ -87,7 +84,7 @@ func TestRegistration_areDataplaneSoftwareDetailsReady(t *testing.T) { func TestRegistration_Subscriptions(t *testing.T) { pluginUnderTest := NewOneTimeRegistration(tutils.GetMockAgentConfig(), nil, tutils.GetMockEnv(), nil, tutils.GetProcesses()) - assert.Equal(t, []string{core.RegistrationCompletedTopic, core.DataplaneSoftwareDetailsUpdated, core.NginxDetailProcUpdate}, pluginUnderTest.Subscriptions()) + assert.Equal(t, []string{core.AgentConnected, core.DataplaneSoftwareDetailsUpdated, core.NginxDetailProcUpdate}, pluginUnderTest.Subscriptions()) } func TestRegistration_Info(t *testing.T) { diff --git a/test/component/nginx-app-protect/monitoring/monitoring_test.go b/test/component/nginx-app-protect/monitoring/monitoring_test.go index 01183bea0..03e916dae 100644 --- a/test/component/nginx-app-protect/monitoring/monitoring_test.go +++ b/test/component/nginx-app-protect/monitoring/monitoring_test.go @@ -143,7 +143,7 @@ func TestNAPMonitoring(t *testing.T) { pipe := initializeMessagePipe(t, ctx, []core.Plugin{metricsSender}, []core.ExtensionPlugin{napMonitoring}) - pipe.Process(core.NewMessage(core.RegistrationCompletedTopic, nil)) + pipe.Process(core.NewMessage(core.AgentConnected, nil)) wg.Add(1) go func() { diff --git a/test/integration/features/features_test.go b/test/integration/features/features_test.go index 4701ffd53..6e78de223 100644 --- a/test/integration/features/features_test.go +++ b/test/integration/features/features_test.go @@ -20,7 +20,7 @@ func TestFeatures_NginxCountingEnabled(t *testing.T) { t, "features-nginx-counting-enabled", "./test_configs/nginx-agent-counting.conf:/etc/nginx-agent/nginx-agent.conf", - "OneTimeRegistration completed", + "MetricsThrottle waiting for report ready", ) utils.TestAgentHasNoErrorLogs(t, testContainer) diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/client.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/client.go index 1aa31c0f4..29e7a26e8 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/client.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/client.go @@ -78,7 +78,7 @@ type ( WithClient(Client) Controller Context() context.Context WithContext(context.Context) Controller - Connect() error + Connect() Close() error } ) diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/commander.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/commander.go index 7423abece..6383fed9a 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/commander.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/commander.go @@ -10,6 +10,7 @@ package client import ( "context" "encoding/json" + "errors" "fmt" "io" "sync" @@ -74,11 +75,15 @@ func (c *commander) Connect(ctx context.Context) error { log.Debugf("Commander connecting to %s", c.server) c.ctx = ctx + + c.retryLock.Lock() err := backoff.WaitUntil( c.ctx, c.backoffSettings, c.createClient, ) + c.retryLock.Unlock() + if err != nil { return err } @@ -163,6 +168,11 @@ func (c *commander) Send(ctx context.Context, message Message) error { return err } + if c.channel == nil { + c.setIsRetrying(true) + return c.handleGrpcError("Commander Channel Send", errors.New("command channel client not created yet")) + } + if err := c.channel.Send(cmd); err != nil { c.setIsRetrying(true) return c.handleGrpcError("Commander Channel Send", err) diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/controller.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/controller.go index 7117269d0..c1d23ea70 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/controller.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/controller.go @@ -10,6 +10,8 @@ package client import ( "context" "fmt" + + log "github.com/sirupsen/logrus" ) func NewClientController() Controller { @@ -33,19 +35,12 @@ func (c *ctrl) WithContext(ctx context.Context) Controller { return c } -func (c *ctrl) Connect() error { - var retErr error +func (c *ctrl) Connect() { for _, client := range c.clients { if err := client.Connect(c.ctx); err != nil { - if retErr == nil { - retErr = fmt.Errorf("%s failed to connect: %w", client.Server(), err) - } else { - retErr = fmt.Errorf("%v\n%s failed to connect: %w", retErr, client.Server(), err) - } + log.Warnf("%s failed to connect: %v", client.Server(), err) } } - - return retErr } func (c *ctrl) Close() error { diff --git a/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go b/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go index 44158b4e9..bfb26365a 100644 --- a/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go +++ b/test/integration/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go @@ -9,6 +9,7 @@ package client import ( "context" + "errors" "fmt" "io" "sync" @@ -28,6 +29,7 @@ func NewMetricReporterClient() MetricReporter { return &metricReporter{ connector: newConnector(), backoffSettings: DefaultBackoffSettings, + isRetrying: false, } } @@ -39,6 +41,8 @@ type metricReporter struct { ctx context.Context mu sync.Mutex backoffSettings backoff.BackoffSettings + isRetrying bool + retryLock sync.Mutex } func (r *metricReporter) WithInterceptor(interceptor interceptors.Interceptor) Client { @@ -57,11 +61,14 @@ func (r *metricReporter) Connect(ctx context.Context) error { log.Debugf("Metric Reporter connecting to %s", r.server) r.ctx = ctx + + r.retryLock.Lock() err := backoff.WaitUntil( r.ctx, r.backoffSettings, r.createClient, ) + r.retryLock.Unlock() if err != nil { return err } @@ -151,18 +158,19 @@ func (r *metricReporter) Send(ctx context.Context, message Message) error { return fmt.Errorf("MetricReporter expected a metrics report message, but received %T", message.Data()) } - isRetrying := false - err = backoff.WaitUntil(r.ctx, r.backoffSettings, func() error { - if isRetrying { - log.Infof("Metric Reporter Channel Send: retrying to connect to %s", r.grpc.Target()) - err := r.createClient() - if err != nil { - return err - } + err := r.checkClientConnection() + if err != nil { + return err + } + + if r.channel == nil { + r.isRetrying = true + return r.handleGrpcError("Metric Reporter Channel Send", errors.New("metric service stream client not created yet")) } + if err := r.channel.Send(report); err != nil { - isRetrying = true + r.isRetrying = true return r.handleGrpcError("Metric Reporter Channel Send", err) } @@ -176,18 +184,14 @@ func (r *metricReporter) Send(ctx context.Context, message Message) error { return fmt.Errorf("MetricReporter expected an events report message, but received %T", message.Data()) } - isRetrying := false - err = backoff.WaitUntil(r.ctx, r.backoffSettings, func() error { - if isRetrying { - log.Infof("Metric Reporter Channel Send: retrying to connect to %s", r.grpc.Target()) - err = r.createClient() - if err != nil { - return err - } + err := r.checkClientConnection() + if err != nil { + return err } + if err := r.eventsChannel.Send(report); err != nil { - isRetrying = true + r.isRetrying = true return r.handleGrpcError("Metric Reporter Events Channel Send", err) } @@ -202,6 +206,21 @@ func (r *metricReporter) Send(ctx context.Context, message Message) error { return err } +func (r *metricReporter) checkClientConnection() error { + r.retryLock.Lock() + defer r.retryLock.Unlock() + + if r.isRetrying { + log.Infof("Metric Reporter Channel Send: retrying to connect to %s", r.grpc.Target()) + err := r.createClient() + if err != nil { + return err + } + } + + return nil +} + func (r *metricReporter) closeConnection() error { var err error if r.channel != nil { diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go index 6f6308c7c..7051eb03d 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/topics.go @@ -10,7 +10,6 @@ package core const ( UNKNOWN = "unknown" RegistrationPrefix = "registration." - RegistrationCompletedTopic = RegistrationPrefix + "completed" CommNginxConfig = "nginx.config" NginxConfigUpload = "nginx.config.upload" NginxReload = "nginx.reload" @@ -49,4 +48,6 @@ const ( EnableExtension = "enable.extension" EnableFeature = "enable.feature" AgentAPIConfigApplyResponse = "agent.api.config.apply.response" + CommandSent = "command.sent" + MetricReportSent = "metrics.report.sent" ) diff --git a/test/integration/vendor/github.com/nginx/agent/v2/test/utils/agent_config.go b/test/integration/vendor/github.com/nginx/agent/v2/test/utils/agent_config.go index a9bae0bb6..1a285e354 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/test/utils/agent_config.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/test/utils/agent_config.go @@ -50,6 +50,7 @@ func GetMockAgentConfig() *config.Config { Host: "127.0.0.1", GrpcPort: 67890, }, + Features: agent_config.GetDefaultFeatures(), } } diff --git a/test/performance/metrics_test.go b/test/performance/metrics_test.go index 1f7dbbec2..899a97662 100644 --- a/test/performance/metrics_test.go +++ b/test/performance/metrics_test.go @@ -210,12 +210,34 @@ func (h *handler) handle(server proto.Commander_CommandChannelServer, wg *sync.W }() h.handleCount.Inc() for { - _, err := server.Recv() + commandReceived, err := server.Recv() if err != nil { fmt.Printf("Command Error: %v\n", err) return } h.msgCount.Inc() + + connectRequest := commandReceived.GetAgentConnectRequest() + if connectRequest != nil { + err = server.Send( + &proto.Command{ + Data: &proto.Command_AgentConnectResponse{ + AgentConnectResponse: &proto.AgentConnectResponse{ + AgentConfig: &proto.AgentConfig{ + Details: connectRequest.GetMeta().GetAgentDetails(), + }, + Status: &proto.AgentConnectStatus{ + StatusCode: proto.AgentConnectStatus_CONNECT_OK, + }, + }, + }, + }, + ) + if err != nil { + fmt.Printf("Command Error: %v\n", err) + return + } + } } } @@ -313,10 +335,10 @@ func startNginxAgent(b *testing.B) { controller.WithClient(commander) controller.WithClient(reporter) - if err := controller.Connect(); err != nil { - fmt.Printf("Unable to connect to control plane: %v", err) - return - } + + controller.Connect() + + time.Sleep(5 * time.Second) processes := env.Processes() diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/client.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/client.go index 1aa31c0f4..29e7a26e8 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/client.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/client.go @@ -78,7 +78,7 @@ type ( WithClient(Client) Controller Context() context.Context WithContext(context.Context) Controller - Connect() error + Connect() Close() error } ) diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/commander.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/commander.go index 7423abece..6383fed9a 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/commander.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/commander.go @@ -10,6 +10,7 @@ package client import ( "context" "encoding/json" + "errors" "fmt" "io" "sync" @@ -74,11 +75,15 @@ func (c *commander) Connect(ctx context.Context) error { log.Debugf("Commander connecting to %s", c.server) c.ctx = ctx + + c.retryLock.Lock() err := backoff.WaitUntil( c.ctx, c.backoffSettings, c.createClient, ) + c.retryLock.Unlock() + if err != nil { return err } @@ -163,6 +168,11 @@ func (c *commander) Send(ctx context.Context, message Message) error { return err } + if c.channel == nil { + c.setIsRetrying(true) + return c.handleGrpcError("Commander Channel Send", errors.New("command channel client not created yet")) + } + if err := c.channel.Send(cmd); err != nil { c.setIsRetrying(true) return c.handleGrpcError("Commander Channel Send", err) diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/controller.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/controller.go index 7117269d0..c1d23ea70 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/controller.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/controller.go @@ -10,6 +10,8 @@ package client import ( "context" "fmt" + + log "github.com/sirupsen/logrus" ) func NewClientController() Controller { @@ -33,19 +35,12 @@ func (c *ctrl) WithContext(ctx context.Context) Controller { return c } -func (c *ctrl) Connect() error { - var retErr error +func (c *ctrl) Connect() { for _, client := range c.clients { if err := client.Connect(c.ctx); err != nil { - if retErr == nil { - retErr = fmt.Errorf("%s failed to connect: %w", client.Server(), err) - } else { - retErr = fmt.Errorf("%v\n%s failed to connect: %w", retErr, client.Server(), err) - } + log.Warnf("%s failed to connect: %v", client.Server(), err) } } - - return retErr } func (c *ctrl) Close() error { diff --git a/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go b/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go index 44158b4e9..bfb26365a 100644 --- a/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go +++ b/test/performance/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go @@ -9,6 +9,7 @@ package client import ( "context" + "errors" "fmt" "io" "sync" @@ -28,6 +29,7 @@ func NewMetricReporterClient() MetricReporter { return &metricReporter{ connector: newConnector(), backoffSettings: DefaultBackoffSettings, + isRetrying: false, } } @@ -39,6 +41,8 @@ type metricReporter struct { ctx context.Context mu sync.Mutex backoffSettings backoff.BackoffSettings + isRetrying bool + retryLock sync.Mutex } func (r *metricReporter) WithInterceptor(interceptor interceptors.Interceptor) Client { @@ -57,11 +61,14 @@ func (r *metricReporter) Connect(ctx context.Context) error { log.Debugf("Metric Reporter connecting to %s", r.server) r.ctx = ctx + + r.retryLock.Lock() err := backoff.WaitUntil( r.ctx, r.backoffSettings, r.createClient, ) + r.retryLock.Unlock() if err != nil { return err } @@ -151,18 +158,19 @@ func (r *metricReporter) Send(ctx context.Context, message Message) error { return fmt.Errorf("MetricReporter expected a metrics report message, but received %T", message.Data()) } - isRetrying := false - err = backoff.WaitUntil(r.ctx, r.backoffSettings, func() error { - if isRetrying { - log.Infof("Metric Reporter Channel Send: retrying to connect to %s", r.grpc.Target()) - err := r.createClient() - if err != nil { - return err - } + err := r.checkClientConnection() + if err != nil { + return err + } + + if r.channel == nil { + r.isRetrying = true + return r.handleGrpcError("Metric Reporter Channel Send", errors.New("metric service stream client not created yet")) } + if err := r.channel.Send(report); err != nil { - isRetrying = true + r.isRetrying = true return r.handleGrpcError("Metric Reporter Channel Send", err) } @@ -176,18 +184,14 @@ func (r *metricReporter) Send(ctx context.Context, message Message) error { return fmt.Errorf("MetricReporter expected an events report message, but received %T", message.Data()) } - isRetrying := false - err = backoff.WaitUntil(r.ctx, r.backoffSettings, func() error { - if isRetrying { - log.Infof("Metric Reporter Channel Send: retrying to connect to %s", r.grpc.Target()) - err = r.createClient() - if err != nil { - return err - } + err := r.checkClientConnection() + if err != nil { + return err } + if err := r.eventsChannel.Send(report); err != nil { - isRetrying = true + r.isRetrying = true return r.handleGrpcError("Metric Reporter Events Channel Send", err) } @@ -202,6 +206,21 @@ func (r *metricReporter) Send(ctx context.Context, message Message) error { return err } +func (r *metricReporter) checkClientConnection() error { + r.retryLock.Lock() + defer r.retryLock.Unlock() + + if r.isRetrying { + log.Infof("Metric Reporter Channel Send: retrying to connect to %s", r.grpc.Target()) + err := r.createClient() + if err != nil { + return err + } + } + + return nil +} + func (r *metricReporter) closeConnection() error { var err error if r.channel != nil { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go index 6f6308c7c..7051eb03d 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/topics.go @@ -10,7 +10,6 @@ package core const ( UNKNOWN = "unknown" RegistrationPrefix = "registration." - RegistrationCompletedTopic = RegistrationPrefix + "completed" CommNginxConfig = "nginx.config" NginxConfigUpload = "nginx.config.upload" NginxReload = "nginx.reload" @@ -49,4 +48,6 @@ const ( EnableExtension = "enable.extension" EnableFeature = "enable.feature" AgentAPIConfigApplyResponse = "agent.api.config.apply.response" + CommandSent = "command.sent" + MetricReportSent = "metrics.report.sent" ) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/agent_api.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/agent_api.go index 2fe5e86ef..2895c0529 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/agent_api.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/agent_api.go @@ -45,9 +45,14 @@ const ( pendingStatus = "PENDING" errorStatus = "ERROR" unknownStatus = "UNKNOWN" + + registration string = "registration" + commandConnection string = "commandConnection" + metricsConnection string = "metricsConnection" ) var ( + healthRegex = regexp.MustCompile(`^\/health[\/]*$`) instancesRegex = regexp.MustCompile(`^\/nginx[\/]*$`) configRegex = regexp.MustCompile(`^\/nginx/config[\/]*$`) configStatusRegex = regexp.MustCompile(`^\/nginx/config/status[\/]*$`) @@ -60,10 +65,18 @@ type AgentAPI struct { server http.Server nginxBinary core.NginxBinary nginxHandler *NginxHandler + rootHandler *RootHandler exporter *prometheus_metrics.Exporter processes []*core.Process } +type RootHandler struct { + config *config.Config + isGrpcRegistered bool + lastCommandSent time.Time + lastMetricReportSent time.Time +} + type NginxHandler struct { config *config.Config env core.Environment @@ -132,6 +145,25 @@ type AgentAPIConfigApplyStatusResponse struct { Status string `json:"status"` } +// swagger:model HealthStatusCheck +type HealthStatusCheck struct { + // Health check name + // example: commandConnection + Name string `json:"name"` + // Health check status + // example: OK + Status string `json:"status"` +} + +// swagger:model HealthResponse +type HealthResponse struct { + // Overall health status + // example: OK + Status string `json:"status"` + // Array of health checks + Checks []HealthStatusCheck `json:"checks"` +} + const ( contentTypeHeader = "Content-Type" jsonMimeType = "application/json" @@ -191,6 +223,12 @@ func (a *AgentAPI) Process(message *core.Message) { if a.nginxHandler != nil { a.nginxHandler.syncProcessInfo(a.processes) } + case core.AgentConnected: + a.rootHandler.isGrpcRegistered = true + case core.CommandSent: + a.rootHandler.lastCommandSent = time.Now() + case core.MetricReportSent: + a.rootHandler.lastMetricReportSent = time.Now() } } @@ -206,10 +244,19 @@ func (a *AgentAPI) Subscriptions() []string { core.NginxConfigApplyFailed, core.NginxConfigApplySucceeded, core.NginxDetailProcUpdate, + core.AgentConnected, + core.CommandSent, + core.MetricReportSent, } } func (a *AgentAPI) createHttpServer() { + a.rootHandler = &RootHandler{ + config: a.config, + isGrpcRegistered: false, + lastMetricReportSent: time.Now(), + } + a.nginxHandler = &NginxHandler{ config: a.config, pipeline: a.pipeline, @@ -224,6 +271,7 @@ func (a *AgentAPI) createHttpServer() { mux.Handle("/metrics/", a.getPrometheusHandler()) mux.Handle("/nginx/", a.nginxHandler) + mux.Handle("/", a.rootHandler) handler := cors.New(cors.Options{AllowedMethods: []string{"OPTIONS", "GET", "PUT"}}).Handler(mux) a.server = http.Server{ @@ -546,7 +594,7 @@ func (h *NginxHandler) applyNginxConfig(nginxDetail *proto.NginxDetails, buf *by // 200: AgentAPIConfigApplyResponse // 400: AgentAPIConfigApplyStatusResponse // 404: AgentAPIConfigApplyStatusResponse -// 500 +// 500: AgentAPICommonResponse func (h *NginxHandler) getConfigStatus(w http.ResponseWriter, r *http.Request) error { correlationId := r.URL.Query().Get("correlation_id") @@ -605,6 +653,94 @@ func (h *NginxHandler) syncProcessInfo(processInfo []*core.Process) { h.processes = processInfo } +func (rh *RootHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.Header().Set(contentTypeHeader, jsonMimeType) + + switch { + case healthRegex.MatchString(r.URL.Path): + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + err := rh.healthCheck(w) + if err != nil { + log.Warnf("Failed to get agent health: %v", err) + } + default: + w.WriteHeader(http.StatusNotFound) + _, err := fmt.Fprint(w, []byte("not found")) + if err != nil { + log.Warnf("Failed to send api response: %v", err) + } + } +} + +// swagger:route GET /health nginx-agent health-check +// +// # Check the health of the NGINX Agent +// +// responses: +// +// 200: HealthResponse +func (rh *RootHandler) healthCheck(w http.ResponseWriter) error { + w.WriteHeader(http.StatusOK) + + overallStatus := okStatus + checks := []HealthStatusCheck{} + + registrationStatus := okStatus + commandServiceStatus := okStatus + metricsServiceStatus := okStatus + + if rh.config.IsGrpcServerConfigured() { + if !rh.isGrpcRegistered { + registrationStatus = errorStatus + overallStatus = errorStatus + } + + checks = append(checks, HealthStatusCheck{ + Name: registration, + Status: registrationStatus, + }) + + timeNow := time.Now() + + lastCommandSentDiff := timeNow.Sub(rh.lastCommandSent) + + if lastCommandSentDiff > (2 * rh.config.Dataplane.Status.PollInterval) { + commandServiceStatus = errorStatus + overallStatus = errorStatus + } + + checks = append(checks, HealthStatusCheck{ + Name: commandConnection, + Status: commandServiceStatus, + }) + + if rh.config.IsFeatureEnabled(agent_config.FeatureMetrics) || rh.config.IsFeatureEnabled(agent_config.FeatureMetricsSender) { + lastMetricReportSentDiff := timeNow.Sub(rh.lastMetricReportSent) + + if lastMetricReportSentDiff > (2 * rh.config.AgentMetrics.ReportInterval) { + metricsServiceStatus = errorStatus + overallStatus = errorStatus + } + + checks = append(checks, HealthStatusCheck{ + Name: metricsConnection, + Status: metricsServiceStatus, + }) + } + } + + healthResponse := &HealthResponse{ + Status: overallStatus, + Checks: checks, + } + + return writeObjectToResponseBody(w, healthResponse) +} + func writeObjectToResponseBody(w http.ResponseWriter, response any) error { respBody := new(bytes.Buffer) err := json.NewEncoder(respBody).Encode(response) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go index 8ce701833..be1179283 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/commander.go @@ -120,6 +120,8 @@ func (c *Commander) sendCommand(ctx context.Context, cmd *proto.Command) { log.Debugf("Sending command (messageId=%s), %v", cmd.GetMeta().MessageId, cmd.GetData()) if err := c.cmdr.Send(ctx, client.MessageFromCommand(cmd)); err != nil { log.Errorf("Error sending to command channel %v", err) + } else { + c.pipeline.Process(core.NewMessage(core.CommandSent, nil)) } } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index eaf35d07d..ac493ddc9 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -152,7 +152,6 @@ func (m *Metrics) Info() *core.Info { func (m *Metrics) Subscriptions() []string { return []string{ - core.RegistrationCompletedTopic, core.AgentCollectorsUpdate, core.AgentConfigChanged, core.NginxPluginConfigured, diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_sender.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_sender.go index 38bb2af93..68e587ffd 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_sender.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_sender.go @@ -59,7 +59,7 @@ func (r *MetricsSender) Info() *core.Info { } func (r *MetricsSender) Process(msg *core.Message) { - if msg.Exact(core.RegistrationCompletedTopic) { + if msg.Exact(core.AgentConnected) { r.readyToSend.Toggle() return } @@ -81,6 +81,8 @@ func (r *MetricsSender) Process(msg *core.Message) { err := r.reporter.Send(r.ctx, message) if err != nil { log.Errorf("Failed to send MetricsReport: %v", err) + } else { + r.pipeline.Process(core.NewMessage(core.MetricReportSent, nil)) } case *models.EventReport: err := r.reporter.Send(r.ctx, client.MessageFromEvents(report)) @@ -119,5 +121,5 @@ func (r *MetricsSender) metricSenderBackoff(agentConfig *proto.AgentConfig) { } func (r *MetricsSender) Subscriptions() []string { - return []string{core.CommMetrics, core.RegistrationCompletedTopic, core.AgentConfigChanged} + return []string{core.CommMetrics, core.AgentConnected, core.AgentConfigChanged} } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/registration.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/registration.go index c7ef07cb8..e4d7d5ac3 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/registration.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/registration.go @@ -86,7 +86,7 @@ func (r *OneTimeRegistration) Info() *core.Info { func (r *OneTimeRegistration) Process(msg *core.Message) { switch { - case msg.Exact(core.RegistrationCompletedTopic): + case msg.Exact(core.AgentConnected): log.Info("OneTimeRegistration completed") case msg.Exact(core.DataplaneSoftwareDetailsUpdated): switch data := msg.Data().(type) { @@ -104,7 +104,7 @@ func (r *OneTimeRegistration) Process(msg *core.Message) { func (r *OneTimeRegistration) Subscriptions() []string { return []string{ - core.RegistrationCompletedTopic, + core.AgentConnected, core.DataplaneSoftwareDetailsUpdated, core.NginxDetailProcUpdate, } @@ -237,10 +237,7 @@ func (r *OneTimeRegistration) registerAgent() { }, } - r.pipeline.Process( - core.NewMessage(core.CommRegister, agentConnectRequest), - core.NewMessage(core.RegistrationCompletedTopic, nil), - ) + r.pipeline.Process(core.NewMessage(core.CommRegister, agentConnectRequest)) } // dataplaneSoftwareDetails converts the map of dataplane software details into a diff --git a/test/performance/vendor/github.com/nginx/agent/v2/test/utils/agent_config.go b/test/performance/vendor/github.com/nginx/agent/v2/test/utils/agent_config.go index a9bae0bb6..1a285e354 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/test/utils/agent_config.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/test/utils/agent_config.go @@ -50,6 +50,7 @@ func GetMockAgentConfig() *config.Config { Host: "127.0.0.1", GrpcPort: 67890, }, + Features: agent_config.GetDefaultFeatures(), } } diff --git a/test/utils/agent_config.go b/test/utils/agent_config.go index a9bae0bb6..1a285e354 100644 --- a/test/utils/agent_config.go +++ b/test/utils/agent_config.go @@ -50,6 +50,7 @@ func GetMockAgentConfig() *config.Config { Host: "127.0.0.1", GrpcPort: 67890, }, + Features: agent_config.GetDefaultFeatures(), } } diff --git a/vendor/github.com/nginx/agent/sdk/v2/client/client.go b/vendor/github.com/nginx/agent/sdk/v2/client/client.go index 1aa31c0f4..29e7a26e8 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/client/client.go +++ b/vendor/github.com/nginx/agent/sdk/v2/client/client.go @@ -78,7 +78,7 @@ type ( WithClient(Client) Controller Context() context.Context WithContext(context.Context) Controller - Connect() error + Connect() Close() error } ) diff --git a/vendor/github.com/nginx/agent/sdk/v2/client/commander.go b/vendor/github.com/nginx/agent/sdk/v2/client/commander.go index 7423abece..6383fed9a 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/client/commander.go +++ b/vendor/github.com/nginx/agent/sdk/v2/client/commander.go @@ -10,6 +10,7 @@ package client import ( "context" "encoding/json" + "errors" "fmt" "io" "sync" @@ -74,11 +75,15 @@ func (c *commander) Connect(ctx context.Context) error { log.Debugf("Commander connecting to %s", c.server) c.ctx = ctx + + c.retryLock.Lock() err := backoff.WaitUntil( c.ctx, c.backoffSettings, c.createClient, ) + c.retryLock.Unlock() + if err != nil { return err } @@ -163,6 +168,11 @@ func (c *commander) Send(ctx context.Context, message Message) error { return err } + if c.channel == nil { + c.setIsRetrying(true) + return c.handleGrpcError("Commander Channel Send", errors.New("command channel client not created yet")) + } + if err := c.channel.Send(cmd); err != nil { c.setIsRetrying(true) return c.handleGrpcError("Commander Channel Send", err) diff --git a/vendor/github.com/nginx/agent/sdk/v2/client/controller.go b/vendor/github.com/nginx/agent/sdk/v2/client/controller.go index 7117269d0..c1d23ea70 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/client/controller.go +++ b/vendor/github.com/nginx/agent/sdk/v2/client/controller.go @@ -10,6 +10,8 @@ package client import ( "context" "fmt" + + log "github.com/sirupsen/logrus" ) func NewClientController() Controller { @@ -33,19 +35,12 @@ func (c *ctrl) WithContext(ctx context.Context) Controller { return c } -func (c *ctrl) Connect() error { - var retErr error +func (c *ctrl) Connect() { for _, client := range c.clients { if err := client.Connect(c.ctx); err != nil { - if retErr == nil { - retErr = fmt.Errorf("%s failed to connect: %w", client.Server(), err) - } else { - retErr = fmt.Errorf("%v\n%s failed to connect: %w", retErr, client.Server(), err) - } + log.Warnf("%s failed to connect: %v", client.Server(), err) } } - - return retErr } func (c *ctrl) Close() error { diff --git a/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go b/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go index 44158b4e9..bfb26365a 100644 --- a/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go +++ b/vendor/github.com/nginx/agent/sdk/v2/client/metric_reporter.go @@ -9,6 +9,7 @@ package client import ( "context" + "errors" "fmt" "io" "sync" @@ -28,6 +29,7 @@ func NewMetricReporterClient() MetricReporter { return &metricReporter{ connector: newConnector(), backoffSettings: DefaultBackoffSettings, + isRetrying: false, } } @@ -39,6 +41,8 @@ type metricReporter struct { ctx context.Context mu sync.Mutex backoffSettings backoff.BackoffSettings + isRetrying bool + retryLock sync.Mutex } func (r *metricReporter) WithInterceptor(interceptor interceptors.Interceptor) Client { @@ -57,11 +61,14 @@ func (r *metricReporter) Connect(ctx context.Context) error { log.Debugf("Metric Reporter connecting to %s", r.server) r.ctx = ctx + + r.retryLock.Lock() err := backoff.WaitUntil( r.ctx, r.backoffSettings, r.createClient, ) + r.retryLock.Unlock() if err != nil { return err } @@ -151,18 +158,19 @@ func (r *metricReporter) Send(ctx context.Context, message Message) error { return fmt.Errorf("MetricReporter expected a metrics report message, but received %T", message.Data()) } - isRetrying := false - err = backoff.WaitUntil(r.ctx, r.backoffSettings, func() error { - if isRetrying { - log.Infof("Metric Reporter Channel Send: retrying to connect to %s", r.grpc.Target()) - err := r.createClient() - if err != nil { - return err - } + err := r.checkClientConnection() + if err != nil { + return err + } + + if r.channel == nil { + r.isRetrying = true + return r.handleGrpcError("Metric Reporter Channel Send", errors.New("metric service stream client not created yet")) } + if err := r.channel.Send(report); err != nil { - isRetrying = true + r.isRetrying = true return r.handleGrpcError("Metric Reporter Channel Send", err) } @@ -176,18 +184,14 @@ func (r *metricReporter) Send(ctx context.Context, message Message) error { return fmt.Errorf("MetricReporter expected an events report message, but received %T", message.Data()) } - isRetrying := false - err = backoff.WaitUntil(r.ctx, r.backoffSettings, func() error { - if isRetrying { - log.Infof("Metric Reporter Channel Send: retrying to connect to %s", r.grpc.Target()) - err = r.createClient() - if err != nil { - return err - } + err := r.checkClientConnection() + if err != nil { + return err } + if err := r.eventsChannel.Send(report); err != nil { - isRetrying = true + r.isRetrying = true return r.handleGrpcError("Metric Reporter Events Channel Send", err) } @@ -202,6 +206,21 @@ func (r *metricReporter) Send(ctx context.Context, message Message) error { return err } +func (r *metricReporter) checkClientConnection() error { + r.retryLock.Lock() + defer r.retryLock.Unlock() + + if r.isRetrying { + log.Infof("Metric Reporter Channel Send: retrying to connect to %s", r.grpc.Target()) + err := r.createClient() + if err != nil { + return err + } + } + + return nil +} + func (r *metricReporter) closeConnection() error { var err error if r.channel != nil {