Skip to content

Commit

Permalink
[8.4](backport #1896) Catch error in waitBulkAction. Add bulk.WithRet…
Browse files Browse the repository at this point in the history
…ryOnConflict(3) in multiple places. (#1941)

* Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places. (#1896)

* Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places.

* Add changelog entry.

* Update CHANGELOG.next.asciidoc

Co-authored-by: Craig MacKenzie <[email protected]>

Co-authored-by: Craig MacKenzie <[email protected]>
(cherry picked from commit f77b97c)

# Conflicts:
#	CHANGELOG.next.asciidoc
#	internal/pkg/policy/policy_output.go

* Fix conflict issues.

* Update CHANGELOG.next.asciidoc

Co-authored-by: Blake Rouse <[email protected]>
  • Loading branch information
mergify[bot] and blakerouse authored Sep 30, 2022
1 parent 6e54a01 commit 7e12ba1
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Add error detail to catch-all HTTP error response. {pull}1854[1854]
- Update apikey.cache_hit log field name to match convention. {pull}1900[1900]
- LoadServerLimits will not overwrite specified limits when loading default/agent number specified values. {issue}1841[1841] {pull}1912[1912]
- Fix issue were errors where being ignored written to elasticsearch. {pull}1896[1896]
==== New Features

Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent
return errors.Wrap(err, "handleUnenroll marshal")
}

if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh()); err != nil {
if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
return errors.Wrap(err, "handleUnenroll update")
}

Expand All @@ -428,7 +428,7 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent *
return errors.Wrap(err, "handleUpgrade marshal")
}

if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh()); err != nil {
if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
return errors.Wrap(err, "handleUpgrade update")
}

Expand Down
5 changes: 5 additions & 0 deletions internal/pkg/bulk/opBulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/mailru/easyjson"
"github.com/rs/zerolog/log"

"github.com/elastic/fleet-server/v7/internal/pkg/es"
)

func (b *Bulker) Create(ctx context.Context, index, id string, body []byte, opts ...Opt) (string, error) {
Expand Down Expand Up @@ -73,6 +75,9 @@ func (b *Bulker) waitBulkAction(ctx context.Context, action actionT, index, id s
if !ok {
return nil, fmt.Errorf("unable to cast to *BulkIndexerResponseItem, detected type %T", resp.data)
}
if err := es.TranslateError(r.Status, r.Error); err != nil {
return nil, err
}
return r, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/coordinator/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func unenrollAgent(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
return err
}
}
if err = bulker.Update(ctx, agentsIndex, agent.Id, body, bulk.WithRefresh()); err != nil {
if err = bulker.Update(ctx, agentsIndex, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
zlog.Error().Err(err).Msg("Fail unenrollAgent record update")
}

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/dl/servers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,5 @@ func EnsureServer(ctx context.Context, bulker bulk.Bulk, version string, agent m
if err != nil {
return err
}
return bulker.Update(ctx, o.indexName, agent.ID, data)
return bulker.Update(ctx, o.indexName, agent.ID, data, bulk.WithRefresh(), bulk.WithRetryOnConflict(3))
}

0 comments on commit 7e12ba1

Please sign in to comment.