Skip to content

Commit

Permalink
Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in mul…
Browse files Browse the repository at this point in the history
…tiple places. (#1896)

* Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places.

* Add changelog entry.

* Update CHANGELOG.next.asciidoc

Co-authored-by: Craig MacKenzie <[email protected]>

Co-authored-by: Craig MacKenzie <[email protected]>
  • Loading branch information
blakerouse and cmacknz authored Sep 21, 2022
1 parent bb7016f commit f77b97c
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- Remove events from agent checkin body. {issue}1774[1774]
- Improve authc debug logging. {pull}1870[1870]
- Add error detail to catch-all HTTP error response. {pull}1854[1854]
- Fix issue were errors where being ignored written to elasticsearch. {pull}1896[1896]

==== New Features

Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent
return errors.Wrap(err, "handleUnenroll marshal")
}

if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh()); err != nil {
if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
return errors.Wrap(err, "handleUnenroll update")
}

Expand All @@ -519,7 +519,7 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent *
return errors.Wrap(err, "handleUpgrade marshal")
}

if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh()); err != nil {
if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
return errors.Wrap(err, "handleUpgrade update")
}

Expand Down
5 changes: 5 additions & 0 deletions internal/pkg/bulk/opBulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/mailru/easyjson"
"github.com/rs/zerolog/log"

"github.com/elastic/fleet-server/v7/internal/pkg/es"
)

func (b *Bulker) Create(ctx context.Context, index, id string, body []byte, opts ...Opt) (string, error) {
Expand Down Expand Up @@ -74,6 +76,9 @@ func (b *Bulker) waitBulkAction(ctx context.Context, action actionT, index, id s
if !ok {
return nil, fmt.Errorf("unable to cast to *BulkIndexerResponseItem, detected type %T", resp.data)
}
if err := es.TranslateError(r.Status, r.Error); err != nil {
return nil, err
}
return r, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/coordinator/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ func unenrollAgent(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
return err
}
}
if err = bulker.Update(ctx, agentsIndex, agent.Id, body, bulk.WithRefresh()); err != nil {
if err = bulker.Update(ctx, agentsIndex, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
zlog.Error().Err(err).Msg("Fail unenrollAgent record update")
}

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/dl/servers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,5 @@ func EnsureServer(ctx context.Context, bulker bulk.Bulk, version string, agent m
if err != nil {
return err
}
return bulker.Update(ctx, o.indexName, agent.ID, data)
return bulker.Update(ctx, o.indexName, agent.ID, data, bulk.WithRefresh(), bulk.WithRetryOnConflict(3))
}
4 changes: 2 additions & 2 deletions internal/pkg/policy/policy_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (p *Output) prepareElasticsearch(
return err
}

if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body); err != nil {
if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
zlog.Error().Err(err).Msg("fail update agent record")
return err
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func (p *Output) prepareElasticsearch(
return fmt.Errorf("could no tupdate painless script: %w", err)
}

if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body); err != nil {
if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
zlog.Error().Err(err).Msg("fail update agent record")
return fmt.Errorf("fail update agent record: %w", err)
}
Expand Down

0 comments on commit f77b97c

Please sign in to comment.