[7.5] Log monitoring bulk failures (#14356) #14527

Merged 7 commits on Nov 19, 2019
7 changes: 7 additions & 0 deletions CHANGELOG.next.asciidoc
@@ -57,6 +57,13 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

- Fix checking tagsFilter using length in cloudwatch metricset. {pull}14525[14525]
- Fixed bug with `elasticsearch/cluster_stats` metricset not recording license expiration date correctly. {issue}14541[14541] {pull}14591[14591]
- Convert indexed ms-since-epoch timestamp fields in `elasticsearch/ml_job` metricset to ints from float64s. {issue}14220[14220] {pull}14222[14222]
- Fix ARN parsing function to work for ELB ARNs. {pull}14316[14316]
- Update azure configuration example. {issue}14224[14224]
- Limit some of the error messages to the logs only. {issue}14317[14317] {pull}14327[14327]
- Fix cloudwatch metricset with names and dimensions in config. {issue}14376[14376] {pull}14391[14391]
- Fix marshaling of ms-since-epoch values in `elasticsearch/cluster_stats` metricset. {pull}14378[14378]
- Log bulk failures from bulk API requests to monitoring cluster. {issue}14303[14303] {pull}14356[14356]
Contributor:

rebase issue?

Contributor Author:

Argh, yes, thanks!

Contributor:

@ycombinator still an issue? One day 🤖 will take care of it.


*Packetbeat*

7 changes: 2 additions & 5 deletions docs/devguide/newbeat.asciidoc
@@ -31,13 +31,10 @@ The following topics describe how to build a new Beat:

All Beats are written in http://golang.org/[Go], so having Go installed and knowing
the basics are prerequisites for understanding this guide.
But don't worry if you aren't a Go expert. Go is a relatively new
language, and very few people are experts in it. In fact, several
people learned Go by contributing to Packetbeat and libbeat, including the
original Packetbeat authors.

*Before you begin:* Set up your Go environment as described under
<<setting-up-dev-environment>> in <<beats-contributing>>.
<<setting-up-dev-environment>> in <<beats-contributing>>. The minimum required
Go version is {go-version}.

To build your Beat on a specific version of libbeat, check out the specific
branch ({branch} in the example below):
2 changes: 0 additions & 2 deletions filebeat/docs/configuring-howto.asciidoc
@@ -17,8 +17,6 @@ The {beatname_uc} configuration file uses http://yaml.org/[YAML] for its syntax.
See the {beats-ref}/config-file-format.html[Config File Format] section of the
_Beats Platform Reference_ for more about the structure of the config file.

include::../../libbeat/docs/shared-cm-tip.asciidoc[]

The following topics describe how to configure Filebeat:

* <<configuration-filebeat-modules>>
2 changes: 0 additions & 2 deletions filebeat/docs/getting-started.asciidoc
@@ -195,8 +195,6 @@ include::{libbeat-dir}/docs/step-test-config.asciidoc[]

include::{libbeat-dir}/docs/step-look-at-config.asciidoc[]

include::../../libbeat/docs/shared-cm-tip.asciidoc[]

[[filebeat-template]]
=== Step 3: Load the index template in Elasticsearch

6 changes: 3 additions & 3 deletions libbeat/docs/shared-central-management.asciidoc
@@ -5,7 +5,7 @@
[partintro]
--

beta[]
include::{asciidoc-dir}/../../shared/discontinued.asciidoc[tag=cm-discontinued]

[WARNING]
=======================================
@@ -38,7 +38,7 @@ include::shared-license-statement.asciidoc[]
[role="xpack"]
== How central management works

beta[]
include::{asciidoc-dir}/../../shared/discontinued.asciidoc[tag=cm-discontinued]

{beats} central management uses a mechanism called configuration tags to group
related configurations. You define configuration tags in the {cm-ui} UI in {kib}
@@ -105,7 +105,7 @@ the Beat to troubleshoot the problem.
[role="xpack"]
== Enroll {beats} in central management

beta[]
include::{asciidoc-dir}/../../shared/discontinued.asciidoc[tag=cm-discontinued]

You need to enroll {beats} to register them in
<<configuration-central-management,central management>> and establish
3 changes: 0 additions & 3 deletions libbeat/docs/shared-cm-tip.asciidoc

This file was deleted.

31 changes: 30 additions & 1 deletion libbeat/monitoring/report/elasticsearch/client.go
@@ -20,6 +20,7 @@ package elasticsearch
import (
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/pkg/errors"
@@ -229,7 +230,12 @@ func (c *publishClient) publishBulk(event publisher.Event, typ string) error {
// Currently one request per event is sent. The reason is that each event can contain
// different interval params, and X-Pack requires the interval param to be sent.
// FIXME: index name (first param below)
_, err = c.es.BulkWith(getMonitoringIndexName(), "", nil, nil, bulk[:])
result, err := c.es.BulkWith(getMonitoringIndexName(), "", nil, nil, bulk[:])
if err != nil {
return err
}

logBulkFailures(result, []report.Event{document})
return err
}

@@ -238,3 +244,26 @@ func getMonitoringIndexName() string {
date := time.Now().Format("2006.01.02")
return fmt.Sprintf(".monitoring-beats-%v-%s", version, date)
}

func logBulkFailures(result esout.BulkResult, events []report.Event) {
reader := esout.NewJSONReader(result)
err := esout.BulkReadToItems(reader)
if err != nil {
logp.Err("failed to parse monitoring bulk items: %v", err)
return
}

for i := range events {
status, msg, err := esout.BulkReadItemStatus(reader)
if err != nil {
logp.Err("failed to parse monitoring bulk item status: %v", err)
return
}
switch {
case status < 300, status == http.StatusConflict:
continue
default:
logp.Warn("monitoring bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
}
}
}
29 changes: 10 additions & 19 deletions libbeat/outputs/elasticsearch/bulkapi.go
@@ -19,6 +19,7 @@ package elasticsearch

import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"net/http"
@@ -34,16 +35,15 @@ type bulkRequest struct {
requ *http.Request
}

type bulkResult struct {
raw []byte
}
// BulkResult contains the result of a bulk API request.
type BulkResult json.RawMessage

// Bulk performs many index/delete operations in a single API call.
// Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
func (conn *Connection) Bulk(
index, docType string,
params map[string]string, body []interface{},
) (*QueryResult, error) {
) (BulkResult, error) {
return conn.BulkWith(index, docType, params, nil, body)
}

@@ -56,7 +56,7 @@ func (conn *Connection) BulkWith(
params map[string]string,
metaBuilder MetaBuilder,
body []interface{},
) (*QueryResult, error) {
) (BulkResult, error) {
if len(body) == 0 {
return nil, nil
}
@@ -76,7 +76,7 @@ if err != nil {
if err != nil {
return nil, err
}
return readQueryResult(result.raw)
return result, nil
}

// SendMonitoringBulk creates a HTTP request to the X-Pack Monitoring API containing a bunch of
Expand All @@ -85,7 +85,7 @@ func (conn *Connection) BulkWith(
func (conn *Connection) SendMonitoringBulk(
params map[string]string,
body []interface{},
) (*QueryResult, error) {
) (BulkResult, error) {
if len(body) == 0 {
return nil, nil
}
@@ -111,7 +111,7 @@ func (conn *Connection) SendMonitoringBulk(
if err != nil {
return nil, err
}
return readQueryResult(result.raw)
return result, nil
}

func newBulkRequest(
@@ -199,18 +199,9 @@ func (r *bulkRequest) Reset(body bodyEncoder) {
body.AddHeader(&r.requ.Header)
}

func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, bulkResult, error) {
func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, BulkResult, error) {
status, resp, err := conn.execHTTPRequest(requ.requ)
if err != nil {
return status, bulkResult{}, err
}

result, err := readBulkResult(resp)
return status, result, err
}

func readBulkResult(obj []byte) (bulkResult, error) {
return bulkResult{obj}, nil
return status, BulkResult(resp), err
}

func bulkEncode(out bulkWriter, metaBuilder MetaBuilder, body []interface{}) error {
8 changes: 2 additions & 6 deletions libbeat/outputs/elasticsearch/bulkapi_mock_test.go
@@ -20,7 +20,6 @@
package elasticsearch

import (
"encoding/json"
"fmt"
"net/http"
"os"
@@ -34,7 +33,7 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("elasticsearch"))

index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid())
expectedResp, _ := json.Marshal(QueryResult{Ok: true, Index: index, Type: "type1", ID: "1", Version: 1, Created: true})
expectedResp := []byte(`{"took":7,"errors":false,"items":[]}`)

ops := []map[string]interface{}{
{
@@ -61,13 +60,10 @@
params := map[string]string{
"refresh": "true",
}
resp, err := client.Bulk(index, "type1", params, body)
_, err := client.Bulk(index, "type1", params, body)
if err != nil {
t.Errorf("Bulk() returns error: %s", err)
}
if !resp.Created {
t.Errorf("Bulk() fails: %s", resp)
}
}

func TestOneHost500Resp_Bulk(t *testing.T) {
82 changes: 45 additions & 37 deletions libbeat/outputs/elasticsearch/client.go
@@ -52,7 +52,7 @@ type Client struct {
bulkRequ *bulkRequest

// buffered json response reader
json jsonReader
json JSONReader

// additional configs
compressionLevel int
@@ -125,6 +125,7 @@
)

var (
errExpectedItemsArray = errors.New("expected items array")
errExpectedItemObject = errors.New("expected item response object")
errExpectedStatusCode = errors.New("expected item status code")
errUnexpectedEmptyObject = errors.New("empty object")
@@ -355,7 +356,7 @@ func (client *Client) publishEvents(
failedEvents = data
stats.fails = len(failedEvents)
} else {
client.json.init(result.raw)
client.json.init(result)
failedEvents, stats = bulkCollectPublishFails(&client.json, data)
}

@@ -473,46 +474,19 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error)
// event failed due to some error in the event itself (e.g. does not respect mapping),
// the event will be dropped.
func bulkCollectPublishFails(
reader *jsonReader,
reader *JSONReader,
data []publisher.Event,
) ([]publisher.Event, bulkResultStats) {
if err := reader.expectDict(); err != nil {
logp.Err("Failed to parse bulk response: expected JSON object")
return nil, bulkResultStats{}
}

// find 'items' field in response
for {
kind, name, err := reader.nextFieldName()
if err != nil {
logp.Err("Failed to parse bulk response")
return nil, bulkResultStats{}
}

if kind == dictEnd {
logp.Err("Failed to parse bulk response: no 'items' field in response")
return nil, bulkResultStats{}
}

// found items array -> continue
if bytes.Equal(name, nameItems) {
break
}

reader.ignoreNext()
}

// check items field is an array
if err := reader.expectArray(); err != nil {
logp.Err("Failed to parse bulk response: expected items array")
if err := BulkReadToItems(reader); err != nil {
logp.Err("failed to parse bulk response: %v", err.Error())
return nil, bulkResultStats{}
}

count := len(data)
failed := data[:0]
stats := bulkResultStats{}
for i := 0; i < count; i++ {
status, msg, err := itemStatus(reader)
status, msg, err := BulkReadItemStatus(reader)
if err != nil {
return nil, bulkResultStats{}
}
@@ -548,9 +522,43 @@
return failed, stats
}

func itemStatus(reader *jsonReader) (int, []byte, error) {
// BulkReadToItems reads the bulk response up to (but not including) the items array.
func BulkReadToItems(reader *JSONReader) error {
if err := reader.ExpectDict(); err != nil {
return errExpectedObject
}

// find 'items' field in response
for {
kind, name, err := reader.nextFieldName()
if err != nil {
return err
}

if kind == dictEnd {
return errExpectedItemsArray
}

// found items array -> continue
if bytes.Equal(name, nameItems) {
break
}

reader.ignoreNext()
}

// check items field is an array
if err := reader.ExpectArray(); err != nil {
return errExpectedItemsArray
}

return nil
}

// BulkReadItemStatus reads the status and error fields from a bulk response item.
func BulkReadItemStatus(reader *JSONReader) (int, []byte, error) {
// skip outer dictionary
if err := reader.expectDict(); err != nil {
if err := reader.ExpectDict(); err != nil {
return 0, nil, errExpectedItemObject
}

@@ -588,8 +596,8 @@
return status, msg, nil
}

func itemStatusInner(reader *jsonReader) (int, []byte, error) {
if err := reader.expectDict(); err != nil {
func itemStatusInner(reader *JSONReader) (int, []byte, error) {
if err := reader.ExpectDict(); err != nil {
return 0, nil, errExpectedItemObject
}
