Skip to content

Commit

Permalink
Remove the coordinator (#3131)
Browse files Browse the repository at this point in the history
Remove the policy coordinator and policy leader election mechanisms
from fleet-server. Deprecate the coordinator_idx value in
fleet-server's json schema and remove coordinator_idx references when
processing policies.
The v8.5 migration generated new output keys for an agent by incrementing
the policy's coordinator_idx, which forced the policy outputs to be
prepared again. That behaviour has been replaced: fleet-server now detects
when any of an agent's outputs has no api_key value (empty string) and subscribes with a revision of 0.
  • Loading branch information
michel-laterman authored Jul 4, 2024
1 parent 31b971f commit c9ff41b
Show file tree
Hide file tree
Showing 31 changed files with 283 additions and 1,663 deletions.
32 changes: 32 additions & 0 deletions changelog/fragments/1701298362-Remove-policy-coordinator.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: other

# Change summary; an 80ish-character description of the change.
summary: Remove policy coordinator

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: Remove policy coordinator and fleet-server leader election mechanisms. Mark coordinator_idx as deprecated.

# Affected component; a word indicating the component this changeset affects.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present, it is automatically filled in by the tooling, which finds the PR where this changelog fragment was added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 3131

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, it is automatically filled in by the tooling with the issue linked to the PR number.
issue: 1704
28 changes: 8 additions & 20 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (

"github.com/miolini/datacounter"
"github.com/rs/zerolog"
"go.elastic.co/apm/module/apmhttp/v2"
"go.elastic.co/apm/v2"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
Expand All @@ -27,8 +29,6 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
"github.com/elastic/fleet-server/v7/internal/pkg/smap"
"go.elastic.co/apm/module/apmhttp/v2"
"go.elastic.co/apm/v2"
)

const (
Expand Down Expand Up @@ -385,30 +385,24 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
defer span.End()
// If more than one, pick the winner;
// 0) Correct policy id
// 1) Highest revision/coordinator number
// 1) Highest revision number

found := false
currRev := agent.PolicyRevisionIdx
currCoord := agent.PolicyCoordinatorIdx
vSpan, _ := apm.StartSpan(ctx, "checkPolicyActions", "validate")
for _, a := range actionIds {
rev, ok := policy.RevisionFromString(a)

zlog.Debug().
Str("agent.policyId", agent.PolicyID).
Int64("agent.revisionIdx", currRev).
Int64("agent.coordinatorIdx", currCoord).
Str("rev.policyId", rev.PolicyID).
Int64(logger.RevisionIdx, rev.RevisionIdx).
Int64(logger.CoordinatorIdx, rev.CoordinatorIdx).
Msg("ack policy revision")

if ok && rev.PolicyID == agent.PolicyID &&
(rev.RevisionIdx > currRev ||
(rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) {
if ok && rev.PolicyID == agent.PolicyID && rev.RevisionIdx > currRev {
found = true
currRev = rev.RevisionIdx
currCoord = rev.CoordinatorIdx
}
}

Expand All @@ -433,7 +427,7 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag

err := ack.updateAgentDoc(ctx, zlog,
agent.Id,
currRev, currCoord,
currRev,
agent.PolicyID)
if err != nil {
return err
Expand Down Expand Up @@ -507,15 +501,14 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
func (ack *AckT) updateAgentDoc(ctx context.Context,
zlog zerolog.Logger,
agentID string,
currRev, currCoord int64,
currRev int64,
policyID string,
) error {
span, ctx := apm.StartSpan(ctx, "updateAgentDoc", "update")
defer span.End()
body := makeUpdatePolicyBody(
policyID,
currRev,
currCoord,
)

err := ack.bulk.Update(
Expand All @@ -530,7 +523,6 @@ func (ack *AckT) updateAgentDoc(ctx context.Context,
zlog.Err(err).
Str(LogPolicyID, policyID).
Int64("policyRevision", currRev).
Int64("policyCoordinator", currCoord).
Msg("ack policy")

if err != nil {
Expand Down Expand Up @@ -663,7 +655,7 @@ func isAgentActive(ctx context.Context, zlog zerolog.Logger, bulk bulk.Bulk, age
// has not changed underneath us by an upstream process (Kibana or otherwise).
// We have a race condition where a user could have assigned a new policy to
// an agent while we were busy updating the old one. A blind update to the
// agent record without a check could set the revision and coordIdx for the wrong
// agent record without a check could set the revision for the wrong
// policy. This script should be coupled with a "retry_on_conflict" parameter
// to allow for *other* changes to the agent record while we are running the script.
// (For example, say the background bulk check-in timestamp update task fires)
Expand All @@ -673,12 +665,10 @@ func isAgentActive(ctx context.Context, zlog zerolog.Logger, bulk bulk.Bulk, age
const kUpdatePolicyPrefix = `{"script":{"lang":"painless","source":"if (ctx._source.policy_id == params.id) {ctx._source.remove('default_api_key_history');ctx._source.` +
dl.FieldPolicyRevisionIdx +
` = params.rev;ctx._source.` +
dl.FieldPolicyCoordinatorIdx +
`= params.coord;ctx._source.` +
dl.FieldUpdatedAt +
` = params.ts;} else {ctx.op = \"noop\";}","params": {"id":"`

func makeUpdatePolicyBody(policyID string, newRev, coordIdx int64) []byte {
func makeUpdatePolicyBody(policyID string, newRev int64) []byte {
var buf bytes.Buffer
buf.Grow(410)

Expand All @@ -687,8 +677,6 @@ func makeUpdatePolicyBody(policyID string, newRev, coordIdx int64) []byte {
buf.WriteString(policyID)
buf.WriteString(`","rev":`)
buf.WriteString(strconv.FormatInt(newRev, 10))
buf.WriteString(`,"coord":`)
buf.WriteString(strconv.FormatInt(coordIdx, 10))
buf.WriteString(`,"ts":"`)
buf.WriteString(time.Now().UTC().Format(time.RFC3339))
buf.WriteString(`"}}}`)
Expand Down
6 changes: 2 additions & 4 deletions internal/pkg/api/handleAck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,20 @@ import (
func BenchmarkMakeUpdatePolicyBody(b *testing.B) {
const policyID = "ed110be4-c2a0-42b8-adc0-94c2f0569207"
const newRev = 2
const coord = 1

b.ResetTimer()
b.ReportAllocs()

for n := 0; n < b.N; n++ {
makeUpdatePolicyBody(policyID, newRev, coord)
makeUpdatePolicyBody(policyID, newRev)
}
}

func TestMakeUpdatePolicyBody(t *testing.T) {
const policyID = "ed110be4-c2a0-42b8-adc0-94c2f0569207"
const newRev = 2
const coord = 1

data := makeUpdatePolicyBody(policyID, newRev, coord)
data := makeUpdatePolicyBody(policyID, newRev)

var i interface{}
err := json.Unmarshal(data, &i)
Expand Down
13 changes: 11 additions & 2 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,18 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
defer ct.ad.Unsubscribe(aSub)
actCh := aSub.Ch()

// Use revision_idx=0 if any of the agent's outputs has no API key defined.
// This will force the policy monitor to emit a new policy to regenerate API keys.
revID := agent.PolicyRevisionIdx
for _, output := range agent.Outputs {
if output.APIKey == "" {
revID = 0
break
}
}

// Subscribe to policy manager for changes on PolicyId > policyRev
sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, agent.PolicyRevisionIdx, agent.PolicyCoordinatorIdx)
sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, revID)
if err != nil {
return fmt.Errorf("subscribe policy monitor: %w", err)
}
Expand Down Expand Up @@ -791,7 +801,6 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
zlog = zlog.With().
Str("fleet.ctx", "processPolicy").
Int64(logger.RevisionIdx, pp.Policy.RevisionIdx).
Int64(logger.CoordinatorIdx, pp.Policy.CoordinatorIdx).
Str(LogPolicyID, pp.Policy.PolicyID).
Logger()

Expand Down
33 changes: 0 additions & 33 deletions internal/pkg/coordinator/coordinator.go

This file was deleted.

Loading

0 comments on commit c9ff41b

Please sign in to comment.