Skip to content

Commit

Permalink
Thresholds flush for the json output
Browse files Browse the repository at this point in the history
The JSON Output implements WithThresholds
so the Engine can set the thresholds invoking SetThresholds.

The flush logic sets the Metric's Thresholds
to write them in the json output.

Closes #1052
  • Loading branch information
codebien authored and na-- committed Mar 11, 2021
1 parent 9790995 commit 8ce65c1
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 3 deletions.
13 changes: 12 additions & 1 deletion output/json/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/sirupsen/logrus"
)

// TODO: add option for emitting proper JSON files (https://github.com/loadimpact/k6/issues/1052)
// TODO: add option for emitting proper JSON files (https://github.com/loadimpact/k6/issues/737)
const flushPeriod = 200 * time.Millisecond // TODO: make this configurable

// Output funnels all passed metrics to an (optionally gzipped) JSON file.
Expand All @@ -47,6 +47,7 @@ type Output struct {
encoder *stdlibjson.Encoder
closeFn func() error
seenMetrics map[string]struct{}
thresholds map[string][]*stats.Threshold
}

// New returns a new JSON output.
Expand Down Expand Up @@ -118,6 +119,15 @@ func (o *Output) Stop() error {
return o.closeFn()
}

// SetThresholds receives the thresholds before the output is Start()-ed.
func (o *Output) SetThresholds(thresholds map[string]stats.Thresholds) {
ths := make(map[string][]*stats.Threshold)
for name, t := range thresholds {
ths[name] = append(ths[name], t.Thresholds...)
}
o.thresholds = ths
}

func (o *Output) flushMetrics() {
samples := o.GetBufferedSamples()
start := time.Now()
Expand All @@ -127,6 +137,7 @@ func (o *Output) flushMetrics() {
count += len(samples)
for _, sample := range samples {
sample := sample
sample.Metric.Thresholds.Thresholds = o.thresholds[sample.Metric.Name]
o.handleMetric(sample.Metric)
err := o.encoder.Encode(WrapSample(sample))
if err != nil {
Expand Down
21 changes: 19 additions & 2 deletions output/json/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,8 @@ func generateTestMetricSamples(t *testing.T) ([]stats.SampleContainer, func(io.R
}, Time: time2, Tags: connTags},
stats.Sample{Time: time3, Metric: metric2, Value: float64(5), Tags: stats.NewSampleTags(map[string]string{"tag3": "val3"})},
}
// TODO: fix JSON thresholds (https://github.com/loadimpact/k6/issues/1052)
expected := []string{
`{"type":"Metric","data":{"name":"my_metric1","type":"gauge","contains":"default","tainted":null,"thresholds":[],"submetrics":null,"sub":{"name":"","parent":"","suffix":"","tags":null}},"metric":"my_metric1"}`,
`{"type":"Metric","data":{"name":"my_metric1","type":"gauge","contains":"default","tainted":null,"thresholds":["rate<0.01", "p(99)<250"],"submetrics":null,"sub":{"name":"","parent":"","suffix":"","tags":null}},"metric":"my_metric1"}`,
`{"type":"Point","data":{"time":"2021-02-24T13:37:10Z","value":1,"tags":{"tag1":"val1"}},"metric":"my_metric1"}`,
`{"type":"Point","data":{"time":"2021-02-24T13:37:10Z","value":2,"tags":{"tag2":"val2"}},"metric":"my_metric1"}`,
`{"type":"Metric","data":{"name":"my_metric2","type":"counter","contains":"data","tainted":null,"thresholds":[],"submetrics":null,"sub":{"name":"","parent":"","suffix":"","tags":null}},"metric":"my_metric2"}`,
Expand All @@ -94,6 +93,8 @@ func TestJsonOutputStdout(t *testing.T) {
StdOut: stdout,
})
require.NoError(t, err)

setThresholds(t, out)
require.NoError(t, out.Start())

samples, validateResults := generateTestMetricSamples(t)
Expand Down Expand Up @@ -130,6 +131,8 @@ func TestJsonOutputFile(t *testing.T) {
ConfigArgument: "/json-output",
})
require.NoError(t, err)

setThresholds(t, out)
require.NoError(t, out.Start())

samples, validateResults := generateTestMetricSamples(t)
Expand All @@ -156,6 +159,8 @@ func TestJsonOutputFileGzipped(t *testing.T) {
ConfigArgument: "/json-output.gz",
})
require.NoError(t, err)

setThresholds(t, out)
require.NoError(t, out.Start())

samples, validateResults := generateTestMetricSamples(t)
Expand Down Expand Up @@ -185,3 +190,15 @@ func TestWrapMetricWithMetricPointer(t *testing.T) {
out := wrapMetric(&stats.Metric{})
assert.NotEqual(t, out, (*Envelope)(nil))
}

func setThresholds(t *testing.T, out output.Output) {
t.Helper()

jout, ok := out.(*Output)
require.True(t, ok)

ts, err := stats.NewThresholds([]string{"rate<0.01", "p(99)<250"})
require.NoError(t, err)

jout.SetThresholds(map[string]stats.Thresholds{"my_metric1": ts})
}

0 comments on commit 8ce65c1

Please sign in to comment.