[Heartbeat] Double testing on failure / state transitions #36120

Closed · wants to merge 2 commits
@@ -51,7 +51,7 @@ func TestStatesESLoader(t *testing.T) {

monID := etc.createTestMonitorStateInES(t, testStatus)
// Since we've continued this state it should register the initial state
ms := etc.tracker.getCurrentState(monID)
ms := etc.tracker.GetCurrentState(monID)
require.True(t, ms.StartedAt.After(testStart.Add(-time.Nanosecond)), "timestamp for new state is off")
requireMSStatusCount(t, ms, testStatus, 1)

4 changes: 2 additions & 2 deletions heartbeat/monitors/wrappers/monitorstate/tracker.go
@@ -62,7 +62,7 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta
t.mtx.Lock()
defer t.mtx.Unlock()

state := t.getCurrentState(sf)
state := t.GetCurrentState(sf)
if state == nil {
state = newMonitorState(sf, newStatus, 0, t.flappingEnabled)
logp.L().Infof("initializing new state for monitor %s: %s", sf.ID, state.String())
@@ -74,7 +74,7 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta
return state.copy()
}

func (t *Tracker) getCurrentState(sf stdfields.StdMonitorFields) (state *State) {
func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields) (state *State) {
if state, ok := t.states[sf.ID]; ok {
return state
}
102 changes: 76 additions & 26 deletions heartbeat/monitors/wrappers/wrappers.go
@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/gofrs/uuid"
@@ -65,7 +66,7 @@ func WrapLightweight(js []jobs.Job, stdMonFields stdfields.StdMonitorFields, mst
logMonitorRun(nil),
),
func() jobs.JobWrapper {
return makeAddSummary()
return addLightweightSummary()
},
func() jobs.JobWrapper {
return addMonitorState(stdMonFields, mst)
@@ -91,9 +92,14 @@ func WrapBrowser(js []jobs.Job, stdMonFields stdfields.StdMonitorFields, mst *mo
)
}

const maxAttempts = 2

// addMonitorState computes the various state fields
func addMonitorState(sf stdfields.StdMonitorFields, mst *monitorstate.Tracker) jobs.JobWrapper {
return func(job jobs.Job) jobs.Job {
attempt := atomic.Int32{}
attempt.Add(1) // set to 1 at start of job

return func(event *beat.Event) ([]jobs.Job, error) {
cont, err := job(event)

@@ -106,10 +112,53 @@ func addMonitorState(sf stdfields.StdMonitorFields, mst *monitorstate.Tracker) j
if err != nil {
return nil, fmt.Errorf("could not wrap state for '%s', no status assigned: %w", sf.ID, err)
}
statusStr := monitorstate.StateStatus(status.(string))
if statusStr == "" {
return nil, fmt.Errorf("could not convert status (%v) to monitorstate.StatusStauts", statusStr)
}

curState := mst.GetCurrentState(sf)
stateTransition := curState != nil && curState.Status != statusStr
hasAttemptsRemaining := attempt.Load() < maxAttempts

// only execute a retry if the status differs from the current state AND more attempts are available
if stateTransition && hasAttemptsRemaining {
// we are at a state transition, retry if this is not already a retry
// move the summary fields
summary, err := event.GetValue("summary")
if err != nil {
return nil, fmt.Errorf("could not retrieve summary in addMonitorState: %w", err)
}
err = event.Delete("summary")
if err != nil {
return nil, fmt.Errorf("could not delete summary in addMonitorState: %w", err)
}

_, err = event.PutValue("attempt.summary", summary)
if err != nil {
return nil, fmt.Errorf("could not put attempt.summary in addMonitorState: %w", err)
}

attempt.Add(1)

eventext.MergeEventFields(event, mapstr.M{
"attempt": mapstr.M{
"summary": summary,
"attempt": int(attempt.Load()),
"max_attempts": maxAttempts,
},
})

// Restart the job, short circuit all else
return []jobs.Job{job}, nil
} else {
// reset to 1 for next job root invocation
attempt.Store(1)
}

ms := mst.RecordStatus(sf, monitorstate.StateStatus(status.(string)))

eventext.MergeEventFields(event, mapstr.M{"state": ms})
eventext.MergeEventFields(event, mapstr.M{"state": ms, "attempt": mapstr.M{"attempt": int(attempt.Load()), "max_attempts": maxAttempts}})

return cont, nil
}
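For readers following the new addMonitorState logic above, here is a minimal, self-contained sketch of the same retry-on-state-transition idea; it is not part of the diff. A wrapper counts attempts, and when the observed status differs from the last known one while attempts remain, it re-queues the job as a continuation instead of publishing the transition right away. The event, job, wrapRetry, check, and lastStatus identifiers below are invented stand-ins for this sketch, not the real heartbeat types, and the summary relocation from the diff is omitted for brevity.

package main

import (
	"fmt"
	"sync/atomic"
)

// Simplified stand-ins, invented for this sketch; not the real heartbeat types.
type event map[string]interface{}

// A job mutates an event and may return continuations to run next.
type job func(e event) ([]job, error)

// wrapRetry mirrors the shape of the addMonitorState change above: when the
// observed status differs from the last recorded one and attempts remain, the
// job is re-queued as a continuation so it runs once more before the
// transition is published.
func wrapRetry(j job, maxAttempts int32) job {
	attempt := atomic.Int32{}
	attempt.Store(1) // one attempt is already underway when the job starts

	lastStatus := "" // stand-in for looking up the tracker's current state

	var wrapped job
	wrapped = func(e event) ([]job, error) {
		cont, err := j(e)
		if err != nil {
			return nil, err
		}

		status, _ := e["status"].(string)
		transition := lastStatus != "" && lastStatus != status

		if transition && attempt.Load() < maxAttempts {
			attempt.Add(1)
			e["attempt"] = map[string]interface{}{
				"attempt":      int(attempt.Load()),
				"max_attempts": int(maxAttempts),
			}
			// Restart the job, short-circuiting everything else.
			return []job{wrapped}, nil
		}

		// Record the (possibly retried) status, stamp attempt metadata,
		// and reset the counter for the next scheduled run.
		lastStatus = status
		e["attempt"] = map[string]interface{}{
			"attempt":      int(attempt.Load()),
			"max_attempts": int(maxAttempts),
		}
		attempt.Store(1)
		return cont, nil
	}
	return wrapped
}

func main() {
	statuses := []string{"up", "down", "down"} // up, then a confirmed down
	i := 0
	check := func(e event) ([]job, error) {
		e["status"] = statuses[i]
		i++
		return nil, nil
	}

	wrapped := wrapRetry(check, 2)

	// Two scheduled runs; the second retries once because up -> down is a
	// state transition, so three underlying checks execute in total.
	for run := 0; run < 2; run++ {
		queue := []job{wrapped}
		for len(queue) > 0 {
			j := queue[0]
			queue = queue[1:]
			e := event{}
			cont, _ := j(e)
			queue = append(queue, cont...)
			fmt.Println("run", run, ":", e)
		}
	}
}

Run as written, this prints three events across two scheduled runs; the middle one is the retried check that fires on the up-to-down transition.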
@@ -279,16 +328,15 @@ func logMonitorRun(match EventMatcher) jobs.JobWrapper {
}
}

// makeAddSummary summarizes the job, adding the `summary` field to the last event emitted.
func makeAddSummary() jobs.JobWrapper {
// addLightweightSummary summarizes the job, adding the `summary` field to the last event emitted.
func addLightweightSummary() jobs.JobWrapper {
// This is a tricky method. The way this works is that we track the state across jobs in the
// state struct here.
state := struct {
mtx sync.Mutex
monitorId string
remaining uint16
up uint16
down uint16
remaining int
up int
down int
checkGroup string
generation uint64
}{
@@ -300,6 +348,7 @@ func makeAddSummary() jobs.JobWrapper {
state.up = 0
state.down = 0
state.generation++

u, err := uuid.NewV1()
if err != nil {
panic(fmt.Sprintf("cannot generate UUIDs on this system: %s", err))
@@ -330,21 +379,19 @@ func makeAddSummary() jobs.JobWrapper {
_, _ = event.PutValue("monitor.check_group", state.checkGroup)

// Adjust the total remaining to account for new continuations
state.remaining += uint16(len(cont))
state.remaining += len(cont)
// Reduce total remaining to account for the just executed job
state.remaining--

// After last job
if state.remaining == 0 {
up := state.up
down := state.down
if len(cont) != 0 {
// this should __never__ happen, but in the off chance it does we could track it with some work
logp.L().Errorf("heartbeat wrapper invariant violation, > 0 continuations with 0 remaining jobs, cg:%s", state.checkGroup)
}

addSummaryFields(event, state.up, state.down)

eventext.MergeEventFields(event, mapstr.M{
"summary": mapstr.M{
"up": up,
"down": down,
},
})
resetState()
}

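As a companion to the "tricky" bookkeeping in addLightweightSummary, here is a small, self-contained sketch, separate from the diff, of how the remaining counter works across a job and its continuations: each executed job decrements it, each returned continuation increments it, and the summary fields are attached only when it reaches zero, i.e. on the last event of the run. The event, job, wrapSummary, and withCont identifiers are invented for this sketch and are not the real heartbeat APIs; the initial remaining value of 1 is an assumption of the sketch.

package main

import "fmt"

// Simplified stand-ins, invented for this sketch; not the real heartbeat types.
type event map[string]interface{}

// A job mutates an event and may return continuations to run next.
type job func(e event) ([]job, error)

// wrapSummary mirrors the shape of addLightweightSummary: shared state is
// tracked across the job and all of its continuations, and the summary is
// attached once the last continuation has run.
func wrapSummary(j job) job {
	state := struct {
		remaining, up, down int
	}{remaining: 1}

	var wrap func(job) job
	wrap = func(inner job) job {
		return func(e event) ([]job, error) {
			cont, err := inner(e)
			if err != nil {
				return nil, err
			}

			if status, _ := e["status"].(string); status == "down" {
				state.down++
			} else {
				state.up++
			}

			// Adjust for new continuations, then for the job that just ran.
			state.remaining += len(cont)
			state.remaining--

			// Re-wrap continuations so they share the same counters.
			wrappedCont := make([]job, 0, len(cont))
			for _, c := range cont {
				wrappedCont = append(wrappedCont, wrap(c))
			}

			if state.remaining == 0 {
				// Last event of this run: attach the summary and reset.
				e["summary"] = map[string]interface{}{"up": state.up, "down": state.down}
				state.remaining, state.up, state.down = 1, 0, 0
			}
			return wrappedCont, nil
		}
	}
	return wrap(j)
}

func main() {
	// One job plus one continuation: two events, summary only on the last.
	withCont := func(e event) ([]job, error) {
		e["status"] = "up"
		cont := func(e event) ([]job, error) {
			e["status"] = "down"
			return nil, nil
		}
		return []job{cont}, nil
	}

	queue := []job{wrapSummary(withCont)}
	for len(queue) > 0 {
		j := queue[0]
		queue = queue[1:]
		e := event{}
		cont, _ := j(e)
		queue = append(queue, cont...)
		fmt.Println(e)
	}
}

Running it prints two events; only the second carries the summary map, with up and down each equal to 1.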
@@ -353,6 +400,15 @@
}
}

func addSummaryFields(event *beat.Event, up int, down int) {
eventext.MergeEventFields(event, mapstr.M{
"summary": mapstr.M{
"up": up,
"down": down,
},
})
}

type EventMatcher func(event *beat.Event) bool

func addBrowserSummary(sf stdfields.StdMonitorFields, match EventMatcher) jobs.JobWrapper {
@@ -369,18 +425,12 @@ func addBrowserSummary(sf stdfields.StdMonitorFields, match EventMatcher) jobs.J
return nil, fmt.Errorf("could not wrap summary for '%s', no status assigned: %w", sf.ID, err)
}

up, down := 1, 0
if monitorstate.StateStatus(status.(string)) == monitorstate.StatusDown {
up, down = 0, 1
addSummaryFields(event, 0, 1)
} else {
addSummaryFields(event, 1, 0)
}

eventext.MergeEventFields(event, mapstr.M{
"summary": mapstr.M{
"up": up,
"down": down,
},
})

return cont, jobErr
}
}
57 changes: 30 additions & 27 deletions heartbeat/monitors/wrappers/wrappers_test.go
@@ -71,36 +71,35 @@ var testBrowserMonFields = stdfields.StdMonitorFields{
}

func testCommonWrap(t *testing.T, tt testDef) {
t.Run(tt.name, func(t *testing.T) {
wrapped := WrapCommon(tt.jobs, tt.sFields, nil)
t.Helper()
wrapped := WrapCommon(tt.jobs, tt.sFields, nil)

core, observedLogs := observer.New(zapcore.InfoLevel)
logger.SetLogger(logp.NewLogger("t", zap.WrapCore(func(in zapcore.Core) zapcore.Core {
return zapcore.NewTee(in, core)
})))
core, observedLogs := observer.New(zapcore.InfoLevel)
logger.SetLogger(logp.NewLogger("t", zap.WrapCore(func(in zapcore.Core) zapcore.Core {
return zapcore.NewTee(in, core)
})))

results, err := jobs.ExecJobsAndConts(t, wrapped)
assert.NoError(t, err)
resultEvents, err := jobs.ExecJobsAndConts(t, wrapped)
assert.NoError(t, err)

require.Equal(t, len(results), len(tt.want), "Expected test def wants to correspond exactly to number results.")
for idx, r := range results {
t.Run(fmt.Sprintf("result at index %d", idx), func(t *testing.T) {
require.Equal(t, len(tt.want), len(resultEvents), "Expected test def had a different number of result events")
for idx, r := range resultEvents {
t.Run(fmt.Sprintf("result at index %d", idx), func(t *testing.T) {

want := tt.want[idx]
testslike.Test(t, lookslike.Strict(want), r.Fields)
want := tt.want[idx]
testslike.Test(t, lookslike.Strict(want), r.Fields)

if tt.metaWant != nil {
metaWant := tt.metaWant[idx]
testslike.Test(t, lookslike.Strict(metaWant), r.Meta)
}
if tt.metaWant != nil {
metaWant := tt.metaWant[idx]
testslike.Test(t, lookslike.Strict(metaWant), r.Meta)
}

})
}
})
}

if tt.logValidator != nil {
tt.logValidator(t, results, observedLogs.All())
}
})
if tt.logValidator != nil {
tt.logValidator(t, resultEvents, observedLogs.All())
}
}

func TestSimpleJob(t *testing.T) {
Expand Down Expand Up @@ -433,9 +432,13 @@ func stateValidator() validator.Validator {
func summaryValidator(up int, down int) validator.Validator {
return lookslike.MustCompile(map[string]interface{}{
"summary": map[string]interface{}{
"up": uint16(up),
"down": uint16(down),
"up": up,
"down": down,
},
"attempt": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think better name would be retry and noOfRetries

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

max_retries

"attempt": 1,
"max_attempts": maxAttempts,
}
})
}

@@ -653,9 +656,9 @@ func TestProjectBrowserJob(t *testing.T) {
lookslike.Compose(
urlValidator(t, urlStr),
expectedMonFields,
summaryValidator(1, 0),
lookslike.MustCompile(map[string]interface{}{
"monitor": map[string]interface{}{"status": "up"},
"summary": map[string]interface{}{"up": 1, "down": 0},
"event": map[string]interface{}{
"type": "heartbeat/summary",
},
Expand All @@ -673,9 +676,9 @@ func TestProjectBrowserJob(t *testing.T) {
lookslike.Compose(
urlValidator(t, urlStr),
expectedMonFields,
summaryValidator(0, 1),
lookslike.MustCompile(map[string]interface{}{
"monitor": map[string]interface{}{"status": "down"},
"summary": map[string]interface{}{"up": 0, "down": 1},
"error": map[string]interface{}{
"type": isdef.IsString,
"message": "testerr",
12 changes: 6 additions & 6 deletions x-pack/heartbeat/heartbeat.yml
@@ -23,15 +23,15 @@ heartbeat.config.monitors:
heartbeat.monitors:
- type: http
# Set enabled to true (or delete the following line) to enable this monitor
enabled: false
enabled: true
# ID used to uniquely identify this monitor in Elasticsearch even if the config changes
id: my-monitor
# Human readable display name for this service in Uptime UI and elsewhere
name: My Monitor
# List of URLs to query
urls: ["http://localhost:9200"]
urls: ["http://localhost:5678/pattern?r=200x1,500x1,200x1,500x1,500x1,200x1,200x1"]
# Configure task schedule
schedule: '@every 10s'
schedule: '@every 1s'
# Total test connection and data exchange timeout
#timeout: 16s
# Name of corresponding APM service, if Elastic APM is in use for the monitored service.
@@ -96,11 +96,11 @@ setup.kibana:
# ================================== Outputs ===================================

# Configure what output to use when sending the data collected by the beat.

output.console: ~
# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
#output.elasticsearch:
# Array of hosts to connect to.
hosts: ["localhost:9200"]
#hosts: ["localhost:9200"]

# Protocol - either `http` (default) or `https`.
#protocol: "https"