Skip to content

Commit

Permalink
Remove the coordinator
Browse files Browse the repository at this point in the history
Remove the policy coordinator and policy leader election mechanisms
from fleet-server. Deprecate the coordinator_idx value in
fleet-server's json schema and remove coordinator_idx references when
processing policies.
  • Loading branch information
michel-laterman committed Nov 30, 2023
1 parent 9417467 commit 28c2822
Show file tree
Hide file tree
Showing 29 changed files with 95 additions and 1,518 deletions.
32 changes: 32 additions & 0 deletions changelog/fragments/1701298362-Remove-policy-coordinator.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: other

# Change summary; an 80ish-character description of the change.
summary: Remove policy coordinator

# Long description; in case the summary is not enough to describe the change,
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: Remove policy coordinator and fleet-server leader election mechanisms. Mark coordinator_idx as deprecated.

# Affected component; a word indicating the component this changeset affects.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present, it is automatically filled in by the tooling, which finds the PR where this changelog fragment was added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 3131

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, it is automatically filled in by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
24 changes: 6 additions & 18 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,30 +384,24 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
defer span.End()
// If more than one, pick the winner;
// 0) Correct policy id
// 1) Highest revision/coordinator number
// 1) Highest revision number

found := false
currRev := agent.PolicyRevisionIdx
currCoord := agent.PolicyCoordinatorIdx
vSpan, _ := apm.StartSpan(ctx, "checkPolicyActions", "validate")
for _, a := range actionIds {
rev, ok := policy.RevisionFromString(a)

zlog.Debug().
Str("agent.policyId", agent.PolicyID).
Int64("agent.revisionIdx", currRev).
Int64("agent.coordinatorIdx", currCoord).
Str("rev.policyId", rev.PolicyID).
Int64("rev.revisionIdx", rev.RevisionIdx).
Int64("rev.coordinatorIdx", rev.CoordinatorIdx).
Msg("ack policy revision")

if ok && rev.PolicyID == agent.PolicyID &&
(rev.RevisionIdx > currRev ||
(rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) {
if ok && rev.PolicyID == agent.PolicyID && rev.RevisionIdx > currRev {
found = true
currRev = rev.RevisionIdx
currCoord = rev.CoordinatorIdx
}
}

Expand All @@ -432,7 +426,7 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag

err := ack.updateAgentDoc(ctx, zlog,
agent.Id,
currRev, currCoord,
currRev,
agent.PolicyID)
if err != nil {
return err
Expand Down Expand Up @@ -506,15 +500,14 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
func (ack *AckT) updateAgentDoc(ctx context.Context,
zlog zerolog.Logger,
agentID string,
currRev, currCoord int64,
currRev int64,
policyID string,
) error {
span, ctx := apm.StartSpan(ctx, "updateAgentDoc", "update")
defer span.End()
body := makeUpdatePolicyBody(
policyID,
currRev,
currCoord,
)

err := ack.bulk.Update(
Expand All @@ -529,7 +522,6 @@ func (ack *AckT) updateAgentDoc(ctx context.Context,
zlog.Err(err).
Str(LogPolicyID, policyID).
Int64("policyRevision", currRev).
Int64("policyCoordinator", currCoord).
Msg("ack policy")

if err != nil {
Expand Down Expand Up @@ -708,7 +700,7 @@ func isAgentActive(ctx context.Context, zlog zerolog.Logger, bulk bulk.Bulk, age
// has not been changed underneath us by an upstream process (Kibana or otherwise).
// We have a race condition where a user could have assigned a new policy to
// an agent while we were busy updating the old one. A blind update to the
// agent record without a check could set the revision and coordIdx for the wrong
// agent record without a check could set the revision for the wrong
// policy. This script should be coupled with a "retry_on_conflict" parameter
// to allow for *other* changes to the agent record while we are running the script.
// (For example, say the background bulk check-in timestamp update task fires)
Expand All @@ -718,12 +710,10 @@ func isAgentActive(ctx context.Context, zlog zerolog.Logger, bulk bulk.Bulk, age
const kUpdatePolicyPrefix = `{"script":{"lang":"painless","source":"if (ctx._source.policy_id == params.id) {ctx._source.remove('default_api_key_history');ctx._source.` +
dl.FieldPolicyRevisionIdx +
` = params.rev;ctx._source.` +
dl.FieldPolicyCoordinatorIdx +
`= params.coord;ctx._source.` +
dl.FieldUpdatedAt +
` = params.ts;} else {ctx.op = \"noop\";}","params": {"id":"`

func makeUpdatePolicyBody(policyID string, newRev, coordIdx int64) []byte {
func makeUpdatePolicyBody(policyID string, newRev int64) []byte {
var buf bytes.Buffer
buf.Grow(410)

Expand All @@ -732,8 +722,6 @@ func makeUpdatePolicyBody(policyID string, newRev, coordIdx int64) []byte {
buf.WriteString(policyID)
buf.WriteString(`","rev":`)
buf.WriteString(strconv.FormatInt(newRev, 10))
buf.WriteString(`,"coord":`)
buf.WriteString(strconv.FormatInt(coordIdx, 10))
buf.WriteString(`,"ts":"`)
buf.WriteString(time.Now().UTC().Format(time.RFC3339))
buf.WriteString(`"}}}`)
Expand Down
7 changes: 2 additions & 5 deletions internal/pkg/api/handleAck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,17 @@ func BenchmarkMakeUpdatePolicyBody(b *testing.B) {

const policyID = "ed110be4-c2a0-42b8-adc0-94c2f0569207"
const newRev = 2
const coord = 1

for n := 0; n < b.N; n++ {
makeUpdatePolicyBody(policyID, newRev, coord)
makeUpdatePolicyBody(policyID, newRev)
}
}

func TestMakeUpdatePolicyBody(t *testing.T) {

const policyID = "ed110be4-c2a0-42b8-adc0-94c2f0569207"
const newRev = 2
const coord = 1

data := makeUpdatePolicyBody(policyID, newRev, coord)
data := makeUpdatePolicyBody(policyID, newRev)

var i interface{}
err := json.Unmarshal(data, &i)
Expand Down
3 changes: 1 addition & 2 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
actCh := aSub.Ch()

// Subscribe to policy manager for changes on PolicyId > policyRev
sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, agent.PolicyRevisionIdx, agent.PolicyCoordinatorIdx)
sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, agent.PolicyRevisionIdx)
if err != nil {
return fmt.Errorf("subscribe policy monitor: %w", err)
}
Expand Down Expand Up @@ -730,7 +730,6 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
zlog = zlog.With().
Str("fleet.ctx", "processPolicy").
Int64("fleet.policyRevision", pp.Policy.RevisionIdx).
Int64("fleet.policyCoordinator", pp.Policy.CoordinatorIdx).
Str(LogPolicyID, pp.Policy.PolicyID).
Logger()

Expand Down
33 changes: 0 additions & 33 deletions internal/pkg/coordinator/coordinator.go

This file was deleted.

Loading

0 comments on commit 28c2822

Please sign in to comment.