Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

count 429s in elasticsearch output #8056

Merged
merged 2 commits into from
Aug 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Make kubernetes autodiscover ignore events with empty container IDs {pull}7971[7971]
- Add DNS processor with support for performing reverse lookups on IP addresses. {issue}7770[7770]
- Implement CheckConfig in RunnerFactory to make autodiscover check configs {pull}7961[7961]
- Count HTTP 429 responses in the elasticsearch output {pull}8056[8056]

*Auditbeat*

Expand Down
16 changes: 11 additions & 5 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type bulkResultStats struct {
duplicates int // number of events failed with `create` due to ID already being indexed
fails int // number of failed events (can be retried)
nonIndexable int // number of failed events (not indexable -> must be dropped)
tooMany int // number of events receiving HTTP 429 Too Many Requests
}

var (
Expand Down Expand Up @@ -343,6 +344,7 @@ func (client *Client) publishEvents(
st.Failed(failed)
st.Dropped(dropped)
st.Duplicate(duplicates)
st.ErrTooMany(stats.tooMany)
}

if failed > 0 {
Expand Down Expand Up @@ -515,11 +517,15 @@ func bulkCollectPublishFails(
continue // ok
}

if status < 500 && status != 429 {
// hard failure, don't collect
logp.Warn("Cannot index event %#v (status=%v): %s", data[i], status, msg)
stats.nonIndexable++
continue
if status < 500 {
if status == http.StatusTooManyRequests {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

glad I am not the only one that use these constants :)

stats.tooMany++
} else {
// hard failure, don't collect
logp.Warn("Cannot index event %#v (status=%v): %s", data[i], status, msg)
stats.nonIndexable++
continue
}
}

debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
Expand Down
6 changes: 4 additions & 2 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,12 @@ func TestCollectPublishFailMiddle(t *testing.T) {
events := []publisher.Event{event, eventFail, event}

reader := newJSONReader(response)
res, _ := bulkCollectPublishFails(reader, events)
res, stats := bulkCollectPublishFails(reader, events)
assert.Equal(t, 1, len(res))
if len(res) == 1 {
assert.Equal(t, eventFail, res[0])
}
assert.Equal(t, stats, bulkResultStats{acked: 2, fails: 1, tooMany: 1})
}

func TestCollectPublishFailAll(t *testing.T) {
Expand All @@ -139,9 +140,10 @@ func TestCollectPublishFailAll(t *testing.T) {
events := []publisher.Event{event, event, event}

reader := newJSONReader(response)
res, _ := bulkCollectPublishFails(reader, events)
res, stats := bulkCollectPublishFails(reader, events)
assert.Equal(t, 3, len(res))
assert.Equal(t, events, res)
assert.Equal(t, stats, bulkResultStats{fails: 3, tooMany: 3})
}

func TestCollectPipelinePublishFail(t *testing.T) {
Expand Down
9 changes: 9 additions & 0 deletions libbeat/outputs/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Stats struct {
active *monitoring.Uint // events sent and waiting for ACK/fail from output
duplicates *monitoring.Uint // events sent and waiting for ACK/fail from output
dropped *monitoring.Uint // total number of invalid events dropped by the output
tooMany *monitoring.Uint // total number of too many requests replies from output

//
// Output network connection stats
Expand All @@ -56,6 +57,7 @@ func NewStats(reg *monitoring.Registry) *Stats {
dropped: monitoring.NewUint(reg, "events.dropped"),
duplicates: monitoring.NewUint(reg, "events.duplicates"),
active: monitoring.NewUint(reg, "events.active"),
tooMany: monitoring.NewUint(reg, "events.toomany"),

writeBytes: monitoring.NewUint(reg, "write.bytes"),
writeErrors: monitoring.NewUint(reg, "write.errors"),
Expand Down Expand Up @@ -117,6 +119,13 @@ func (s *Stats) Cancelled(n int) {
}
}

// ErrTooMany updates the number of Too Many Requests responses reported by the output.
func (s *Stats) ErrTooMany(n int) {
if s != nil {
s.tooMany.Add(uint64(n))
}
}

// WriteError increases the write I/O error metrics.
func (s *Stats) WriteError(err error) {
if s != nil {
Expand Down
2 changes: 2 additions & 0 deletions libbeat/outputs/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Observer interface {
WriteBytes(int) // report number of bytes being written
ReadError(error) // report an I/O error on read
ReadBytes(int) // report number of bytes being read
ErrTooMany(int) // report too many requests response
}

type emptyObserver struct{}
Expand All @@ -51,3 +52,4 @@ func (*emptyObserver) WriteError(error) {}
func (*emptyObserver) WriteBytes(int) {}
func (*emptyObserver) ReadError(error) {}
func (*emptyObserver) ReadBytes(int) {}
func (*emptyObserver) ErrTooMany(int) {}