-
Notifications
You must be signed in to change notification settings - Fork 82
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
report output health #3127
report output health #3127
Changes from all commits
fb6a3b4
928aac5
e9c22f9
8a672a9
7940d1a
eea71f3
41b70bc
5082970
0b45780
3d1888a
588c9d1
9644f30
fae7af6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
# Kind can be one of: | ||
# - breaking-change: a change to previously-documented behavior | ||
# - deprecation: functionality that is being removed in a later release | ||
# - bug-fix: fixes a problem in a previous version | ||
# - enhancement: extends functionality but does not break or fix existing behavior | ||
# - feature: new functionality | ||
# - known-issue: problems that we are aware of in a given version | ||
# - security: impacts on the security of a product or a user’s deployment. | ||
# - upgrade: important information for someone upgrading from a prior version | ||
# - other: does not fit into any of the other categories | ||
kind: feature | ||
|
||
# Change summary; an 80ish-character-long description of the change. | ||
summary: Report output health state to logs-fleet_server.output_health-default data stream | ||
|
||
# Long description; in case the summary is not enough to describe the change | ||
# this field accommodates a description without length limits. | ||
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. | ||
#description: | ||
|
||
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. | ||
component: | ||
|
||
# PR URL; optional; the PR number that added the changeset. | ||
# If not present, it is automatically filled in by the tooling, which finds the PR where this changelog fragment was added. | ||
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. | ||
# Please provide it if you are adding a fragment for a different PR. | ||
pr: https://github.com/owner/repo/3127 | ||
|
||
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). | ||
# If not present, it is automatically filled in by the tooling with the issue linked to the PR number. | ||
issue: https://github.com/owner/repo/3116 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package dl | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"time" | ||
|
||
"github.com/elastic/fleet-server/v7/internal/pkg/bulk" | ||
"github.com/elastic/fleet-server/v7/internal/pkg/model" | ||
"github.com/gofrs/uuid" | ||
) | ||
|
||
func CreateOutputHealth(ctx context.Context, bulker bulk.Bulk, doc model.OutputHealth) error { | ||
return createOutputHealth(ctx, bulker, FleetOutputHealth, doc) | ||
} | ||
|
||
func createOutputHealth(ctx context.Context, bulker bulk.Bulk, index string, doc model.OutputHealth) error { | ||
if doc.Timestamp == "" { | ||
doc.Timestamp = time.Now().UTC().Format(time.RFC3339) | ||
} | ||
doc.DataStream = &model.DataStream{ | ||
Dataset: "fleet_server.output_health", | ||
Type: "logs", | ||
Namespace: "default", | ||
} | ||
body, err := json.Marshal(doc) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
id, err := uuid.NewV4() | ||
if err != nil { | ||
return err | ||
} | ||
_, err = bulker.Create(ctx, index, id.String(), body, bulk.WithRefresh()) | ||
return err | ||
} |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -109,10 +109,7 @@ LOOP: | |
return err | ||
} | ||
cT.Reset(m.checkTime) | ||
if state == client.UnitStateHealthy { | ||
// running; can stop | ||
break LOOP | ||
} | ||
m.log.Trace().Msg(state.String()) | ||
case hits := <-s.Output(): | ||
policies := make([]model.Policy, len(hits)) | ||
for i, hit := range hits { | ||
|
@@ -125,10 +122,7 @@ LOOP: | |
if err != nil { | ||
return err | ||
} | ||
if state == client.UnitStateHealthy { | ||
// running; can stop | ||
break LOOP | ||
} | ||
m.log.Trace().Msg(state.String()) | ||
} | ||
} | ||
|
||
|
@@ -218,6 +212,8 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error | |
return client.UnitStateStarting, nil | ||
} | ||
|
||
reportOutputHealth(ctx, m.bulker, m.log) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Currently pinging remote outputs every 5s (the default monitor interval) and writing a doc to the output health data stream. |
||
|
||
state := client.UnitStateHealthy | ||
extendMsg := "" | ||
var payload map[string]interface{} | ||
|
@@ -253,6 +249,32 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error | |
return state, nil | ||
} | ||
|
||
func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, logger zerolog.Logger) { | ||
//pinging logic | ||
bulkerMap := bulker.GetBulkerMap() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as mentioned on the previous pr, the regular health reporting will stop if fleet-server is restarted, and doesn't restart until an agent tries to create an API key again (e.g. due to change in output config), because the |
||
for outputName, outputBulker := range bulkerMap { | ||
doc := model.OutputHealth{ | ||
Output: outputName, | ||
State: client.UnitStateHealthy.String(), | ||
Message: "", | ||
} | ||
res, err := outputBulker.Client().Ping(outputBulker.Client().Ping.WithContext(ctx)) | ||
if err != nil { | ||
doc.State = client.UnitStateDegraded.String() | ||
doc.Message = fmt.Sprintf("remote ES is not reachable due to error: %s", err.Error()) | ||
logger.Error().Err(err).Str("outputName", outputName).Msg(doc.Message) | ||
|
||
} else if res.StatusCode != 200 { | ||
doc.State = client.UnitStateDegraded.String() | ||
doc.Message = fmt.Sprintf("remote ES is not reachable due to unexpected status code %d", res.StatusCode) | ||
logger.Error().Err(err).Str("outputName", outputName).Msg(doc.Message) | ||
} | ||
if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil { | ||
logger.Error().Err(err).Str("outputName", outputName).Msg("error writing output health") | ||
} | ||
} | ||
} | ||
|
||
func HasFleetServerInput(inputs []map[string]interface{}) bool { | ||
for _, input := range inputs { | ||
attr, ok := input["type"].(string) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should these be constants? Can
Namespace
ever be something else? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it will be always default.