From fb6a3b4119e20d742d6aeeb813b2a0a40b890386 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Wed, 29 Nov 2023 11:24:45 +0100 Subject: [PATCH 01/10] report output health --- internal/pkg/dl/constants.go | 1 + internal/pkg/dl/output_health.go | 42 ++++++++++++++++++++++++++++ internal/pkg/policy/policy_output.go | 22 ++++++++++++++- internal/pkg/policy/self.go | 26 +++++++++++++++++ 4 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 internal/pkg/dl/output_health.go diff --git a/internal/pkg/dl/constants.go b/internal/pkg/dl/constants.go index 703117bca..2393f73f0 100644 --- a/internal/pkg/dl/constants.go +++ b/internal/pkg/dl/constants.go @@ -16,6 +16,7 @@ const ( FleetPolicies = ".fleet-policies" FleetPoliciesLeader = ".fleet-policies-leader" FleetServers = ".fleet-servers" + FleetOutputHealth = "logs-fleet_server.output_health-default" ) // Query fields diff --git a/internal/pkg/dl/output_health.go b/internal/pkg/dl/output_health.go new file mode 100644 index 000000000..42482c004 --- /dev/null +++ b/internal/pkg/dl/output_health.go @@ -0,0 +1,42 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package dl + +import ( + "context" + "encoding/json" + "time" + + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" + "github.com/gofrs/uuid" +) + +type OutputHealth struct { + Output string `json:"output,omitempty"` + State string `json:"state,omitempty"` + Message string `json:"message,omitempty"` + Timestamp string `json:"@timestamp,omitempty"` +} + +func CreateOutputHealth(ctx context.Context, bulker bulk.Bulk, doc OutputHealth) error { + return createOutputHealth(ctx, bulker, FleetOutputHealth, doc) +} + +func createOutputHealth(ctx context.Context, bulker bulk.Bulk, index string, doc OutputHealth) error { + if doc.Timestamp == "" { + doc.Timestamp = time.Now().UTC().Format(time.RFC3339) + } + body, err := json.Marshal(doc) + if err != nil { + return err + } + + id, err := uuid.NewV4() + if err != nil { + return err + } + _, err = bulker.Create(ctx, index, id.String(), body, bulk.WithRefresh()) + return err +} diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index dfc51c21f..941c0b1c4 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -15,6 +15,7 @@ import ( "github.com/rs/zerolog" "go.elastic.co/apm/v2" + "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/fleet-server/v7/internal/pkg/apikey" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dl" @@ -260,10 +261,20 @@ func (p *Output) prepareElasticsearch( ctx := zlog.WithContext(ctx) outputAPIKey, err := generateOutputAPIKey(ctx, outputBulker, agent.Id, p.Name, p.Role.Raw) - // reporting output error status to self monitor and not returning the error to keep fleet-server running + + // reporting output health and not returning the error to keep fleet-server running if outputAPIKey == nil && p.Type == OutputTypeRemoteElasticsearch { if err != nil { zerolog.Ctx(ctx).Warn().Err(err).Msg("Could not create API key in remote ES") + + doc := 
dl.OutputHealth{ + Output: p.Name, + State: client.UnitStateDegraded.String(), + Message: err.Error(), + } + if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil { + zlog.Error().Err(err).Msg("create output health") + } } // replace type remote_elasticsearch with elasticsearch as agent doesn't recognize remote_elasticsearch @@ -271,6 +282,15 @@ func (p *Output) prepareElasticsearch( // remove the service token from the agent policy sent to the agent delete(outputMap[p.Name], FieldOutputServiceToken) return nil + } else if p.Type == OutputTypeRemoteElasticsearch { + doc := dl.OutputHealth{ + Output: p.Name, + State: client.UnitStateHealthy.String(), + Message: "", + } + if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil { + zlog.Error().Err(err).Msg("create output health") + } } if err != nil { return fmt.Errorf("failed generate output API key: %w", err) diff --git a/internal/pkg/policy/self.go b/internal/pkg/policy/self.go index b9c8d7bd0..1b4f278a0 100644 --- a/internal/pkg/policy/self.go +++ b/internal/pkg/policy/self.go @@ -218,6 +218,8 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error return client.UnitStateStarting, nil } + reportOutputHealth(ctx, m.bulker, m.log) + state := client.UnitStateHealthy extendMsg := "" var payload map[string]interface{} @@ -253,6 +255,30 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error return state, nil } +func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, logger zerolog.Logger) { + //pinging logic + bulkerMap := bulker.GetBulkerMap() + for outputName, outputBulker := range bulkerMap { + doc := dl.OutputHealth{ + Output: outputName, + State: client.UnitStateHealthy.String(), + Message: "", + } + res, err := outputBulker.Client().Ping(outputBulker.Client().Ping.WithContext(ctx)) + if err != nil { + logger.Error().Err(err).Msg("error calling remote es ping") + doc.State = client.UnitStateDegraded.String() + doc.Message = err.Error() + 
} else if res.StatusCode != 200 { + doc.State = client.UnitStateDegraded.String() + doc.Message = fmt.Sprintf("unexpected status code when pinging remote es: %d", res.StatusCode) + } + if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil { + logger.Error().Err(err).Msg("create output health") + } + } +} + func HasFleetServerInput(inputs []map[string]interface{}) bool { for _, input := range inputs { attr, ok := input["type"].(string) From 928aac5e0ea68fe4b8aa3bc1cba7dc8894a5bffb Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Wed, 29 Nov 2023 14:03:10 +0100 Subject: [PATCH 02/10] updated error messages, added data_stream fields --- internal/pkg/dl/output_health.go | 20 ++++++++++++++++---- internal/pkg/policy/policy_output.go | 10 +++++----- internal/pkg/policy/self.go | 10 ++++++---- 3 files changed, 27 insertions(+), 13 deletions(-) diff --git a/internal/pkg/dl/output_health.go b/internal/pkg/dl/output_health.go index 42482c004..c1f4b4c82 100644 --- a/internal/pkg/dl/output_health.go +++ b/internal/pkg/dl/output_health.go @@ -13,11 +13,18 @@ import ( "github.com/gofrs/uuid" ) +type DataStream struct { + Dataset string `json:"dataset,omitempty"` + Type string `json:"type,omitempty"` + Namespace string `json:"namespace,omitempty"` +} + type OutputHealth struct { - Output string `json:"output,omitempty"` - State string `json:"state,omitempty"` - Message string `json:"message,omitempty"` - Timestamp string `json:"@timestamp,omitempty"` + Output string `json:"output,omitempty"` + State string `json:"state,omitempty"` + Message string `json:"message,omitempty"` + Timestamp string `json:"@timestamp,omitempty"` + DataStream DataStream `json:"data_stream,omitempty"` } func CreateOutputHealth(ctx context.Context, bulker bulk.Bulk, doc OutputHealth) error { @@ -28,6 +35,11 @@ func createOutputHealth(ctx context.Context, bulker bulk.Bulk, index string, doc if doc.Timestamp == "" { doc.Timestamp = time.Now().UTC().Format(time.RFC3339) } + doc.DataStream = 
DataStream{ + Dataset: "fleet_server.output_health", + Type: "logs", + Namespace: "default", + } body, err := json.Marshal(doc) if err != nil { return err diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index 941c0b1c4..81d02fa17 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -265,15 +265,15 @@ func (p *Output) prepareElasticsearch( // reporting output health and not returning the error to keep fleet-server running if outputAPIKey == nil && p.Type == OutputTypeRemoteElasticsearch { if err != nil { - zerolog.Ctx(ctx).Warn().Err(err).Msg("Could not create API key in remote ES") - doc := dl.OutputHealth{ Output: p.Name, State: client.UnitStateDegraded.String(), - Message: err.Error(), + Message: fmt.Sprintf("remote ES %s could not create API key due to error: %s", p.Name, err.Error()), } + zerolog.Ctx(ctx).Warn().Err(err).Msg(doc.Message) + if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil { - zlog.Error().Err(err).Msg("create output health") + zlog.Error().Err(err).Msg("error writing output health") } } @@ -324,7 +324,7 @@ func (p *Output) prepareElasticsearch( // Using painless script to append the old keys to the history body, err := renderUpdatePainlessScript(p.Name, fields) if err != nil { - return fmt.Errorf("could no tupdate painless script: %w", err) + return fmt.Errorf("could not update painless script: %w", err) } if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { diff --git a/internal/pkg/policy/self.go b/internal/pkg/policy/self.go index 1b4f278a0..f01bfbb53 100644 --- a/internal/pkg/policy/self.go +++ b/internal/pkg/policy/self.go @@ -266,15 +266,17 @@ func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, logger zerolog.Lo } res, err := outputBulker.Client().Ping(outputBulker.Client().Ping.WithContext(ctx)) if err != nil { - logger.Error().Err(err).Msg("error calling remote 
es ping") doc.State = client.UnitStateDegraded.String() - doc.Message = err.Error() + doc.Message = fmt.Sprintf("remote ES %s is not reachable due to error: %s", outputName, err.Error()) + logger.Error().Err(err).Msg(doc.Message) + } else if res.StatusCode != 200 { doc.State = client.UnitStateDegraded.String() - doc.Message = fmt.Sprintf("unexpected status code when pinging remote es: %d", res.StatusCode) + doc.Message = fmt.Sprintf("remote ES %s is not reachable due to: unexpected status code %d", outputName, res.StatusCode) + logger.Error().Err(err).Msg(doc.Message) } if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil { - logger.Error().Err(err).Msg("create output health") + logger.Error().Err(err).Msg("error writing output health") } } } From e9c22f92ff49e2332995e6ba4381b838901fdcc4 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Wed, 29 Nov 2023 15:18:14 +0100 Subject: [PATCH 03/10] updated message --- internal/pkg/policy/policy_output.go | 2 +- internal/pkg/policy/self.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index 81d02fa17..3d38ec9b4 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -268,7 +268,7 @@ func (p *Output) prepareElasticsearch( doc := dl.OutputHealth{ Output: p.Name, State: client.UnitStateDegraded.String(), - Message: fmt.Sprintf("remote ES %s could not create API key due to error: %s", p.Name, err.Error()), + Message: fmt.Sprintf("remote ES could not create API key due to error: %s", err.Error()), } zerolog.Ctx(ctx).Warn().Err(err).Msg(doc.Message) diff --git a/internal/pkg/policy/self.go b/internal/pkg/policy/self.go index f01bfbb53..e1096e749 100644 --- a/internal/pkg/policy/self.go +++ b/internal/pkg/policy/self.go @@ -267,12 +267,12 @@ func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, logger zerolog.Lo res, err := 
outputBulker.Client().Ping(outputBulker.Client().Ping.WithContext(ctx)) if err != nil { doc.State = client.UnitStateDegraded.String() - doc.Message = fmt.Sprintf("remote ES %s is not reachable due to error: %s", outputName, err.Error()) + doc.Message = fmt.Sprintf("remote ES is not reachable due to error: %s", err.Error()) logger.Error().Err(err).Msg(doc.Message) } else if res.StatusCode != 200 { doc.State = client.UnitStateDegraded.String() - doc.Message = fmt.Sprintf("remote ES %s is not reachable due to: unexpected status code %d", outputName, res.StatusCode) + doc.Message = fmt.Sprintf("remote ES is not reachable due to unexpected status code %d", res.StatusCode) logger.Error().Err(err).Msg(doc.Message) } if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil { From 8a672a9e9ad2d800aa93bb7fe89c6942d675301b Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Thu, 30 Nov 2023 10:36:24 +0100 Subject: [PATCH 04/10] added output name to logging --- internal/pkg/policy/policy_output.go | 4 ++-- internal/pkg/policy/self.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index 3d38ec9b4..ac72f301c 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -270,10 +270,10 @@ func (p *Output) prepareElasticsearch( State: client.UnitStateDegraded.String(), Message: fmt.Sprintf("remote ES could not create API key due to error: %s", err.Error()), } - zerolog.Ctx(ctx).Warn().Err(err).Msg(doc.Message) + zerolog.Ctx(ctx).Warn().Err(err).Str("outputName", p.Name).Msg(doc.Message) if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil { - zlog.Error().Err(err).Msg("error writing output health") + zlog.Error().Err(err).Str("outputName", p.Name).Msg("error writing output health") } } diff --git a/internal/pkg/policy/self.go b/internal/pkg/policy/self.go index e1096e749..60bb5bc41 100644 --- a/internal/pkg/policy/self.go +++ 
b/internal/pkg/policy/self.go @@ -268,15 +268,15 @@ func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, logger zerolog.Lo if err != nil { doc.State = client.UnitStateDegraded.String() doc.Message = fmt.Sprintf("remote ES is not reachable due to error: %s", err.Error()) - logger.Error().Err(err).Msg(doc.Message) + logger.Error().Err(err).Str("outputName", outputName).Msg(doc.Message) } else if res.StatusCode != 200 { doc.State = client.UnitStateDegraded.String() doc.Message = fmt.Sprintf("remote ES is not reachable due to unexpected status code %d", res.StatusCode) - logger.Error().Err(err).Msg(doc.Message) + logger.Error().Err(err).Str("outputName", outputName).Msg(doc.Message) } if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil { - logger.Error().Err(err).Msg("error writing output health") + logger.Error().Err(err).Str("outputName", outputName).Msg("error writing output health") } } } From eea71f31bb7e4437637791810be659c4894fc026 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Thu, 30 Nov 2023 11:09:55 +0100 Subject: [PATCH 05/10] added tests to policy_output --- internal/pkg/policy/policy_output_test.go | 56 +++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/internal/pkg/policy/policy_output_test.go b/internal/pkg/policy/policy_output_test.go index d6480460b..45072f5c9 100644 --- a/internal/pkg/policy/policy_output_test.go +++ b/internal/pkg/policy/policy_output_test.go @@ -8,13 +8,17 @@ package policy import ( "context" + "encoding/json" + "errors" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" + "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/model" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" 
@@ -462,6 +466,14 @@ func TestPolicyRemoteESOutputPrepare(t *testing.T) { mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(&apiKey, nil).Once() bulker.On("CreateAndGetBulker", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(outputBulker, false).Once() + bulker.On("Create", mock.Anything, dl.FleetOutputHealth, mock.Anything, mock.MatchedBy(func(body []byte) bool { + var doc dl.OutputHealth + err := json.Unmarshal(body, &doc) + if err != nil { + t.Fatal(err) + } + return doc.Message == "" && doc.State == client.UnitStateHealthy.String() + }), mock.Anything).Return("", nil) output := Output{ Type: OutputTypeRemoteElasticsearch, @@ -500,4 +512,48 @@ func TestPolicyRemoteESOutputPrepare(t *testing.T) { bulker.AssertExpectations(t) }) + + t.Run("Report degraded output health on API key create failure", func(t *testing.T) { + logger := testlog.SetLogger(t) + bulker := ftesting.NewMockBulk() + var apiKey *bulk.APIKey = nil + var err error = errors.New("error connecting") + + outputBulker := ftesting.NewMockBulk() + outputBulker.On("APIKeyCreate", + mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). 
+ Return(apiKey, err).Once() + bulker.On("CreateAndGetBulker", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(outputBulker, false).Once() + bulker.On("Create", mock.Anything, dl.FleetOutputHealth, mock.Anything, mock.MatchedBy(func(body []byte) bool { + var doc dl.OutputHealth + err := json.Unmarshal(body, &doc) + if err != nil { + t.Fatal(err) + } + return doc.Message == "remote ES could not create API key due to error: error connecting" && doc.State == client.UnitStateDegraded.String() + }), mock.Anything).Return("", nil) + + output := Output{ + Type: OutputTypeRemoteElasticsearch, + Name: "test output", + Role: &RoleT{ + Sha2: "new-hash", + Raw: TestPayload, + }, + } + + policyMap := map[string]map[string]interface{}{ + "test output": map[string]interface{}{ + "hosts": []interface{}{"http://localhost"}, + "service_token": "serviceToken1", + "type": OutputTypeRemoteElasticsearch, + }, + } + testAgent := &model.Agent{Outputs: map[string]*model.PolicyOutput{}} + + err = output.Prepare(context.Background(), logger, bulker, testAgent, policyMap) + require.NoError(t, err, "expected prepare to pass") + + bulker.AssertExpectations(t) + }) } From 41b70bc0af10001ef3a06b2a4125f2a62f6bcef0 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Thu, 30 Nov 2023 11:22:34 +0100 Subject: [PATCH 06/10] added test to reportOutputHealth in self monitor --- internal/pkg/policy/self_test.go | 60 ++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/internal/pkg/policy/self_test.go b/internal/pkg/policy/self_test.go index dd1f4a865..8325e30a6 100644 --- a/internal/pkg/policy/self_test.go +++ b/internal/pkg/policy/self_test.go @@ -9,7 +9,9 @@ package policy import ( "context" "encoding/json" + "errors" "fmt" + "net/http" "sync" "testing" "time" @@ -26,6 +28,8 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/model" mmock "github.com/elastic/fleet-server/v7/internal/pkg/monitor/mock" ftesting 
"github.com/elastic/fleet-server/v7/internal/pkg/testing" + "github.com/elastic/fleet-server/v7/internal/pkg/testing/esutil" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) func TestSelfMonitor_DefaultPolicy(t *testing.T) { @@ -642,3 +646,59 @@ func (r *FakeReporter) Current() (client.UnitState, string, map[string]interface defer r.lock.Unlock() return r.state, r.msg, r.payload } + +func TestSelfMonitor_reportOutputHealthyState(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + logger := testlog.SetLogger(t) + + bulker := ftesting.NewMockBulk() + bulkerMap := make(map[string]bulk.Bulk) + outputBulker := ftesting.NewMockBulk() + mockEsClient, _ := esutil.MockESClient(t) + outputBulker.On("Client").Return(mockEsClient) + bulkerMap["remote"] = outputBulker + bulker.On("GetBulkerMap").Return(bulkerMap) + bulker.On("Create", mock.Anything, dl.FleetOutputHealth, mock.Anything, mock.MatchedBy(func(body []byte) bool { + var doc dl.OutputHealth + err := json.Unmarshal(body, &doc) + if err != nil { + t.Fatal(err) + } + return doc.Message == "" && doc.State == client.UnitStateHealthy.String() + }), mock.Anything).Return("", nil) + + reportOutputHealth(ctx, bulker, logger) + + bulker.AssertExpectations(t) + outputBulker.AssertExpectations(t) +} + +func TestSelfMonitor_reportOutputDegradedState(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + logger := testlog.SetLogger(t) + + bulker := ftesting.NewMockBulk() + bulkerMap := make(map[string]bulk.Bulk) + outputBulker := ftesting.NewMockBulk() + mockEsClient, mockTransport := esutil.MockESClient(t) + var err error = errors.New("error connecting") + mockTransport.RoundTripFn = func(req *http.Request) (*http.Response, error) { return mockTransport.Response, err } + outputBulker.On("Client").Return(mockEsClient) + bulkerMap["remote"] = outputBulker + bulker.On("GetBulkerMap").Return(bulkerMap) + bulker.On("Create", 
mock.Anything, dl.FleetOutputHealth, mock.Anything, mock.MatchedBy(func(body []byte) bool { + var doc dl.OutputHealth + err := json.Unmarshal(body, &doc) + if err != nil { + t.Fatal(err) + } + return doc.Message == "remote ES is not reachable due to error: error connecting" && doc.State == client.UnitStateDegraded.String() + }), mock.Anything).Return("", nil) + + reportOutputHealth(ctx, bulker, logger) + + bulker.AssertExpectations(t) + outputBulker.AssertExpectations(t) +} From 5082970791530a542a54a3b161a7f4c94410ec21 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Thu, 30 Nov 2023 11:35:53 +0100 Subject: [PATCH 07/10] added changelog --- .../1701340465-report-output-health.yaml | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 changelog/fragments/1701340465-report-output-health.yaml diff --git a/changelog/fragments/1701340465-report-output-health.yaml b/changelog/fragments/1701340465-report-output-health.yaml new file mode 100644 index 000000000..d172a502e --- /dev/null +++ b/changelog/fragments/1701340465-report-output-health.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. 
+summary: Report output health state to logs-fleet_server.output_health-default data stream + +# Long description; in case the summary is not enough to describe the change +# this field accommodates a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: + +# PR URL; optional; the PR number that added the changeset. +# If not present, it is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/owner/repo/3127 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present, it is automatically filled by the tooling with the issue linked to the PR number. 
+issue: https://github.com/owner/repo/3116 From 0b45780142fe4ea72fd41068357df076f84bc56e Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Thu, 30 Nov 2023 13:13:10 +0100 Subject: [PATCH 08/10] remove break on healthy to keep self monitor running --- internal/pkg/policy/self.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/internal/pkg/policy/self.go b/internal/pkg/policy/self.go index 60bb5bc41..18a6dc461 100644 --- a/internal/pkg/policy/self.go +++ b/internal/pkg/policy/self.go @@ -109,10 +109,7 @@ LOOP: return err } cT.Reset(m.checkTime) - if state == client.UnitStateHealthy { - // running; can stop - break LOOP - } + m.log.Trace().Msg(state.String()) case hits := <-s.Output(): policies := make([]model.Policy, len(hits)) for i, hit := range hits { @@ -125,10 +122,7 @@ LOOP: if err != nil { return err } - if state == client.UnitStateHealthy { - // running; can stop - break LOOP - } + m.log.Trace().Msg(state.String()) } } From 3d1888ad60a1417a232cd52cfa49eebd46b86668 Mon Sep 17 00:00:00 2001 From: Julia Bardi <90178898+juliaElastic@users.noreply.github.com> Date: Mon, 4 Dec 2023 09:03:57 +0100 Subject: [PATCH 09/10] Update internal/pkg/policy/policy_output.go Co-authored-by: Michel Laterman <82832767+michel-laterman@users.noreply.github.com> --- internal/pkg/policy/policy_output.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index ac72f301c..815256fc3 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -268,7 +268,7 @@ func (p *Output) prepareElasticsearch( doc := dl.OutputHealth{ Output: p.Name, State: client.UnitStateDegraded.String(), - Message: fmt.Sprintf("remote ES could not create API key due to error: %s", err.Error()), + Message: fmt.Sprintf("remote ES could not create API key due to error: %v", err), } zerolog.Ctx(ctx).Warn().Err(err).Str("outputName", p.Name).Msg(doc.Message) From 
9644f30e1f30632b998fa81f52417e568caeb427 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Mon, 4 Dec 2023 10:54:30 +0100 Subject: [PATCH 10/10] moved OutputHealth to schema.json --- internal/pkg/dl/output_health.go | 21 +++---------- internal/pkg/model/schema.go | 25 +++++++++++++++ internal/pkg/policy/policy_output.go | 4 +-- internal/pkg/policy/policy_output_test.go | 4 +-- internal/pkg/policy/self.go | 2 +- internal/pkg/policy/self_test.go | 4 +-- model/schema.json | 37 +++++++++++++++++++++++ 7 files changed, 73 insertions(+), 24 deletions(-) diff --git a/internal/pkg/dl/output_health.go b/internal/pkg/dl/output_health.go index c1f4b4c82..2282fb699 100644 --- a/internal/pkg/dl/output_health.go +++ b/internal/pkg/dl/output_health.go @@ -10,32 +10,19 @@ import ( "time" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" + "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/gofrs/uuid" ) -type DataStream struct { - Dataset string `json:"dataset,omitempty"` - Type string `json:"type,omitempty"` - Namespace string `json:"namespace,omitempty"` -} - -type OutputHealth struct { - Output string `json:"output,omitempty"` - State string `json:"state,omitempty"` - Message string `json:"message,omitempty"` - Timestamp string `json:"@timestamp,omitempty"` - DataStream DataStream `json:"data_stream,omitempty"` -} - -func CreateOutputHealth(ctx context.Context, bulker bulk.Bulk, doc OutputHealth) error { +func CreateOutputHealth(ctx context.Context, bulker bulk.Bulk, doc model.OutputHealth) error { return createOutputHealth(ctx, bulker, FleetOutputHealth, doc) } -func createOutputHealth(ctx context.Context, bulker bulk.Bulk, index string, doc OutputHealth) error { +func createOutputHealth(ctx context.Context, bulker bulk.Bulk, index string, doc model.OutputHealth) error { if doc.Timestamp == "" { doc.Timestamp = time.Now().UTC().Format(time.RFC3339) } - doc.DataStream = DataStream{ + doc.DataStream = &model.DataStream{ Dataset: "fleet_server.output_health", 
Type: "logs", Namespace: "default", diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index ecb71f4aa..b8be8577c 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -305,6 +305,13 @@ type CheckinPolicyInputItems struct { TemplateID string `json:"template_id"` } +// DataStream +type DataStream struct { + Dataset string `json:"dataset,omitempty"` + Namespace string `json:"namespace,omitempty"` + Type string `json:"type,omitempty"` +} + // EnrollmentAPIKey An Elastic Agent enrollment API key type EnrollmentAPIKey struct { ESDocument @@ -342,6 +349,24 @@ type HostMetadata struct { Name string `json:"name"` } +// OutputHealth Output health represents a health state of an output +type OutputHealth struct { + ESDocument + DataStream *DataStream `json:"data_stream,omitempty"` + + // Long state message if unhealthy + Message string `json:"message,omitempty"` + + // Output ID + Output string `json:"output,omitempty"` + + // Health state, can be HEALTHY or DEGRADED + State string `json:"state,omitempty"` + + // Timestamp of reported state + Timestamp string `json:"@timestamp,omitempty"` +} + // Policy A policy that an Elastic Agent is attached to type Policy struct { ESDocument diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index 815256fc3..c26186dc5 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -265,7 +265,7 @@ func (p *Output) prepareElasticsearch( // reporting output health and not returning the error to keep fleet-server running if outputAPIKey == nil && p.Type == OutputTypeRemoteElasticsearch { if err != nil { - doc := dl.OutputHealth{ + doc := model.OutputHealth{ Output: p.Name, State: client.UnitStateDegraded.String(), Message: fmt.Sprintf("remote ES could not create API key due to error: %v", err), @@ -283,7 +283,7 @@ func (p *Output) prepareElasticsearch( delete(outputMap[p.Name], FieldOutputServiceToken) return nil } else if 
p.Type == OutputTypeRemoteElasticsearch { - doc := dl.OutputHealth{ + doc := model.OutputHealth{ Output: p.Name, State: client.UnitStateHealthy.String(), Message: "", diff --git a/internal/pkg/policy/policy_output_test.go b/internal/pkg/policy/policy_output_test.go index 45072f5c9..8f167b4b8 100644 --- a/internal/pkg/policy/policy_output_test.go +++ b/internal/pkg/policy/policy_output_test.go @@ -467,7 +467,7 @@ func TestPolicyRemoteESOutputPrepare(t *testing.T) { Return(&apiKey, nil).Once() bulker.On("CreateAndGetBulker", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(outputBulker, false).Once() bulker.On("Create", mock.Anything, dl.FleetOutputHealth, mock.Anything, mock.MatchedBy(func(body []byte) bool { - var doc dl.OutputHealth + var doc model.OutputHealth err := json.Unmarshal(body, &doc) if err != nil { t.Fatal(err) @@ -525,7 +525,7 @@ func TestPolicyRemoteESOutputPrepare(t *testing.T) { Return(apiKey, err).Once() bulker.On("CreateAndGetBulker", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(outputBulker, false).Once() bulker.On("Create", mock.Anything, dl.FleetOutputHealth, mock.Anything, mock.MatchedBy(func(body []byte) bool { - var doc dl.OutputHealth + var doc model.OutputHealth err := json.Unmarshal(body, &doc) if err != nil { t.Fatal(err) diff --git a/internal/pkg/policy/self.go b/internal/pkg/policy/self.go index 18a6dc461..4803cf165 100644 --- a/internal/pkg/policy/self.go +++ b/internal/pkg/policy/self.go @@ -253,7 +253,7 @@ func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, logger zerolog.Lo //pinging logic bulkerMap := bulker.GetBulkerMap() for outputName, outputBulker := range bulkerMap { - doc := dl.OutputHealth{ + doc := model.OutputHealth{ Output: outputName, State: client.UnitStateHealthy.String(), Message: "", diff --git a/internal/pkg/policy/self_test.go b/internal/pkg/policy/self_test.go index 8325e30a6..6baf67e56 100644 --- a/internal/pkg/policy/self_test.go +++ 
b/internal/pkg/policy/self_test.go @@ -660,7 +660,7 @@ func TestSelfMonitor_reportOutputHealthyState(t *testing.T) { bulkerMap["remote"] = outputBulker bulker.On("GetBulkerMap").Return(bulkerMap) bulker.On("Create", mock.Anything, dl.FleetOutputHealth, mock.Anything, mock.MatchedBy(func(body []byte) bool { - var doc dl.OutputHealth + var doc model.OutputHealth err := json.Unmarshal(body, &doc) if err != nil { t.Fatal(err) @@ -689,7 +689,7 @@ func TestSelfMonitor_reportOutputDegradedState(t *testing.T) { bulkerMap["remote"] = outputBulker bulker.On("GetBulkerMap").Return(bulkerMap) bulker.On("Create", mock.Anything, dl.FleetOutputHealth, mock.Anything, mock.MatchedBy(func(body []byte) bool { - var doc dl.OutputHealth + var doc model.OutputHealth err := json.Unmarshal(body, &doc) if err != nil { t.Fatal(err) diff --git a/model/schema.json b/model/schema.json index 9857da512..75699d1aa 100644 --- a/model/schema.json +++ b/model/schema.json @@ -430,6 +430,43 @@ ] }, + "output_health": { + "description": "Output health represents a health state of an output", + "type": "object", + "properties": { + "output": { + "type": "string", + "description": "Output ID" + }, + "state": { + "type": "string", + "description": "Health state, can be HEALTHY or DEGRADED" + }, + "message": { + "type": "string", + "description": "Long state message if unhealthy" + }, + "@timestamp": { + "type": "string", + "description": "Timestamp of reported state" + }, + "data_stream": { + "type": "object", + "properties": { + "dataset": { + "type": "string" + }, + "type": { + "type": "string" + }, + "namespace": { + "type": "string" + } + } + } + } + }, + "agent": { "title": "Agent", "description": "An Elastic Agent that has enrolled into Fleet",