From 7e12ba1a9d70541c39be126d16f6fb975ba9c339 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 30 Sep 2022 15:25:48 -0400 Subject: [PATCH] [8.4](backport #1896) Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places. (#1941) * Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places. (#1896) * Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places. * Add changelog entry. * Update CHANGELOG.next.asciidoc Co-authored-by: Craig MacKenzie Co-authored-by: Craig MacKenzie (cherry picked from commit f77b97c875664beb267dd8695aba5b150da12679) # Conflicts: # CHANGELOG.next.asciidoc # internal/pkg/policy/policy_output.go * Fix conflict issues. * Update CHANGELOG.next.asciidoc Co-authored-by: Blake Rouse --- CHANGELOG.next.asciidoc | 1 + internal/pkg/api/handleAck.go | 4 ++-- internal/pkg/bulk/opBulk.go | 5 +++++ internal/pkg/coordinator/monitor.go | 2 +- internal/pkg/dl/servers.go | 2 +- 5 files changed, 10 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c65b70b5a..359dede60 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -8,6 +8,7 @@ - Add error detail to catch-all HTTP error response. {pull}1854[1854] - Update apikey.cache_hit log field name to match convention. {pull}1900[1900] - LoadServerLimits will not overwrite specified limits when loading default/agent number specified values. {issue}1841[1841] {pull}1912[1912] +- Fix issue were errors where being ignored written to elasticsearch. {pull}1896[1896] ==== New Features diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index 3f284b5da..4b9831894 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -407,7 +407,7 @@ func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent return errors.Wrap(err, "handleUnenroll marshal") } - if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh()); err != nil { + if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { return errors.Wrap(err, "handleUnenroll update") } @@ -428,7 +428,7 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent * return errors.Wrap(err, "handleUpgrade marshal") } - if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh()); err != nil { + if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { return errors.Wrap(err, "handleUpgrade update") } diff --git a/internal/pkg/bulk/opBulk.go b/internal/pkg/bulk/opBulk.go index 50b2c47e0..e7fc6a85a 100644 --- a/internal/pkg/bulk/opBulk.go +++ b/internal/pkg/bulk/opBulk.go @@ -13,6 +13,8 @@ import ( "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/mailru/easyjson" "github.com/rs/zerolog/log" + + "github.com/elastic/fleet-server/v7/internal/pkg/es" ) func (b *Bulker) Create(ctx context.Context, index, id string, body []byte, opts ...Opt) (string, error) { @@ -73,6 +75,9 @@ func (b *Bulker) waitBulkAction(ctx context.Context, action actionT, index, id s if !ok { return nil, fmt.Errorf("unable to cast to *BulkIndexerResponseItem, detected type %T", resp.data) } + if err := es.TranslateError(r.Status, r.Error); err != nil { + return nil, err + } return r, nil } diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index 53870e58e..cca266672 100644 --- a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -560,7 +560,7 @@ func unenrollAgent(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a return err } } - if err = bulker.Update(ctx, agentsIndex, agent.Id, body, bulk.WithRefresh()); err != nil { + if err = bulker.Update(ctx, agentsIndex, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { zlog.Error().Err(err).Msg("Fail unenrollAgent record update") } diff --git a/internal/pkg/dl/servers.go b/internal/pkg/dl/servers.go index 409654e60..2686bf560 100644 --- a/internal/pkg/dl/servers.go +++ b/internal/pkg/dl/servers.go @@ -55,5 +55,5 @@ func EnsureServer(ctx context.Context, bulker bulk.Bulk, version string, agent m if err != nil { return err } - return bulker.Update(ctx, o.indexName, agent.ID, data) + return bulker.Update(ctx, o.indexName, agent.ID, data, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)) }