diff --git a/changelog/fragments/1701340465-report-output-health.yaml b/changelog/fragments/1701340465-report-output-health.yaml new file mode 100644 index 000000000..d172a502e --- /dev/null +++ b/changelog/fragments/1701340465-report-output-health.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Report output health state to logs-fleet_server.output_health-default data stream + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/owner/repo/3127 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/owner/repo/3116 diff --git a/internal/pkg/dl/constants.go b/internal/pkg/dl/constants.go index 703117bca..2393f73f0 100644 --- a/internal/pkg/dl/constants.go +++ b/internal/pkg/dl/constants.go @@ -16,6 +16,7 @@ const ( FleetPolicies = ".fleet-policies" FleetPoliciesLeader = ".fleet-policies-leader" FleetServers = ".fleet-servers" + FleetOutputHealth = "logs-fleet_server.output_health-default" ) // Query fields diff --git a/internal/pkg/dl/output_health.go b/internal/pkg/dl/output_health.go new file mode 100644 index 000000000..2282fb699 --- /dev/null +++ b/internal/pkg/dl/output_health.go @@ -0,0 +1,41 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package dl + +import ( + "context" + "encoding/json" + "time" + + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" + "github.com/elastic/fleet-server/v7/internal/pkg/model" + "github.com/gofrs/uuid" +) + +func CreateOutputHealth(ctx context.Context, bulker bulk.Bulk, doc model.OutputHealth) error { + return createOutputHealth(ctx, bulker, FleetOutputHealth, doc) +} + +func createOutputHealth(ctx context.Context, bulker bulk.Bulk, index string, doc model.OutputHealth) error { + if doc.Timestamp == "" { + doc.Timestamp = time.Now().UTC().Format(time.RFC3339) + } + doc.DataStream = &model.DataStream{ + Dataset: "fleet_server.output_health", + Type: "logs", + Namespace: "default", + } + body, err := json.Marshal(doc) + if err != nil { + return err + } + + id, err := uuid.NewV4() + if err != nil { + return err + } + _, err = bulker.Create(ctx, index, id.String(), body, bulk.WithRefresh()) + return err +} diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index ecb71f4aa..b8be8577c 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -305,6 +305,13 @@ type CheckinPolicyInputItems struct { TemplateID string `json:"template_id"` } +// DataStream +type DataStream struct { + Dataset string `json:"dataset,omitempty"` + Namespace string `json:"namespace,omitempty"` + Type string `json:"type,omitempty"` +} + // EnrollmentAPIKey An Elastic Agent enrollment API key type EnrollmentAPIKey struct { ESDocument @@ -342,6 +349,24 @@ type HostMetadata struct { Name string `json:"name"` } +// OutputHealth Output health represents a health state of an output +type OutputHealth struct { + ESDocument + DataStream *DataStream `json:"data_stream,omitempty"` + + // Long state message if unhealthy + Message string `json:"message,omitempty"` + + // Output ID + Output string `json:"output,omitempty"` + + // Health state, can be HEALTHY or DEGRADED + State string `json:"state,omitempty"` + + // Timestamp of reported state + Timestamp string `json:"@timestamp,omitempty"` +} + // Policy A policy that an Elastic Agent is attached to type Policy struct { ESDocument diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index dfc51c21f..c26186dc5 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -15,6 +15,7 @@ import ( "github.com/rs/zerolog" "go.elastic.co/apm/v2" + "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/fleet-server/v7/internal/pkg/apikey" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dl" @@ -260,10 +261,20 @@ func (p *Output) prepareElasticsearch( ctx := zlog.WithContext(ctx) outputAPIKey, err := generateOutputAPIKey(ctx, outputBulker, agent.Id, p.Name, p.Role.Raw) - // reporting output error status to self monitor and not returning the error to keep fleet-server running + + // reporting output health and not returning the error to keep fleet-server running if outputAPIKey == nil && p.Type == OutputTypeRemoteElasticsearch { if err != nil { - zerolog.Ctx(ctx).Warn().Err(err).Msg("Could not create API key in remote ES") + doc := model.OutputHealth{ + Output: p.Name, + State: client.UnitStateDegraded.String(), + Message: fmt.Sprintf("remote ES could not create API key due to error: %v", err), + } + zerolog.Ctx(ctx).Warn().Err(err).Str("outputName", p.Name).Msg(doc.Message) + + if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil { + zlog.Error().Err(err).Str("outputName", p.Name).Msg("error writing output health") + } } // replace type remote_elasticsearch with elasticsearch as agent doesn't recognize remote_elasticsearch @@ -271,6 +282,15 @@ func (p *Output) prepareElasticsearch( // remove the service token from the agent policy sent to the agent delete(outputMap[p.Name], FieldOutputServiceToken) return nil + } else if p.Type == OutputTypeRemoteElasticsearch { + doc := model.OutputHealth{ + Output: p.Name, + State: client.UnitStateHealthy.String(), + Message: "", + } + if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil { + zlog.Error().Err(err).Msg("create output health") + } } if err != nil { return fmt.Errorf("failed generate output API key: %w", err) @@ -304,7 +324,7 @@ func (p *Output) prepareElasticsearch( // Using painless script to append the old keys to the history body, err := renderUpdatePainlessScript(p.Name, fields) if err != nil { - return fmt.Errorf("could no tupdate painless script: %w", err) + return fmt.Errorf("could not update painless script: %w", err) } if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { diff --git a/internal/pkg/policy/policy_output_test.go b/internal/pkg/policy/policy_output_test.go index d6480460b..8f167b4b8 100644 --- a/internal/pkg/policy/policy_output_test.go +++ b/internal/pkg/policy/policy_output_test.go @@ -8,13 +8,17 @@ package policy import ( "context" + "encoding/json" + "errors" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" + "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/model" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" @@ -462,6 +466,14 @@ func TestPolicyRemoteESOutputPrepare(t *testing.T) { mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(&apiKey, nil).Once() bulker.On("CreateAndGetBulker", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(outputBulker, false).Once() + bulker.On("Create", mock.Anything, dl.FleetOutputHealth, mock.Anything, mock.MatchedBy(func(body []byte) bool { + var doc model.OutputHealth + err := json.Unmarshal(body, &doc) + if err != nil { + t.Fatal(err) + } + return doc.Message == "" && doc.State == client.UnitStateHealthy.String() + }), mock.Anything).Return("", nil) output := Output{ Type: OutputTypeRemoteElasticsearch, @@ -500,4 +512,48 @@ func TestPolicyRemoteESOutputPrepare(t *testing.T) { bulker.AssertExpectations(t) }) + + t.Run("Report degraded output health on API key create failure", func(t *testing.T) { + logger := testlog.SetLogger(t) + bulker := ftesting.NewMockBulk() + var apiKey *bulk.APIKey = nil + var err error = errors.New("error connecting") + + outputBulker := ftesting.NewMockBulk() + outputBulker.On("APIKeyCreate", + mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(apiKey, err).Once() + bulker.On("CreateAndGetBulker", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(outputBulker, false).Once() + bulker.On("Create", mock.Anything, dl.FleetOutputHealth, mock.Anything, mock.MatchedBy(func(body []byte) bool { + var doc model.OutputHealth + err := json.Unmarshal(body, &doc) + if err != nil { + t.Fatal(err) + } + return doc.Message == "remote ES could not create API key due to error: error connecting" && doc.State == client.UnitStateDegraded.String() + }), mock.Anything).Return("", nil) + + output := Output{ + Type: OutputTypeRemoteElasticsearch, + Name: "test output", + Role: &RoleT{ + Sha2: "new-hash", + Raw: TestPayload, + }, + } + + policyMap := map[string]map[string]interface{}{ + "test output": map[string]interface{}{ + "hosts": []interface{}{"http://localhost"}, + "service_token": "serviceToken1", + "type": OutputTypeRemoteElasticsearch, + }, + } + testAgent := &model.Agent{Outputs: map[string]*model.PolicyOutput{}} + + err = output.Prepare(context.Background(), logger, bulker, testAgent, policyMap) + require.NoError(t, err, "expected prepare to pass") + + bulker.AssertExpectations(t) + }) } diff --git a/internal/pkg/policy/self.go b/internal/pkg/policy/self.go index b9c8d7bd0..4803cf165 100644 --- a/internal/pkg/policy/self.go +++ b/internal/pkg/policy/self.go @@ -109,10 +109,7 @@ LOOP: return err } cT.Reset(m.checkTime) - if state == client.UnitStateHealthy { - // running; can stop - break LOOP - } + m.log.Trace().Msg(state.String()) case hits := <-s.Output(): policies := make([]model.Policy, len(hits)) for i, hit := range hits { @@ -125,10 +122,7 @@ LOOP: if err != nil { return err } - if state == client.UnitStateHealthy { - // running; can stop - break LOOP - } + m.log.Trace().Msg(state.String()) } } @@ -218,6 +212,8 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error return client.UnitStateStarting, nil } + reportOutputHealth(ctx, m.bulker, m.log) + state := client.UnitStateHealthy extendMsg := "" var payload map[string]interface{} @@ -253,6 +249,32 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error return state, nil } +func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, logger zerolog.Logger) { + //pinging logic + bulkerMap := bulker.GetBulkerMap() + for outputName, outputBulker := range bulkerMap { + doc := model.OutputHealth{ + Output: outputName, + State: client.UnitStateHealthy.String(), + Message: "", + } + res, err := outputBulker.Client().Ping(outputBulker.Client().Ping.WithContext(ctx)) + if err != nil { + doc.State = client.UnitStateDegraded.String() + doc.Message = fmt.Sprintf("remote ES is not reachable due to error: %s", err.Error()) + logger.Error().Err(err).Str("outputName", outputName).Msg(doc.Message) + + } else if res.StatusCode != 200 { + doc.State = client.UnitStateDegraded.String() + doc.Message = fmt.Sprintf("remote ES is not reachable due to unexpected status code %d", res.StatusCode) + logger.Error().Err(err).Str("outputName", outputName).Msg(doc.Message) + } + if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil { + logger.Error().Err(err).Str("outputName", outputName).Msg("error writing output health") + } + } +} + func HasFleetServerInput(inputs []map[string]interface{}) bool { for _, input := range inputs { attr, ok := input["type"].(string) diff --git a/internal/pkg/policy/self_test.go b/internal/pkg/policy/self_test.go index dd1f4a865..6baf67e56 100644 --- a/internal/pkg/policy/self_test.go +++ b/internal/pkg/policy/self_test.go @@ -9,7 +9,9 @@ package policy import ( "context" "encoding/json" + "errors" "fmt" + "net/http" "sync" "testing" "time" @@ -26,6 +28,8 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/model" mmock "github.com/elastic/fleet-server/v7/internal/pkg/monitor/mock" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + "github.com/elastic/fleet-server/v7/internal/pkg/testing/esutil" + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) func TestSelfMonitor_DefaultPolicy(t *testing.T) { @@ -642,3 +646,59 @@ func (r *FakeReporter) Current() (client.UnitState, string, map[string]interface defer r.lock.Unlock() return r.state, r.msg, r.payload } + +func TestSelfMonitor_reportOutputHealthyState(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + logger := testlog.SetLogger(t) + + bulker := ftesting.NewMockBulk() + bulkerMap := make(map[string]bulk.Bulk) + outputBulker := ftesting.NewMockBulk() + mockEsClient, _ := esutil.MockESClient(t) + outputBulker.On("Client").Return(mockEsClient) + bulkerMap["remote"] = outputBulker + bulker.On("GetBulkerMap").Return(bulkerMap) + bulker.On("Create", mock.Anything, dl.FleetOutputHealth, mock.Anything, mock.MatchedBy(func(body []byte) bool { + var doc model.OutputHealth + err := json.Unmarshal(body, &doc) + if err != nil { + t.Fatal(err) + } + return doc.Message == "" && doc.State == client.UnitStateHealthy.String() + }), mock.Anything).Return("", nil) + + reportOutputHealth(ctx, bulker, logger) + + bulker.AssertExpectations(t) + outputBulker.AssertExpectations(t) +} + +func TestSelfMonitor_reportOutputDegradedState(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + logger := testlog.SetLogger(t) + + bulker := ftesting.NewMockBulk() + bulkerMap := make(map[string]bulk.Bulk) + outputBulker := ftesting.NewMockBulk() + mockEsClient, mockTransport := esutil.MockESClient(t) + var err error = errors.New("error connecting") + mockTransport.RoundTripFn = func(req *http.Request) (*http.Response, error) { return mockTransport.Response, err } + outputBulker.On("Client").Return(mockEsClient) + bulkerMap["remote"] = outputBulker + bulker.On("GetBulkerMap").Return(bulkerMap) + bulker.On("Create", mock.Anything, dl.FleetOutputHealth, mock.Anything, mock.MatchedBy(func(body []byte) bool { + var doc model.OutputHealth + err := json.Unmarshal(body, &doc) + if err != nil { + t.Fatal(err) + } + return doc.Message == "remote ES is not reachable due to error: error connecting" && doc.State == client.UnitStateDegraded.String() + }), mock.Anything).Return("", nil) + + reportOutputHealth(ctx, bulker, logger) + + bulker.AssertExpectations(t) + outputBulker.AssertExpectations(t) +} diff --git a/model/schema.json b/model/schema.json index 9857da512..75699d1aa 100644 --- a/model/schema.json +++ b/model/schema.json @@ -430,6 +430,43 @@ ] }, + "output_health": { + "description": "Output health represents a health state of an output", + "type": "object", + "properties": { + "output": { + "type": "string", + "description": "Output ID" + }, + "state": { + "type": "string", + "description": "Health state, can be HEALTHY or DEGRADED" + }, + "message": { + "type": "string", + "description": "Long state message if unhealthy" + }, + "@timestamp": { + "type": "string", + "description": "Timestamp of reported state" + }, + "data_stream": { + "type": "object", + "properties": { + "dataset": { + "type": "string" + }, + "type": { + "type": "string" + }, + "namespace": { + "type": "string" + } + } + } + } + }, + "agent": { "title": "Agent", "description": "An Elastic Agent that has enrolled into Fleet",