From f77b97c875664beb267dd8695aba5b150da12679 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Wed, 21 Sep 2022 16:56:09 -0400 Subject: [PATCH] Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places. (#1896) * Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places. * Add changelog entry. * Update CHANGELOG.next.asciidoc Co-authored-by: Craig MacKenzie Co-authored-by: Craig MacKenzie --- CHANGELOG.next.asciidoc | 1 + internal/pkg/api/handleAck.go | 4 ++-- internal/pkg/bulk/opBulk.go | 5 +++++ internal/pkg/coordinator/monitor.go | 2 +- internal/pkg/dl/servers.go | 2 +- internal/pkg/policy/policy_output.go | 4 ++-- 6 files changed, 12 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1022ebe60..e5858caff 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -10,6 +10,7 @@ - Remove events from agent checkin body. {issue}1774[1774] - Improve authc debug logging. {pull}1870[1870] - Add error detail to catch-all HTTP error response. {pull}1854[1854] +- Fix issue where errors were being ignored when writing to Elasticsearch. 
{pull}1896[1896] ==== New Features diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index c69a65b2c..c73808694 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -497,7 +497,7 @@ func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent return errors.Wrap(err, "handleUnenroll marshal") } - if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh()); err != nil { + if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { return errors.Wrap(err, "handleUnenroll update") } @@ -519,7 +519,7 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent * return errors.Wrap(err, "handleUpgrade marshal") } - if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh()); err != nil { + if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { return errors.Wrap(err, "handleUpgrade update") } diff --git a/internal/pkg/bulk/opBulk.go b/internal/pkg/bulk/opBulk.go index d47ba9592..7ecb7c8a6 100644 --- a/internal/pkg/bulk/opBulk.go +++ b/internal/pkg/bulk/opBulk.go @@ -14,6 +14,8 @@ import ( "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/mailru/easyjson" "github.com/rs/zerolog/log" + + "github.com/elastic/fleet-server/v7/internal/pkg/es" ) func (b *Bulker) Create(ctx context.Context, index, id string, body []byte, opts ...Opt) (string, error) { @@ -74,6 +76,9 @@ func (b *Bulker) waitBulkAction(ctx context.Context, action actionT, index, id s if !ok { return nil, fmt.Errorf("unable to cast to *BulkIndexerResponseItem, detected type %T", resp.data) } + if err := es.TranslateError(r.Status, r.Error); err != nil { + return nil, err + } return r, nil } diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index 2242305f8..f203ab45a 100644 --- 
a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -562,7 +562,7 @@ func unenrollAgent(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a return err } } - if err = bulker.Update(ctx, agentsIndex, agent.Id, body, bulk.WithRefresh()); err != nil { + if err = bulker.Update(ctx, agentsIndex, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { zlog.Error().Err(err).Msg("Fail unenrollAgent record update") } diff --git a/internal/pkg/dl/servers.go b/internal/pkg/dl/servers.go index 409654e60..2686bf560 100644 --- a/internal/pkg/dl/servers.go +++ b/internal/pkg/dl/servers.go @@ -55,5 +55,5 @@ func EnsureServer(ctx context.Context, bulker bulk.Bulk, version string, agent m if err != nil { return err } - return bulker.Update(ctx, o.indexName, agent.ID, data) + return bulker.Update(ctx, o.indexName, agent.ID, data, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)) } diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index 55683cf51..4f5b99ae5 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -157,7 +157,7 @@ func (p *Output) prepareElasticsearch( return err } - if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body); err != nil { + if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { zlog.Error().Err(err).Msg("fail update agent record") return err } @@ -206,7 +206,7 @@ func (p *Output) prepareElasticsearch( return fmt.Errorf("could no tupdate painless script: %w", err) } - if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body); err != nil { + if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { zlog.Error().Err(err).Msg("fail update agent record") return fmt.Errorf("fail update agent record: %w", err) }