From 5ccc7cddd82ea0298bba400d8e5d960ab846c265 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Wed, 21 Sep 2022 16:56:09 -0400 Subject: [PATCH 1/3] Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places. (#1896) * Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places. * Add changelog entry. * Update CHANGELOG.next.asciidoc Co-authored-by: Craig MacKenzie Co-authored-by: Craig MacKenzie (cherry picked from commit f77b97c875664beb267dd8695aba5b150da12679) # Conflicts: # CHANGELOG.next.asciidoc # internal/pkg/policy/policy_output.go --- CHANGELOG.next.asciidoc | 4 ++ internal/pkg/api/handleAck.go | 4 +- internal/pkg/bulk/opBulk.go | 5 ++ internal/pkg/coordinator/monitor.go | 2 +- internal/pkg/dl/servers.go | 2 +- internal/pkg/policy/policy_output.go | 89 ++++++++++++++++++++++++++++ 6 files changed, 102 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c65b70b5a..b49e0bea5 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -6,8 +6,12 @@ - Remove events from agent checkin body. {issue}1774[1774] - Improve authc debug logging. {pull}1870[1870] - Add error detail to catch-all HTTP error response. {pull}1854[1854] +<<<<<<< HEAD - Update apikey.cache_hit log field name to match convention. {pull}1900[1900] - LoadServerLimits will not overwrite specified limits when loading default/agent number specified values. {issue}1841[1841] {pull}1912[1912] +======= +- Fix issue were errors where being ignored written to elasticsearch. {pull}1896[1896] +>>>>>>> f77b97c (Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places. (#1896)) ==== New Features diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index 3f284b5da..4b9831894 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -407,7 +407,7 @@ func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent return errors.Wrap(err, "handleUnenroll marshal") } - if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh()); err != nil { + if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { return errors.Wrap(err, "handleUnenroll update") } @@ -428,7 +428,7 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent * return errors.Wrap(err, "handleUpgrade marshal") } - if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh()); err != nil { + if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { return errors.Wrap(err, "handleUpgrade update") } diff --git a/internal/pkg/bulk/opBulk.go b/internal/pkg/bulk/opBulk.go index 50b2c47e0..e7fc6a85a 100644 --- a/internal/pkg/bulk/opBulk.go +++ b/internal/pkg/bulk/opBulk.go @@ -13,6 +13,8 @@ import ( "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/mailru/easyjson" "github.com/rs/zerolog/log" + + "github.com/elastic/fleet-server/v7/internal/pkg/es" ) func (b *Bulker) Create(ctx context.Context, index, id string, body []byte, opts ...Opt) (string, error) { @@ -73,6 +75,9 @@ func (b *Bulker) waitBulkAction(ctx context.Context, action actionT, index, id s if !ok { return nil, fmt.Errorf("unable to cast to *BulkIndexerResponseItem, detected type %T", resp.data) } + if err := es.TranslateError(r.Status, r.Error); err != nil { + return nil, err + } return r, nil } diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index 53870e58e..cca266672 100644 --- a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -560,7 +560,7 @@ func unenrollAgent(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a return err } } - if err = bulker.Update(ctx, agentsIndex, agent.Id, body, bulk.WithRefresh()); err != nil { + if err = bulker.Update(ctx, agentsIndex, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { zlog.Error().Err(err).Msg("Fail unenrollAgent record update") } diff --git a/internal/pkg/dl/servers.go b/internal/pkg/dl/servers.go index 409654e60..2686bf560 100644 --- a/internal/pkg/dl/servers.go +++ b/internal/pkg/dl/servers.go @@ -55,5 +55,5 @@ func EnsureServer(ctx context.Context, bulker bulk.Bulk, version string, agent m if err != nil { return err } - return bulker.Update(ctx, o.indexName, agent.ID, data) + return bulker.Update(ctx, o.indexName, agent.ID, data, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)) } diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index 8115d22ec..220700373 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -67,6 +67,7 @@ func (p *PolicyOutput) Prepare(ctx context.Context, zlog zerolog.Logger, bulker zlog.Debug().Msg("policy output permissions are the same") } +<<<<<<< HEAD if needNewKey { zlog.Debug(). RawJSON("roles", p.Role.Raw). @@ -78,10 +79,69 @@ func (p *PolicyOutput) Prepare(ctx context.Context, zlog zerolog.Logger, bulker if err != nil { zlog.Error().Err(err).Msg("fail generate output key") return err +======= + output.PermissionsHash = p.Role.Sha2 // for the sake of consistency + zlog.Debug(). + Str("hash.sha256", p.Role.Sha2). + Str("roles", string(p.Role.Raw)). + Msg("Updating agent record to pick up most recent roles.") + + fields := map[string]interface{}{ + dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2, + } + + // Using painless script to update permission hash for updated key + body, err := renderUpdatePainlessScript(p.Name, fields) + if err != nil { + return err + } + + if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { + zlog.Error().Err(err).Msg("fail update agent record") + return err + } + + } else if needNewKey { + zlog.Debug(). + RawJSON("fleet.policy.roles", p.Role.Raw). + Str("fleet.policy.default.oldHash", output.PermissionsHash). + Str("fleet.policy.default.newHash", p.Role.Sha2). + Msg("Generating a new API key") + + ctx := zlog.WithContext(ctx) + outputAPIKey, err := + generateOutputAPIKey(ctx, bulker, agent.Id, p.Name, p.Role.Raw) + if err != nil { + return fmt.Errorf("failed generate output API key: %w", err) + } + + // When a new keys is generated we need to update the Agent record, + // this will need to be updated when multiples remote Elasticsearch output + // are supported. + zlog.Info(). + Str("fleet.policy.role.hash.sha256", p.Role.Sha2). + Str(logger.DefaultOutputAPIKeyID, outputAPIKey.ID). + Msg("Updating agent record to pick up default output key.") + + fields := map[string]interface{}{ + dl.FieldPolicyOutputAPIKey: outputAPIKey.Agent(), + dl.FieldPolicyOutputAPIKeyID: outputAPIKey.ID, + dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2, + } + + if !foundOutput { + fields[dl.FiledType] = OutputTypeElasticsearch + } + if output.APIKeyID != "" { + fields[dl.FieldPolicyOutputToRetireAPIKeyIDs] = model.ToRetireAPIKeyIdsItems{ + ID: output.APIKeyID, + RetiredAt: time.Now().UTC().Format(time.RFC3339), +>>>>>>> f77b97c (Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places. (#1896)) } agent.DefaultAPIKey = outputAPIKey.Agent() +<<<<<<< HEAD // When a new keys is generated we need to update the Agent record, // this will need to be updated when multiples Elasticsearch output // are used. @@ -89,6 +149,35 @@ func (p *PolicyOutput) Prepare(ctx context.Context, zlog zerolog.Logger, bulker Str("hash.sha256", p.Role.Sha2). Str(logger.DefaultOutputAPIKeyID, outputAPIKey.ID). Msg("Updating agent record to pick up default output key.") +======= + if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { + zlog.Error().Err(err).Msg("fail update agent record") + return fmt.Errorf("fail update agent record: %w", err) + } + + // Now that all is done, we can update the output on the agent variable + // Right not it's more for consistency and to ensure the in-memory agent + // data is correct and in sync with ES, so it can be safely used after + // this method returns. + output.Type = OutputTypeElasticsearch + output.APIKey = outputAPIKey.Agent() + output.APIKeyID = outputAPIKey.ID + output.PermissionsHash = p.Role.Sha2 // for the sake of consistency + } + + // Always insert the `api_key` as part of the output block, this is required + // because only fleet server knows the api key for the specific agent, if we don't + // add it the agent will not receive the `api_key` and will not be able to connect + // to Elasticsearch. + // + // We need to investigate allocation with the new LS output, we had optimization + // in place to reduce number of agent policy allocation when sending the updated + // agent policy to multiple agents. + // See: https://github.com/elastic/fleet-server/issues/1301 + if err := setMapObj(outputMap, output.APIKey, p.Name, "api_key"); err != nil { + return err + } +>>>>>>> f77b97c (Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places. (#1896)) fields := map[string]interface{}{ dl.FieldDefaultAPIKey: outputAPIKey.Agent(), From 8eb169a00521fd1662db40b4d410e079ca22c112 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Fri, 30 Sep 2022 10:20:28 -0400 Subject: [PATCH 2/3] Fix conflict issues. --- CHANGELOG.next.asciidoc | 5 -- internal/pkg/policy/policy_output.go | 89 ---------------------------- 2 files changed, 94 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b49e0bea5..b633f62dd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -6,12 +6,7 @@ - Remove events from agent checkin body. {issue}1774[1774] - Improve authc debug logging. {pull}1870[1870] - Add error detail to catch-all HTTP error response. {pull}1854[1854] -<<<<<<< HEAD -- Update apikey.cache_hit log field name to match convention. {pull}1900[1900] -- LoadServerLimits will not overwrite specified limits when loading default/agent number specified values. {issue}1841[1841] {pull}1912[1912] -======= - Fix issue were errors where being ignored written to elasticsearch. {pull}1896[1896] ->>>>>>> f77b97c (Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places. (#1896)) ==== New Features diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index 220700373..8115d22ec 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -67,7 +67,6 @@ func (p *PolicyOutput) Prepare(ctx context.Context, zlog zerolog.Logger, bulker zlog.Debug().Msg("policy output permissions are the same") } -<<<<<<< HEAD if needNewKey { zlog.Debug(). RawJSON("roles", p.Role.Raw). @@ -79,69 +78,10 @@ func (p *PolicyOutput) Prepare(ctx context.Context, zlog zerolog.Logger, bulker if err != nil { zlog.Error().Err(err).Msg("fail generate output key") return err -======= - output.PermissionsHash = p.Role.Sha2 // for the sake of consistency - zlog.Debug(). - Str("hash.sha256", p.Role.Sha2). - Str("roles", string(p.Role.Raw)). - Msg("Updating agent record to pick up most recent roles.") - - fields := map[string]interface{}{ - dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2, - } - - // Using painless script to update permission hash for updated key - body, err := renderUpdatePainlessScript(p.Name, fields) - if err != nil { - return err - } - - if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { - zlog.Error().Err(err).Msg("fail update agent record") - return err - } - - } else if needNewKey { - zlog.Debug(). - RawJSON("fleet.policy.roles", p.Role.Raw). - Str("fleet.policy.default.oldHash", output.PermissionsHash). - Str("fleet.policy.default.newHash", p.Role.Sha2). - Msg("Generating a new API key") - - ctx := zlog.WithContext(ctx) - outputAPIKey, err := - generateOutputAPIKey(ctx, bulker, agent.Id, p.Name, p.Role.Raw) - if err != nil { - return fmt.Errorf("failed generate output API key: %w", err) - } - - // When a new keys is generated we need to update the Agent record, - // this will need to be updated when multiples remote Elasticsearch output - // are supported. - zlog.Info(). - Str("fleet.policy.role.hash.sha256", p.Role.Sha2). - Str(logger.DefaultOutputAPIKeyID, outputAPIKey.ID). - Msg("Updating agent record to pick up default output key.") - - fields := map[string]interface{}{ - dl.FieldPolicyOutputAPIKey: outputAPIKey.Agent(), - dl.FieldPolicyOutputAPIKeyID: outputAPIKey.ID, - dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2, - } - - if !foundOutput { - fields[dl.FiledType] = OutputTypeElasticsearch - } - if output.APIKeyID != "" { - fields[dl.FieldPolicyOutputToRetireAPIKeyIDs] = model.ToRetireAPIKeyIdsItems{ - ID: output.APIKeyID, - RetiredAt: time.Now().UTC().Format(time.RFC3339), ->>>>>>> f77b97c (Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places. (#1896)) } agent.DefaultAPIKey = outputAPIKey.Agent() -<<<<<<< HEAD // When a new keys is generated we need to update the Agent record, // this will need to be updated when multiples Elasticsearch output // are used. @@ -149,35 +89,6 @@ func (p *PolicyOutput) Prepare(ctx context.Context, zlog zerolog.Logger, bulker Str("hash.sha256", p.Role.Sha2). Str(logger.DefaultOutputAPIKeyID, outputAPIKey.ID). Msg("Updating agent record to pick up default output key.") -======= - if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil { - zlog.Error().Err(err).Msg("fail update agent record") - return fmt.Errorf("fail update agent record: %w", err) - } - - // Now that all is done, we can update the output on the agent variable - // Right not it's more for consistency and to ensure the in-memory agent - // data is correct and in sync with ES, so it can be safely used after - // this method returns. - output.Type = OutputTypeElasticsearch - output.APIKey = outputAPIKey.Agent() - output.APIKeyID = outputAPIKey.ID - output.PermissionsHash = p.Role.Sha2 // for the sake of consistency - } - - // Always insert the `api_key` as part of the output block, this is required - // because only fleet server knows the api key for the specific agent, if we don't - // add it the agent will not receive the `api_key` and will not be able to connect - // to Elasticsearch. - // - // We need to investigate allocation with the new LS output, we had optimization - // in place to reduce number of agent policy allocation when sending the updated - // agent policy to multiple agents. - // See: https://github.com/elastic/fleet-server/issues/1301 - if err := setMapObj(outputMap, output.APIKey, p.Name, "api_key"); err != nil { - return err - } ->>>>>>> f77b97c (Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places. (#1896)) fields := map[string]interface{}{ dl.FieldDefaultAPIKey: outputAPIKey.Agent(), From e747dcc1cfad9738e865de469f2d2aa0321af665 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Fri, 30 Sep 2022 13:58:17 -0400 Subject: [PATCH 3/3] Update CHANGELOG.next.asciidoc --- CHANGELOG.next.asciidoc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b633f62dd..359dede60 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -6,6 +6,8 @@ - Remove events from agent checkin body. {issue}1774[1774] - Improve authc debug logging. {pull}1870[1870] - Add error detail to catch-all HTTP error response. {pull}1854[1854] +- Update apikey.cache_hit log field name to match convention. {pull}1900[1900] +- LoadServerLimits will not overwrite specified limits when loading default/agent number specified values. {issue}1841[1841] {pull}1912[1912] - Fix issue were errors where being ignored written to elasticsearch. {pull}1896[1896] ==== New Features