From 28c2822bedf12d90f44d72de83feb5edc91b5514 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Wed, 29 Nov 2023 11:36:41 -0800 Subject: [PATCH 1/8] Remove the coordinator Remove the policy coordinator and policy leader election mechanisms from fleet-server. Deprecate the coordinator_idx value in fleet-server's json schema and remove coordinator_idx references when processing policies. --- .../1701298362-Remove-policy-coordinator.yaml | 32 ++ internal/pkg/api/handleAck.go | 24 +- internal/pkg/api/handleAck_test.go | 7 +- internal/pkg/api/handleCheckin.go | 3 +- internal/pkg/coordinator/coordinator.go | 33 -- internal/pkg/coordinator/monitor.go | 440 ------------------ .../coordinator/monitor_integration_test.go | 215 --------- internal/pkg/coordinator/v0.go | 87 ---- internal/pkg/coordinator/v0_test.go | 92 ---- internal/pkg/dl/constants.go | 4 - internal/pkg/dl/migration.go | 13 +- internal/pkg/dl/migration_integration_test.go | 7 +- internal/pkg/dl/policies.go | 1 - internal/pkg/dl/policies_integration_test.go | 1 - internal/pkg/dl/policies_leader.go | 154 ------ .../dl/policies_leader_integration_test.go | 176 ------- internal/pkg/dl/servers.go | 59 --- internal/pkg/dl/servers_integration_test.go | 58 --- internal/pkg/model/schema.go | 2 +- internal/pkg/policy/monitor.go | 24 +- .../pkg/policy/monitor_integration_test.go | 11 +- internal/pkg/policy/monitor_test.go | 107 +---- internal/pkg/policy/revision.go | 26 +- internal/pkg/policy/self_test.go | 6 - internal/pkg/policy/sub.go | 8 +- internal/pkg/policy/sub_test.go | 12 +- internal/pkg/server/fleet.go | 6 +- internal/pkg/server/fleet_integration_test.go | 2 +- model/schema.json | 3 +- 29 files changed, 95 insertions(+), 1518 deletions(-) create mode 100644 changelog/fragments/1701298362-Remove-policy-coordinator.yaml delete mode 100644 internal/pkg/coordinator/coordinator.go delete mode 100644 internal/pkg/coordinator/monitor.go delete mode 100644 internal/pkg/coordinator/monitor_integration_test.go delete mode 100644 internal/pkg/coordinator/v0.go delete mode 100644 internal/pkg/coordinator/v0_test.go delete mode 100644 internal/pkg/dl/policies_leader.go delete mode 100644 internal/pkg/dl/policies_leader_integration_test.go delete mode 100644 internal/pkg/dl/servers.go delete mode 100644 internal/pkg/dl/servers_integration_test.go diff --git a/changelog/fragments/1701298362-Remove-policy-coordinator.yaml b/changelog/fragments/1701298362-Remove-policy-coordinator.yaml new file mode 100644 index 000000000..dba733837 --- /dev/null +++ b/changelog/fragments/1701298362-Remove-policy-coordinator.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: other + +# Change summary; a 80ish characters long description of the change. +summary: Remove policy coordinator + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. 
+# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +description: Remove policy coordinator and fleet-server leader election mechanisms. Mark coordinator_idx as deprecated. + +# Affected component; a word indicating the component this changeset affects. +component: + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: 3131 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234 diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index 489aefba5..9e03943a9 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -384,11 +384,10 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag defer span.End() // If more than one, pick the winner; // 0) Correct policy id - // 1) Highest revision/coordinator number + // 1) Highest revision number found := false currRev := agent.PolicyRevisionIdx - currCoord := agent.PolicyCoordinatorIdx vSpan, _ := apm.StartSpan(ctx, "checkPolicyActions", "validate") for _, a := range actionIds { rev, ok := policy.RevisionFromString(a) @@ -396,18 +395,13 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag zlog.Debug(). Str("agent.policyId", agent.PolicyID). Int64("agent.revisionIdx", currRev). - Int64("agent.coordinatorIdx", currCoord). Str("rev.policyId", rev.PolicyID). Int64("rev.revisionIdx", rev.RevisionIdx). - Int64("rev.coordinatorIdx", rev.CoordinatorIdx). Msg("ack policy revision") - if ok && rev.PolicyID == agent.PolicyID && - (rev.RevisionIdx > currRev || - (rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) { + if ok && rev.PolicyID == agent.PolicyID && rev.RevisionIdx > currRev { found = true currRev = rev.RevisionIdx - currCoord = rev.CoordinatorIdx } } @@ -432,7 +426,7 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag err := ack.updateAgentDoc(ctx, zlog, agent.Id, - currRev, currCoord, + currRev, agent.PolicyID) if err != nil { return err @@ -506,7 +500,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context, func (ack *AckT) updateAgentDoc(ctx context.Context, zlog zerolog.Logger, agentID string, - currRev, currCoord int64, + currRev int64, policyID string, ) error { span, ctx := apm.StartSpan(ctx, "updateAgentDoc", "update") @@ -514,7 +508,6 @@ func (ack *AckT) updateAgentDoc(ctx context.Context, body := makeUpdatePolicyBody( policyID, currRev, - currCoord, ) err := ack.bulk.Update( @@ -529,7 +522,6 @@ func (ack *AckT) updateAgentDoc(ctx context.Context, zlog.Err(err). Str(LogPolicyID, policyID). Int64("policyRevision", currRev). - Int64("policyCoordinator", currCoord). Msg("ack policy") if err != nil { @@ -708,7 +700,7 @@ func isAgentActive(ctx context.Context, zlog zerolog.Logger, bulk bulk.Bulk, age // has not changed underneath us by an upstream process (Kibana or otherwise). // We have a race condition where a user could have assigned a new policy to // an agent while we were busy updating the old one. 
A blind update to the -// agent record without a check could set the revision and coordIdx for the wrong +// agent record without a check could set the revision for the wrong // policy. This script should be coupled with a "retry_on_conflict" parameter // to allow for *other* changes to the agent record while we running the script. // (For example, say the background bulk check-in timestamp update task fires) @@ -718,12 +710,10 @@ func isAgentActive(ctx context.Context, zlog zerolog.Logger, bulk bulk.Bulk, age const kUpdatePolicyPrefix = `{"script":{"lang":"painless","source":"if (ctx._source.policy_id == params.id) {ctx._source.remove('default_api_key_history');ctx._source.` + dl.FieldPolicyRevisionIdx + ` = params.rev;ctx._source.` + - dl.FieldPolicyCoordinatorIdx + - `= params.coord;ctx._source.` + dl.FieldUpdatedAt + ` = params.ts;} else {ctx.op = \"noop\";}","params": {"id":"` -func makeUpdatePolicyBody(policyID string, newRev, coordIdx int64) []byte { +func makeUpdatePolicyBody(policyID string, newRev int64) []byte { var buf bytes.Buffer buf.Grow(410) @@ -732,8 +722,6 @@ func makeUpdatePolicyBody(policyID string, newRev, coordIdx int64) []byte { buf.WriteString(policyID) buf.WriteString(`","rev":`) buf.WriteString(strconv.FormatInt(newRev, 10)) - buf.WriteString(`,"coord":`) - buf.WriteString(strconv.FormatInt(coordIdx, 10)) buf.WriteString(`,"ts":"`) buf.WriteString(time.Now().UTC().Format(time.RFC3339)) buf.WriteString(`"}}}`) diff --git a/internal/pkg/api/handleAck_test.go b/internal/pkg/api/handleAck_test.go index 45839a916..5fdccfdb6 100644 --- a/internal/pkg/api/handleAck_test.go +++ b/internal/pkg/api/handleAck_test.go @@ -32,20 +32,17 @@ func BenchmarkMakeUpdatePolicyBody(b *testing.B) { const policyID = "ed110be4-c2a0-42b8-adc0-94c2f0569207" const newRev = 2 - const coord = 1 for n := 0; n < b.N; n++ { - makeUpdatePolicyBody(policyID, newRev, coord) + makeUpdatePolicyBody(policyID, newRev) } } func TestMakeUpdatePolicyBody(t *testing.T) { - const policyID = "ed110be4-c2a0-42b8-adc0-94c2f0569207" const newRev = 2 - const coord = 1 - data := makeUpdatePolicyBody(policyID, newRev, coord) + data := makeUpdatePolicyBody(policyID, newRev) var i interface{} err := json.Unmarshal(data, &i) diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 8512a637c..0b4c33063 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -246,7 +246,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r actCh := aSub.Ch() // Subscribe to policy manager for changes on PolicyId > policyRev - sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, agent.PolicyRevisionIdx, agent.PolicyCoordinatorIdx) + sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, agent.PolicyRevisionIdx) if err != nil { return fmt.Errorf("subscribe policy monitor: %w", err) } @@ -730,7 +730,6 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a zlog = zlog.With(). Str("fleet.ctx", "processPolicy"). Int64("fleet.policyRevision", pp.Policy.RevisionIdx). - Int64("fleet.policyCoordinator", pp.Policy.CoordinatorIdx). Str(LogPolicyID, pp.Policy.PolicyID). Logger() diff --git a/internal/pkg/coordinator/coordinator.go b/internal/pkg/coordinator/coordinator.go deleted file mode 100644 index c251147d3..000000000 --- a/internal/pkg/coordinator/coordinator.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. 
Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -// Package coordinator handles the deployment of Elastic Agent policies between Fleet Server node. -package coordinator - -import ( - "context" - - "github.com/elastic/fleet-server/v7/internal/pkg/model" -) - -// Factory creates a new coordinator for a policy. -type Factory func(policy model.Policy) (Coordinator, error) - -// Coordinator processes a policy and produces a new policy. -type Coordinator interface { - // Name is the name of the coordinator - Name() string - - // Run runs the coordinator for the policy. - Run(ctx context.Context) error - - // Update called to signal a new policy revision has been defined. - // - // This should not block as its called by the main loop in the coordinator manager. The implemented coordinator - // should push it over a channel for the work to be done in another go routine. - Update(ctx context.Context, policy model.Policy) error - - // Output is the output channel for updated coordinated policies. - Output() <-chan model.Policy -} diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go deleted file mode 100644 index 692292fa2..000000000 --- a/internal/pkg/coordinator/monitor.go +++ /dev/null @@ -1,440 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package coordinator - -import ( - "context" - "errors" - "fmt" - "net" - "os" - "runtime" - "sync" - "time" - - "github.com/rs/zerolog" - "go.elastic.co/apm/v2" - - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" - "github.com/elastic/fleet-server/v7/internal/pkg/config" - "github.com/elastic/fleet-server/v7/internal/pkg/dl" - "github.com/elastic/fleet-server/v7/internal/pkg/es" - "github.com/elastic/fleet-server/v7/internal/pkg/model" - "github.com/elastic/fleet-server/v7/internal/pkg/monitor" - "github.com/elastic/fleet-server/v7/internal/pkg/sleep" -) - -const ( - defaultCheckInterval = 20 * time.Second // check for valid leaders every 20 seconds - defaultLeaderInterval = 30 * time.Second // become leader for at least 30 seconds - defaultMetadataInterval = 5 * time.Minute // update metadata every 5 minutes - defaultCoordinatorRestartDelay = 5 * time.Second // delay in restarting coordinator on failure -) - -// Monitor monitors the leader election of policies and routes managed policies to the coordinator. -type Monitor interface { - // Run runs the monitor. - Run(context.Context) error -} - -type policyT struct { - id string - cord Coordinator - cordCanceller context.CancelFunc -} - -type monitorT struct { - bulker bulk.Bulk - monitor monitor.Monitor - factory Factory - - fleet config.Fleet - version string - agentMetadata model.AgentMetadata - hostMetadata model.HostMetadata - - checkInterval time.Duration - leaderInterval time.Duration - metadataInterval time.Duration - coordRestartDelay time.Duration - - serversIndex string - policiesIndex string - leadersIndex string - agentsIndex string - - policies map[string]policyT - - muPoliciesCanceller sync.Mutex - policiesCanceller map[string]context.CancelFunc -} - -// NewMonitor creates a new coordinator policy monitor. 
-func NewMonitor(fleet config.Fleet, version string, bulker bulk.Bulk, monitor monitor.Monitor, factory Factory) Monitor { - return &monitorT{ - version: version, - fleet: fleet, - bulker: bulker, - monitor: monitor, - factory: factory, - checkInterval: defaultCheckInterval, - leaderInterval: defaultLeaderInterval, - metadataInterval: defaultMetadataInterval, - coordRestartDelay: defaultCoordinatorRestartDelay, - serversIndex: dl.FleetServers, - policiesIndex: dl.FleetPolicies, - leadersIndex: dl.FleetPoliciesLeader, - agentsIndex: dl.FleetAgents, - policies: make(map[string]policyT), - policiesCanceller: make(map[string]context.CancelFunc), - } -} - -// Run runs the monitor. -func (m *monitorT) Run(ctx context.Context) (err error) { - log := zerolog.Ctx(ctx).With().Str("ctx", "policy leader manager").Logger() - // When ID of the Agent is not provided to Fleet Server then the Agent - // has not enrolled. The Fleet Server cannot become a leader until the - // Agent it is running under has been enrolled. - m.calcMetadata(ctx) - if m.agentMetadata.ID == "" { - log.Warn().Msg("missing config fleet.agent.id; acceptable until Elastic Agent has enrolled") - <-ctx.Done() - return ctx.Err() - } - - // Start timer loop to ensure leadership - lT := time.NewTimer(m.checkInterval) - defer lT.Stop() - - // Ensure leadership on startup - for { - err = m.ensureLeadership(ctx) - if err != nil { - log.Warn().Err(err).Msg("error ensuring leadership, will retry") - select { - case <-lT.C: - lT.Reset(m.checkInterval) - continue - case <-ctx.Done(): - m.releaseLeadership() - return ctx.Err() - } - } - break - } - - // Subscribe to the monitor for policies - s := m.monitor.Subscribe() - defer m.monitor.Unsubscribe(s) - - // Start timer to update metadata (mainly for updated IP addresses of the host) - mT := time.NewTimer(m.metadataInterval) - defer mT.Stop() - - // Keep track of errored statuses - erroredOnLastRequest := false - numFailedRequests := 0 - for { - select { - case hits := <-s.Output(): - err = m.handlePolicies(ctx, hits) - if err != nil { - erroredOnLastRequest = true - numFailedRequests++ - log.Warn().Err(err).Msgf("Encountered an error while policy leadership changes; continuing to retry.") - } - case <-mT.C: - m.calcMetadata(ctx) - mT.Reset(m.metadataInterval) - case <-lT.C: - err = m.ensureLeadership(ctx) - if err != nil { - erroredOnLastRequest = true - numFailedRequests++ - log.Warn().Err(err).Msgf("Encountered an error while checking/assigning policy leaders; continuing to retry.") - } - lT.Reset(m.checkInterval) - case <-ctx.Done(): - m.releaseLeadership() - return ctx.Err() - } - if err == nil && erroredOnLastRequest { - erroredOnLastRequest = false - log.Info().Msgf("Policy leader monitor successfully recovered after %d attempts", numFailedRequests) - numFailedRequests = 0 - } - } -} - -// handlePolicies handles new policies or policy changes. 
-func (m *monitorT) handlePolicies(ctx context.Context, hits []es.HitT) error { - log := zerolog.Ctx(ctx).With().Str("ctx", "policy leader manager").Logger() - new := false - for _, hit := range hits { - var policy model.Policy - err := hit.Unmarshal(&policy) - if err != nil { - log.Debug().Err(err).Msg("Failed to deserialize policy json") - return err - } - if policy.CoordinatorIdx != 0 { - // policy revision was inserted by coordinator so this monitor ignores it - continue - } - p, ok := m.policies[policy.PolicyID] - if ok { - // not a new policy - if p.cord != nil { - // current leader send to its coordinator - err = p.cord.Update(ctx, policy) - if err != nil { - log.Info().Err(err).Msg("Failed to update policy leader") - return err - } - } - } else { - new = true - } - } - if new { - // new policy discovered; leadership needs to be performed - err := m.ensureLeadership(ctx) - if err != nil { - return err - } - } - return nil -} - -// ensureLeadership ensures leadership is held or needs to be taken over. -func (m *monitorT) ensureLeadership(ctx context.Context) error { - if m.bulker.HasTracer() { - trans := m.bulker.StartTransaction("Ensure leadership", "bulker") - ctx = apm.ContextWithTransaction(ctx, trans) - defer trans.End() - } - zerolog.Ctx(ctx).Debug().Str("ctx", "policy leader manager").Msg("ensuring leadership of policies") - err := dl.EnsureServer(ctx, m.bulker, m.version, m.agentMetadata, m.hostMetadata, dl.WithIndexName(m.serversIndex)) - - if err != nil { - return fmt.Errorf("failed to check server status on Elasticsearch (%s): %w", m.hostMetadata.Name, err) - } - - // fetch current policies and leaders - leaders := map[string]model.PolicyLeader{} - policies, err := dl.QueryLatestPolicies(ctx, m.bulker, dl.WithIndexName(m.policiesIndex)) - if err != nil { - if errors.Is(err, es.ErrIndexNotFound) { - zerolog.Ctx(ctx).Debug().Str("ctx", "policy leader manager").Str("index", m.policiesIndex).Msg(es.ErrIndexNotFound.Error()) - return nil - } - return fmt.Errorf("encountered error while querying policies: %w", err) - } - if len(policies) > 0 { - ids := make([]string, len(policies)) - for i, p := range policies { - ids[i] = p.PolicyID - } - leaders, err = dl.SearchPolicyLeaders(ctx, m.bulker, ids, dl.WithIndexName(m.leadersIndex)) - if err != nil { - if !errors.Is(err, es.ErrIndexNotFound) { - return fmt.Errorf("encountered error while fetching policy leaders: %w", err) - } - } - } - - // determine the policies that lead needs to be taken - var lead []model.Policy - now := time.Now().UTC() - for _, policy := range policies { - leader, ok := leaders[policy.PolicyID] - if !ok { - // new policy want to try to take leadership - lead = append(lead, policy) - continue - } - t, err := leader.Time() - if err != nil { - return err - } - if now.Sub(t) > m.leaderInterval || leader.Server.ID == m.agentMetadata.ID { - // policy needs a new leader or already leader - lead = append(lead, policy) - } - } - - // take/keep leadership and start new coordinators - res := make(chan policyT) - for _, p := range lead { - pt := m.policies[p.PolicyID] - pt.id = p.PolicyID - go func(p model.Policy, pt policyT) { - defer func() { - res <- pt - }() - - l := zerolog.Ctx(ctx).With().Str("ctx", "policy leader manager").Str(dl.FieldPolicyID, pt.id).Logger() - err := dl.TakePolicyLeadership(ctx, m.bulker, pt.id, m.agentMetadata.ID, m.version, dl.WithIndexName(m.leadersIndex)) - if err != nil { - l.Warn().Err(err).Msg("monitor.ensureLeadership: failed to take ownership") - if pt.cord != nil { - pt.cord = nil - } 
- if pt.cordCanceller != nil { - pt.cordCanceller() - pt.cordCanceller = nil - } - return - } - if pt.cord == nil { - cord, err := m.factory(p) - if err != nil { - l.Err(err).Msg("failed to start coordinator") - err = dl.ReleasePolicyLeadership(ctx, m.bulker, pt.id, m.agentMetadata.ID, m.leaderInterval, dl.WithIndexName(m.leadersIndex)) - if err != nil { - l.Err(err).Msg("failed to release policy leadership") - } - return - } - - cordCtx, canceller := context.WithCancel(ctx) - go runCoordinator(cordCtx, cord, l, m.coordRestartDelay) - go runCoordinatorOutput(cordCtx, cord, m.bulker, l, m.policiesIndex) - pt.cord = cord - pt.cordCanceller = canceller - } else { - err = pt.cord.Update(ctx, p) - if err != nil { - l.Err(err).Msg("failed to update coordinator") - } - } - }(p, pt) - } - for range lead { - r := <-res - if r.cord == nil { - // either failed to take leadership or lost leadership - delete(m.policies, r.id) - - m.muPoliciesCanceller.Lock() - delete(m.policiesCanceller, r.id) - m.muPoliciesCanceller.Unlock() - } else { - m.policies[r.id] = r - } - } - return nil -} - -// releaseLeadership releases current leadership -func (m *monitorT) releaseLeadership() { - var wg sync.WaitGroup - wg.Add(len(m.policies)) - for _, pt := range m.policies { - go func(pt policyT) { - if pt.cord != nil { - pt.cordCanceller() - } - // uses a background context, because the context for the - // monitor will be cancelled at this point in the code - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - err := dl.ReleasePolicyLeadership(ctx, m.bulker, pt.id, m.agentMetadata.ID, m.leaderInterval, dl.WithIndexName(m.leadersIndex)) - if err != nil { - l := zerolog.Ctx(ctx).With().Str("ctx", "policy leader manager").Str(dl.FieldPolicyID, pt.id).Logger() - l.Warn().Err(err).Msg("monitor.releaseLeadership: failed to release leadership") - } - wg.Done() - }(pt) - } - wg.Wait() -} - -func (m *monitorT) calcMetadata(ctx context.Context) { - m.agentMetadata = model.AgentMetadata{ - ID: m.fleet.Agent.ID, - Version: m.fleet.Agent.Version, - } - hostname := m.fleet.Host.Name - if hostname == "" { - h, err := os.Hostname() - if err != nil { - zerolog.Ctx(ctx).Error().Str("ctx", "policy leader manager").Err(err).Msg("failed to get hostname") - } - hostname = h - } - ips, err := m.getIPs() - if err != nil { - zerolog.Ctx(ctx).Error().Str("ctx", "policy leader manager").Err(err).Msg("failed to get ip addresses") - } - m.hostMetadata = model.HostMetadata{ - ID: m.fleet.Host.ID, - Name: hostname, - Architecture: runtime.GOOS, - Ip: ips, - } -} - -func (m *monitorT) getIPs() ([]string, error) { - ifaces, err := net.Interfaces() - if err != nil { - return nil, err - } - ips := []string{} - for _, i := range ifaces { - addrs, err := i.Addrs() - if err != nil { - continue - } - for _, addr := range addrs { - var ip net.IP - switch v := addr.(type) { - case *net.IPNet: - ip = v.IP - case *net.IPAddr: - ip = v.IP - } - if ip != nil { - ips = append(ips, ip.String()) - } - } - } - return ips, nil -} - -func runCoordinator(ctx context.Context, cord Coordinator, l zerolog.Logger, d time.Duration) { - cnt := 0 - for { - l.Info().Int("count", cnt).Str("coordinator", cord.Name()).Msg("Starting policy coordinator") - err := cord.Run(ctx) - if !errors.Is(err, context.Canceled) { - l.Err(err).Msg("Policy coordinator failed and stopped") - if errors.Is(sleep.WithContext(ctx, d), context.Canceled) { - break - } - } else { - break - } - cnt += 1 - } -} - -func runCoordinatorOutput(ctx context.Context, cord 
Coordinator, bulker bulk.Bulk, l zerolog.Logger, policiesIndex string) { - for { - select { - case p := <-cord.Output(): - s := l.With().Int64(dl.FieldRevisionIdx, p.RevisionIdx).Int64(dl.FieldCoordinatorIdx, p.CoordinatorIdx).Logger() - _, err := dl.CreatePolicy(ctx, bulker, p, dl.WithIndexName(policiesIndex)) - if err != nil { - s.Err(err).Msg("Policy coordinator failed to add a new policy revision") - } else { - s.Info().Int64("revision_id", p.RevisionIdx).Msg("Policy coordinator added a new policy revision") - } - case <-ctx.Done(): - return - } - } -} diff --git a/internal/pkg/coordinator/monitor_integration_test.go b/internal/pkg/coordinator/monitor_integration_test.go deleted file mode 100644 index 8fa29b8ee..000000000 --- a/internal/pkg/coordinator/monitor_integration_test.go +++ /dev/null @@ -1,215 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build integration - -package coordinator - -import ( - "context" - "encoding/json" - "errors" - "testing" - "time" - - "github.com/gofrs/uuid" - "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" - - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" - "github.com/elastic/fleet-server/v7/internal/pkg/config" - "github.com/elastic/fleet-server/v7/internal/pkg/dl" - "github.com/elastic/fleet-server/v7/internal/pkg/model" - "github.com/elastic/fleet-server/v7/internal/pkg/monitor" - ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" - testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" -) - -func TestMonitorLeadership(t *testing.T) { - parentCtx := context.Background() - parentCtx = testlog.SetLogger(t).WithContext(parentCtx) - bulkCtx, bulkCn := context.WithCancel(parentCtx) - defer bulkCn() - ctx, cn := context.WithCancel(parentCtx) - defer cn() - - // flush bulker on every operation - bulker := ftesting.SetupBulk(bulkCtx, t, bulk.WithFlushThresholdCount(1)) - - serversIndex := ftesting.CleanIndex(ctx, t, bulker, dl.FleetServers) - policiesIndex := ftesting.CleanIndex(ctx, t, bulker, dl.FleetPolicies) - leadersIndex := ftesting.CleanIndex(ctx, t, bulker, dl.FleetPoliciesLeader) - - pim, err := monitor.New(policiesIndex, bulker.Client(), bulker.Client()) - if err != nil { - t.Fatal(err) - } - cfg := makeFleetConfig() - pm := NewMonitor(cfg, "1.0.0", bulker, pim, NewCoordinatorZero) - pm.(*monitorT).serversIndex = serversIndex - pm.(*monitorT).leadersIndex = leadersIndex - pm.(*monitorT).policiesIndex = policiesIndex - - // start with 1 initial policy - policy1Id := uuid.Must(uuid.NewV4()).String() - policy1 := model.Policy{ - PolicyID: policy1Id, - CoordinatorIdx: 0, - Data: nil, - RevisionIdx: 1, - } - _, err = dl.CreatePolicy(ctx, bulker, policy1, dl.WithIndexName(policiesIndex)) - if err != nil { - t.Fatal(err) - } - - // start the monitors - g, _ := errgroup.WithContext(context.Background()) - g.Go(func() error { - err := pim.Run(ctx) - if err != nil && !errors.Is(err, context.Canceled) { - return err - } - return nil - }) - g.Go(func() error { - err := pm.Run(ctx) - if err != nil && !errors.Is(err, context.Canceled) { - return err - } - return nil - }) - - // wait 500ms to ensure everything is running; then create a new policy - <-time.After(500 * time.Millisecond) - policy2Id := 
uuid.Must(uuid.NewV4()).String() - policy2 := model.Policy{ - PolicyID: policy2Id, - CoordinatorIdx: 0, - Data: nil, - RevisionIdx: 1, - } - _, err = dl.CreatePolicy(ctx, bulker, policy2, dl.WithIndexName(policiesIndex)) - if err != nil { - t.Fatal(err) - } - - // wait 2 seconds so the index monitor notices the new policy - <-time.After(2 * time.Second) - ensureServer(ctx, t, bulker, cfg, serversIndex) - ensureLeadership(ctx, t, bulker, cfg, leadersIndex, policy1Id) - ensureLeadership(ctx, t, bulker, cfg, leadersIndex, policy2Id) - ensurePolicy(ctx, t, bulker, policiesIndex, policy1Id, 1, 1) - ensurePolicy(ctx, t, bulker, policiesIndex, policy2Id, 1, 1) - - // stop the monitors - cn() - err = g.Wait() - require.NoError(t, err) - - // ensure leadership was released - ensureLeadershipReleased(bulkCtx, t, bulker, cfg, leadersIndex, policy1Id) - ensureLeadershipReleased(bulkCtx, t, bulker, cfg, leadersIndex, policy2Id) -} - -func makeFleetConfig() config.Fleet { - id := uuid.Must(uuid.NewV4()).String() - return config.Fleet{ - Agent: config.Agent{ - ID: id, - Version: "1.0.0", - }, - Host: config.Host{ - ID: id, - }, - } -} - -func ensureServer(ctx context.Context, t *testing.T, bulker bulk.Bulk, cfg config.Fleet, index string) { - t.Helper() - var srv model.Server - data, err := bulker.Read(ctx, index, cfg.Agent.ID, bulk.WithRefresh()) - if err != nil { - t.Fatal(err) - } - err = json.Unmarshal(data, &srv) - if err != nil { - t.Fatal(err) - } - if srv.Agent.ID != cfg.Agent.ID { - t.Fatal("agent.id should match from configuration") - } -} - -func ensureLeadership(ctx context.Context, t *testing.T, bulker bulk.Bulk, cfg config.Fleet, index string, policyID string) { - t.Helper() - var leader model.PolicyLeader - data, err := bulker.Read(ctx, index, policyID) - if err != nil { - t.Fatal(err) - } - err = json.Unmarshal(data, &leader) - if err != nil { - t.Fatal(err) - } - if leader.Server.ID != cfg.Agent.ID { - t.Fatal("server.id should match from configuration") - } - lt, err := leader.Time() - if err != nil { - t.Fatal(err) - } - if time.Now().UTC().Sub(lt) >= 5*time.Second { - t.Fatal("@timestamp should be with in 5 seconds") - } -} - -func ensurePolicy(ctx context.Context, t *testing.T, bulker bulk.Bulk, index string, policyID string, revisionIdx, coordinatorIdx int64) { - t.Helper() - policies, err := dl.QueryLatestPolicies(ctx, bulker, dl.WithIndexName(index)) - if err != nil { - t.Fatal(err) - } - var found *model.Policy - for i := range policies { - p := policies[i] - if p.PolicyID == policyID { - found = &p - break - } - } - if found == nil { - t.Fatal("policy not found") - } - if found.RevisionIdx != revisionIdx { - t.Fatal("revision_idx does not match") - } - if found.CoordinatorIdx != coordinatorIdx { - t.Fatal("coordinator_idx does not match") - } -} - -func ensureLeadershipReleased(ctx context.Context, t *testing.T, bulker bulk.Bulk, cfg config.Fleet, index string, policyID string) { - t.Helper() - var leader model.PolicyLeader - data, err := bulker.Read(ctx, index, policyID) - if err != nil { - t.Fatal(err) - } - err = json.Unmarshal(data, &leader) - if err != nil { - t.Fatal(err) - } - if leader.Server.ID != cfg.Agent.ID { - t.Fatal("server.id should match from configuration") - } - lt, err := leader.Time() - if err != nil { - t.Fatal(err) - } - diff := time.Now().UTC().Sub(lt).Seconds() - if diff < (30 * time.Second).Seconds() { - t.Fatalf("@timestamp different should be more than 30 seconds; instead its %.0f secs", diff) - } -} diff --git a/internal/pkg/coordinator/v0.go 
b/internal/pkg/coordinator/v0.go deleted file mode 100644 index dcf343168..000000000 --- a/internal/pkg/coordinator/v0.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package coordinator - -import ( - "context" - - "github.com/elastic/fleet-server/v7/internal/pkg/model" - - "github.com/rs/zerolog" -) - -// coordinatorZeroT is V0 coordinator that just takes a subscribed policy and outputs the same policy. -type coordinatorZeroT struct { - policy model.Policy - in chan model.Policy - out chan model.Policy -} - -// NewCoordinatorZero creates a V0 coordinator. -func NewCoordinatorZero(policy model.Policy) (Coordinator, error) { - return &coordinatorZeroT{ - policy: policy, - in: make(chan model.Policy), - out: make(chan model.Policy), - }, nil -} - -// Name returns the "v0" name. -func (c *coordinatorZeroT) Name() string { - return "v0" -} - -// Run runs the coordinator for the policy. -func (c *coordinatorZeroT) Run(ctx context.Context) error { - err := c.updatePolicy(c.policy) - if err != nil { - zerolog.Ctx(ctx).Error().Err(err).Msg("failed to handle policy") - } - - for { - select { - case p := <-c.in: - err = c.updatePolicy(p) - if err != nil { - zerolog.Ctx(ctx).Error().Err(err).Msg("failed to handle policy") - continue - } - case <-ctx.Done(): - return ctx.Err() - } - } -} - -// Update called to signal a new policy revision has been defined. -func (c *coordinatorZeroT) Update(_ context.Context, policy model.Policy) error { - c.in <- policy - return nil -} - -// Output is the output channel for updated coordinated policies. -func (c *coordinatorZeroT) Output() <-chan model.Policy { - return c.out -} - -// updatePolicy performs the working of incrementing the coordinator idx. -func (c *coordinatorZeroT) updatePolicy(p model.Policy) error { - _, err := c.handlePolicy(p.Data) - if err != nil { - return err - } - if p.CoordinatorIdx == 0 { - p.CoordinatorIdx += 1 - c.policy = p - c.out <- p - } - return nil -} - -// handlePolicy performs the actual work of coordination. -// -// Does nothing at the moment. -func (c *coordinatorZeroT) handlePolicy(data *model.PolicyData) (*model.PolicyData, error) { - return data, nil -} diff --git a/internal/pkg/coordinator/v0_test.go b/internal/pkg/coordinator/v0_test.go deleted file mode 100644 index d00ec096e..000000000 --- a/internal/pkg/coordinator/v0_test.go +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. 
- -//go:build !integration - -package coordinator - -import ( - "context" - "testing" - "time" - - "github.com/elastic/fleet-server/v7/internal/pkg/model" - "github.com/gofrs/uuid" -) - -func TestCoordinatorZero(t *testing.T) { - ctx, cn := context.WithCancel(context.Background()) - defer cn() - - policyId := uuid.Must(uuid.NewV4()).String() - policy := model.Policy{ - PolicyID: policyId, - CoordinatorIdx: 0, - Data: nil, - RevisionIdx: 1, - } - coord, err := NewCoordinatorZero(policy) - if err != nil { - t.Fatal(err) - } - - go func() { - if err := coord.Run(ctx); err != nil && err != context.Canceled { - t.Error(err) - return - } - }() - - // should get a new policy on start up - select { - case newPolicy := <-coord.Output(): - if newPolicy.RevisionIdx != 1 { - t.Fatalf("revision_idx should be set to 1, it was set to %d", newPolicy.RevisionIdx) - } - if newPolicy.CoordinatorIdx != 1 { - t.Fatalf("coordinator_idx should be set to 1, it was set to %d", newPolicy.CoordinatorIdx) - } - case <-time.After(500 * time.Millisecond): - t.Fatal("never receive a new policy") - } - - // send a new policy revision; should get a new policy - policy = model.Policy{ - PolicyID: policyId, - CoordinatorIdx: 0, - Data: nil, - RevisionIdx: 2, - } - if err := coord.Update(ctx, policy); err != nil { - t.Fatal(err) - } - select { - case newPolicy := <-coord.Output(): - if newPolicy.RevisionIdx != 2 { - t.Fatalf("revision_idx should be set to 2, it was set to %d", newPolicy.RevisionIdx) - } - if newPolicy.CoordinatorIdx != 1 { - t.Fatalf("coordinator_idx should be set to 1, it was set to %d", newPolicy.CoordinatorIdx) - } - case <-time.After(500 * time.Millisecond): - t.Fatal("never receive a new policy") - } - - // send policy with already set coordinator_idx, v0 does nothing - policy = model.Policy{ - PolicyID: policyId, - CoordinatorIdx: 1, - Data: nil, - RevisionIdx: 2, - } - if err := coord.Update(ctx, policy); err != nil { - t.Fatal(err) - } - select { - case <-coord.Output(): - t.Fatal("should not have got a new policy") - case <-time.After(500 * time.Millisecond): - break - } -} diff --git a/internal/pkg/dl/constants.go b/internal/pkg/dl/constants.go index 703117bca..185167afe 100644 --- a/internal/pkg/dl/constants.go +++ b/internal/pkg/dl/constants.go @@ -14,8 +14,6 @@ const ( FleetArtifacts = ".fleet-artifacts" FleetEnrollmentAPIKeys = ".fleet-enrollment-api-keys" FleetPolicies = ".fleet-policies" - FleetPoliciesLeader = ".fleet-policies-leader" - FleetServers = ".fleet-servers" ) // Query fields @@ -30,13 +28,11 @@ const ( FieldActionID = "action_id" FieldAgent = "agent" FieldAgentVersion = "version" - FieldCoordinatorIdx = "coordinator_idx" FieldLastCheckin = "last_checkin" FieldLastCheckinStatus = "last_checkin_status" FieldLastCheckinMessage = "last_checkin_message" FieldLocalMetadata = "local_metadata" FieldComponents = "components" - FieldPolicyCoordinatorIdx = "policy_coordinator_idx" FieldPolicyID = "policy_id" FieldPolicyOutputAPIKey = "api_key" FieldPolicyOutputAPIKeyID = "api_key_id" diff --git a/internal/pkg/dl/migration.go b/internal/pkg/dl/migration.go index 82e20a67b..30f9c3c39 100644 --- a/internal/pkg/dl/migration.go +++ b/internal/pkg/dl/migration.go @@ -193,9 +193,9 @@ func migrateToV8_5(ctx context.Context, bulker bulk.Bulk) error { // The migration was necessary and indeed run, thus we need to regenerate // the API keys for all agents. In order to do so, we increase the policy - // coordinator index to force a policy update. 
+ // revision index to force a policy update. if migrated > 0 { - _, err := migrate(ctx, bulker, migratePolicyCoordinatorIdx) + _, err := migrate(ctx, bulker, migratePolicyRevisionIdx) if err != nil { return fmt.Errorf("v8.5.0 data migration failed: %w", err) } @@ -274,16 +274,17 @@ ctx._source.policy_output_permissions_hash=null; return migrationName, FleetAgents, body, nil } -// migratePolicyCoordinatorIdx increases the policy's CoordinatorIdx to force +// migratePolicyRevisionIdx increases the policy's RevisionIdx to force // a policy update ensuring the output data will be migrated to the new // Agent.Outputs field. See migrateAgentOutputs and https://github.com/elastic/fleet-server/issues/1672 // for details. -func migratePolicyCoordinatorIdx() (string, string, []byte, error) { - const migrationName = "PolicyCoordinatorIdx" +// NOTE(michel-laterman): Updated to increment the RevisionIdx instead of the CoordinatorrIdx as the coordinator is being removed. +func migratePolicyRevisionIdx() (string, string, []byte, error) { + const migrationName = "PolicyRevisionIdx" query := dsl.NewRoot() query.Query().MatchAll() - painless := `ctx._source.coordinator_idx++;` + painless := `ctx._source.revision_idx++;` query.Param("script", painless) body, err := query.MarshalJSON() diff --git a/internal/pkg/dl/migration_integration_test.go b/internal/pkg/dl/migration_integration_test.go index 3bd5a20a2..96d25ec33 100644 --- a/internal/pkg/dl/migration_integration_test.go +++ b/internal/pkg/dl/migration_integration_test.go @@ -92,7 +92,6 @@ func createSomePolicies(ctx context.Context, t *testing.T, n int, index string, policyModel := model.Policy{ ESDocument: model.ESDocument{}, - CoordinatorIdx: int64(i), Data: &policyData, DefaultFleetServer: false, PolicyID: fmt.Sprint(i), @@ -114,7 +113,7 @@ func createSomePolicies(ctx context.Context, t *testing.T, n int, index string, return created } -func TestPolicyCoordinatorIdx(t *testing.T) { +func TestPolicyRevisionIdx(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() ctx = testlog.SetLogger(t).WithContext(ctx) @@ -123,7 +122,7 @@ func TestPolicyCoordinatorIdx(t *testing.T) { docIDs := createSomePolicies(ctx, t, 25, index, bulker) - migrated, err := migrate(ctx, bulker, migratePolicyCoordinatorIdx) + migrated, err := migrate(ctx, bulker, migratePolicyRevisionIdx) require.NoError(t, err) require.Equal(t, len(docIDs), migrated) @@ -143,7 +142,7 @@ func TestPolicyCoordinatorIdx(t *testing.T) { } } - assert.Equal(t, int64(i+1), got.CoordinatorIdx) + assert.Equal(t, int64(2), got.RevisionIdx, "expected migration to increment revision_idx value by one") } } diff --git a/internal/pkg/dl/policies.go b/internal/pkg/dl/policies.go index 258a0e733..29dfc6779 100644 --- a/internal/pkg/dl/policies.go +++ b/internal/pkg/dl/policies.go @@ -32,7 +32,6 @@ func prepareQueryLatestPolicies() []byte { revisionIdx.Size(1) rSort := revisionIdx.Sort() rSort.SortOrder(FieldRevisionIdx, dsl.SortDescend) - rSort.SortOrder(FieldCoordinatorIdx, dsl.SortDescend) return root.MustMarshalJSON() } diff --git a/internal/pkg/dl/policies_integration_test.go b/internal/pkg/dl/policies_integration_test.go index ceb4fa9ee..fecf1b3cf 100644 --- a/internal/pkg/dl/policies_integration_test.go +++ b/internal/pkg/dl/policies_integration_test.go @@ -36,7 +36,6 @@ func createRandomPolicy(id string, revisionIdx int) model.Policy { return model.Policy{ PolicyID: id, RevisionIdx: int64(revisionIdx), - CoordinatorIdx: 0, Data: &policyData, DefaultFleetServer: false, 
Timestamp: now.Format(time.RFC3339), diff --git a/internal/pkg/dl/policies_leader.go b/internal/pkg/dl/policies_leader.go deleted file mode 100644 index 3fe83602b..000000000 --- a/internal/pkg/dl/policies_leader.go +++ /dev/null @@ -1,154 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package dl - -import ( - "context" - "encoding/json" - "errors" - "sync" - "time" - - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" - "github.com/elastic/fleet-server/v7/internal/pkg/dsl" - "github.com/elastic/fleet-server/v7/internal/pkg/es" - "github.com/elastic/fleet-server/v7/internal/pkg/model" - "github.com/rs/zerolog" -) - -var ( - tmplSearchPolicyLeaders *dsl.Tmpl - initSearchPolicyLeadersOnce sync.Once -) - -func prepareSearchPolicyLeaders() (*dsl.Tmpl, error) { - tmpl := dsl.NewTmpl() - root := dsl.NewRoot() - root.Query().Terms(FieldID, tmpl.Bind(FieldID), nil) - - err := tmpl.Resolve(root) - if err != nil { - return nil, err - } - return tmpl, nil -} - -// SearchPolicyLeaders returns all the leaders for the provided policies -func SearchPolicyLeaders(ctx context.Context, bulker bulk.Bulk, ids []string, opt ...Option) (leaders map[string]model.PolicyLeader, err error) { - initSearchPolicyLeadersOnce.Do(func() { - tmplSearchPolicyLeaders, err = prepareSearchPolicyLeaders() - if err != nil { - return - } - }) - - o := newOption(FleetPoliciesLeader, opt...) - data, err := tmplSearchPolicyLeaders.RenderOne(FieldID, ids) - if err != nil { - return - } - res, err := bulker.Search(ctx, o.indexName, data) - if err != nil { - if errors.Is(err, es.ErrIndexNotFound) { - zerolog.Ctx(ctx).Debug().Str("index", o.indexName).Msg(es.ErrIndexNotFound.Error()) - err = nil - } - return - } - - leaders = map[string]model.PolicyLeader{} - for _, hit := range res.Hits { - var l model.PolicyLeader - err = hit.Unmarshal(&l) - if err != nil { - return - } - leaders[hit.ID] = l - } - return leaders, nil -} - -// TakePolicyLeadership tries to take leadership of a policy -func TakePolicyLeadership(ctx context.Context, bulker bulk.Bulk, policyID, serverID, version string, opt ...Option) error { - o := newOption(FleetPoliciesLeader, opt...) - data, err := bulker.Read(ctx, o.indexName, policyID, bulk.WithRefresh()) - if err != nil && !errors.Is(err, es.ErrElasticNotFound) { - return err - } - var l model.PolicyLeader - found := false - if !errors.Is(err, es.ErrElasticNotFound) { - found = true - err = json.Unmarshal(data, &l) - if err != nil { - return err - } - } - if l.Server == nil { - l.Server = &model.ServerMetadata{} - } - l.Server.ID = serverID - l.Server.Version = version - l.SetTime(time.Now().UTC()) - if found { - data, err = json.Marshal(&struct { - Doc model.PolicyLeader `json:"doc"` - }{ - Doc: l, - }) - if err != nil { - return err - } - err = bulker.Update(ctx, o.indexName, policyID, data, bulk.WithRefresh()) - } else { - data, err = json.Marshal(&l) - if err != nil { - return err - } - _, err = bulker.Create(ctx, o.indexName, policyID, data, bulk.WithRefresh()) - } - if err != nil { - return err - } - return nil -} - -// ReleasePolicyLeadership releases leadership of a policy -func ReleasePolicyLeadership(ctx context.Context, bulker bulk.Bulk, policyID, serverID string, releaseInterval time.Duration, opt ...Option) error { - o := newOption(FleetPoliciesLeader, opt...) 
- data, err := bulker.Read(ctx, o.indexName, policyID, bulk.WithRefresh()) - if errors.Is(err, es.ErrElasticNotFound) { - // nothing to do - return nil - } - if err != nil { - return err - } - var l model.PolicyLeader - err = json.Unmarshal(data, &l) - if err != nil { - return err - } - if l.Server.ID != serverID { - // not leader anymore; nothing to do - return nil - } - released := time.Now().UTC().Add(-releaseInterval) - l.SetTime(released) - data, err = json.Marshal(&struct { - Doc model.PolicyLeader `json:"doc"` - }{ - Doc: l, - }) - if err != nil { - return err - } - err = bulker.Update(ctx, o.indexName, policyID, data, bulk.WithRefresh()) - if errors.Is(err, es.ErrElasticVersionConflict) { - // another leader took over; nothing to worry about - return nil - } - return err -} diff --git a/internal/pkg/dl/policies_leader_integration_test.go b/internal/pkg/dl/policies_leader_integration_test.go deleted file mode 100644 index 73194b6c3..000000000 --- a/internal/pkg/dl/policies_leader_integration_test.go +++ /dev/null @@ -1,176 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build integration - -package dl - -import ( - "context" - "encoding/json" - "fmt" - "testing" - "time" - - "github.com/elastic/fleet-server/v7/internal/pkg/model" - ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" - testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" - - "github.com/gofrs/uuid" -) - -const testVer = "1.0.0" - -func TestSearchPolicyLeaders(t *testing.T) { - ctx, cn := context.WithCancel(context.Background()) - defer cn() - ctx = testlog.SetLogger(t).WithContext(ctx) - - index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetPoliciesLeader) - - // insert a policy leaders to search for - serverID := uuid.Must(uuid.NewV4()).String() - policyIds := make([]string, 3) - for i := 0; i < 3; i++ { - policyID := uuid.Must(uuid.NewV4()).String() - version := testVer - err := TakePolicyLeadership(ctx, bulker, policyID, serverID, version, WithIndexName(index)) - if err != nil { - t.Fatal(err) - } - policyIds[i] = policyID - } - - // possible that a search will not produce 3 directly after write - // so we try 3 times to ensure - ftesting.Retry(t, ctx, func(ctx context.Context) error { - leaders, err := SearchPolicyLeaders(ctx, bulker, policyIds, WithIndexName(index)) - if err != nil { - return err - } - if len(leaders) != 3 { - return fmt.Errorf("must have found 3 leaders: only found %v", len(leaders)) - } - return nil - }, ftesting.RetryCount(3)) -} - -func TestTakePolicyLeadership(t *testing.T) { - ctx, cn := context.WithCancel(context.Background()) - defer cn() - ctx = testlog.SetLogger(t).WithContext(ctx) - - index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetPoliciesLeader) - - serverID := uuid.Must(uuid.NewV4()).String() - policyID := uuid.Must(uuid.NewV4()).String() - version := testVer - err := TakePolicyLeadership(ctx, bulker, policyID, serverID, version, WithIndexName(index)) - if err != nil { - t.Fatal(err) - } - - data, err := bulker.Read(ctx, index, policyID) - if err != nil { - t.Fatal(err) - } - var leader model.PolicyLeader - err = json.Unmarshal(data, &leader) - if err != nil { - t.Fatal(err) - } - if leader.Server.ID != serverID || leader.Server.Version != version { - t.Fatal("server.id and server.version should match") - } 
- lt, err := leader.Time() - if err != nil { - t.Fatal(err) - } - if time.Now().UTC().Sub(lt) >= 5*time.Second { - t.Fatal("@timestamp should be with in 5 seconds") - } -} - -func TestReleasePolicyLeadership(t *testing.T) { - ctx, cn := context.WithCancel(context.Background()) - defer cn() - ctx = testlog.SetLogger(t).WithContext(ctx) - - index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetPoliciesLeader) - - serverID := uuid.Must(uuid.NewV4()).String() - policyID := uuid.Must(uuid.NewV4()).String() - version := testVer - err := TakePolicyLeadership(ctx, bulker, policyID, serverID, version, WithIndexName(index)) - if err != nil { - t.Fatal(err) - } - err = ReleasePolicyLeadership(ctx, bulker, policyID, serverID, 30*time.Second, WithIndexName(index)) - if err != nil { - t.Fatal(err) - } - - data, err := bulker.Read(ctx, index, policyID) - if err != nil { - t.Fatal(err) - } - var leader model.PolicyLeader - err = json.Unmarshal(data, &leader) - if err != nil { - t.Fatal(err) - } - if leader.Server.ID != serverID || leader.Server.Version != version { - t.Fatal("server.id and server.version should match") - } - lt, err := leader.Time() - if err != nil { - t.Fatal(err) - } - diff := time.Now().UTC().Sub(lt).Seconds() - if diff < (30 * time.Second).Seconds() { - t.Fatalf("@timestamp different should be more than 30 seconds; instead its %.0f secs", diff) - } -} - -func TestReleasePolicyLeadership_NothingIfNotLeader(t *testing.T) { - ctx, cn := context.WithCancel(context.Background()) - defer cn() - ctx = testlog.SetLogger(t).WithContext(ctx) - - index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetPoliciesLeader) - - serverID := uuid.Must(uuid.NewV4()).String() - policyID := uuid.Must(uuid.NewV4()).String() - version := testVer - err := TakePolicyLeadership(ctx, bulker, policyID, serverID, version, WithIndexName(index)) - if err != nil { - t.Fatal(err) - } - otherServerID := uuid.Must(uuid.NewV4()).String() - err = ReleasePolicyLeadership(ctx, bulker, policyID, otherServerID, 30*time.Second, WithIndexName(index)) - if err != nil { - t.Fatal(err) - } - - data, err := bulker.Read(ctx, index, policyID) - if err != nil { - t.Fatal(err) - } - var leader model.PolicyLeader - err = json.Unmarshal(data, &leader) - if err != nil { - t.Fatal(err) - } - if leader.Server.ID != serverID || leader.Server.Version != version { - t.Fatal("server.id and server.version should match") - } - lt, err := leader.Time() - if err != nil { - t.Fatal(err) - } - diff := time.Now().UTC().Sub(lt).Seconds() - if diff >= (5 * time.Second).Seconds() { - t.Fatalf("@timestamp different should less than 5 seconds; instead its %.0f secs", diff) - } -} diff --git a/internal/pkg/dl/servers.go b/internal/pkg/dl/servers.go deleted file mode 100644 index 2686bf560..000000000 --- a/internal/pkg/dl/servers.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package dl - -import ( - "context" - "encoding/json" - "errors" - "time" - - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" - "github.com/elastic/fleet-server/v7/internal/pkg/es" - "github.com/elastic/fleet-server/v7/internal/pkg/model" -) - -// EnsureServer ensures that this server is written in the index. 
-func EnsureServer(ctx context.Context, bulker bulk.Bulk, version string, agent model.AgentMetadata, host model.HostMetadata, opts ...Option) error { - var server model.Server - o := newOption(FleetServers, opts...) - data, err := bulker.Read(ctx, o.indexName, agent.ID) - if err != nil && !errors.Is(err, es.ErrElasticNotFound) { - return err - } - if errors.Is(err, es.ErrElasticNotFound) { - server.Agent = &agent - server.Host = &host - server.Server = &model.ServerMetadata{ - ID: agent.ID, - Version: version, - } - server.SetTime(time.Now().UTC()) - data, err = json.Marshal(&server) - if err != nil { - return err - } - _, err = bulker.Create(ctx, o.indexName, agent.ID, data) - return err - } - err = json.Unmarshal(data, &server) - if err != nil { - return err - } - server.Agent = &agent - server.Host = &host - server.Server = &model.ServerMetadata{ - ID: agent.ID, - Version: version, - } - server.SetTime(time.Now().UTC()) - data, err = json.Marshal(&struct { - Doc model.Server `json:"doc"` - }{server}) - if err != nil { - return err - } - return bulker.Update(ctx, o.indexName, agent.ID, data, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)) -} diff --git a/internal/pkg/dl/servers_integration_test.go b/internal/pkg/dl/servers_integration_test.go deleted file mode 100644 index da2523480..000000000 --- a/internal/pkg/dl/servers_integration_test.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build integration - -package dl - -import ( - "context" - "encoding/json" - "runtime" - "testing" - - "github.com/gofrs/uuid" - - "github.com/elastic/fleet-server/v7/internal/pkg/model" - ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" - testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" -) - -func TestEnsureServer(t *testing.T) { - ctx, cn := context.WithCancel(context.Background()) - defer cn() - ctx = testlog.SetLogger(t).WithContext(ctx) - - index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetServers) - - agentID := uuid.Must(uuid.NewV4()).String() - agent := model.AgentMetadata{ - ID: agentID, - Version: "1.0.0", - } - host := model.HostMetadata{ - Architecture: runtime.GOOS, - ID: agentID, - Ip: []string{"::1"}, - Name: "testing-host", - } - - err := EnsureServer(ctx, bulker, "1.0.0", agent, host, WithIndexName(index)) - if err != nil { - t.Fatal(err) - } - - var srv model.Server - data, err := bulker.Read(ctx, index, agentID) - if err != nil { - t.Fatal(err) - } - err = json.Unmarshal(data, &srv) - if err != nil { - t.Fatal(err) - } - if srv.Agent.ID != agentID { - t.Fatal("agent.id should match agentId") - } -} diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index ecb71f4aa..fa5444d0a 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -347,7 +347,7 @@ type Policy struct { ESDocument // The coordinator index of the policy - CoordinatorIdx int64 `json:"coordinator_idx"` + CoordinatorIdx int64 `json:"coordinator_idx,omitempty"` Data *PolicyData `json:"data"` // True when this policy is the default policy to start Fleet Server diff --git a/internal/pkg/policy/monitor.go b/internal/pkg/policy/monitor.go index 187b8ab67..b89f19230 100644 --- a/internal/pkg/policy/monitor.go +++ b/internal/pkg/policy/monitor.go @@ -56,7 +56,7 @@ type Monitor interface 
{ Run(ctx context.Context) error // Subscribe creates a new subscription for a policy update. - Subscribe(agentID string, policyID string, revisionIdx int64, coordinatorIdx int64) (Subscription, error) + Subscribe(agentID string, policyID string, revisionIdx int64) (Subscription, error) // Unsubscribe removes the current subscription. Unsubscribe(sub Subscription) error @@ -230,7 +230,6 @@ func (m *monitorT) dispatchPending() bool { Str(logger.AgentID, s.agentID). Str(logger.PolicyID, s.policyID). Int64("rev", s.revIdx). - Int64("coord", s.coordIdx). Msg("dispatch") default: // Should never block on a channel; we created a channel of size one. @@ -294,9 +293,6 @@ func groupByLatest(policies []model.Policy) map[string]model.Policy { } if policy.RevisionIdx > curr.RevisionIdx { latest[policy.PolicyID] = policy - continue - } else if policy.RevisionIdx == curr.RevisionIdx && policy.CoordinatorIdx > curr.CoordinatorIdx { - latest[policy.PolicyID] = policy } } return latest @@ -312,14 +308,8 @@ func (m *monitorT) updatePolicy(pp *ParsedPolicy) bool { zlog := m.log.With(). Str(logger.PolicyID, newPolicy.PolicyID). Int64("rev", newPolicy.RevisionIdx). - Int64("coord", newPolicy.CoordinatorIdx). Logger() - if newPolicy.CoordinatorIdx <= 0 { - zlog.Info().Str(logger.PolicyID, newPolicy.PolicyID).Msg("Ignore policy that has not passed through coordinator") - return false - } - m.mut.Lock() defer m.mut.Unlock() @@ -371,7 +361,6 @@ func (m *monitorT) updatePolicy(pp *ParsedPolicy) bool { zlog.Info(). Int64("oldRev", oldPolicy.RevisionIdx). - Int64("oldCoord", oldPolicy.CoordinatorIdx). Int("nQueued", nQueued). Str(logger.PolicyID, newPolicy.PolicyID). Msg("New revision of policy received and added to the queue") @@ -380,7 +369,6 @@ func (m *monitorT) updatePolicy(pp *ParsedPolicy) bool { } func (m *monitorT) kickLoad() { - select { case m.kickCh <- struct{}{}: default: @@ -389,7 +377,6 @@ func (m *monitorT) kickLoad() { } func (m *monitorT) kickDeploy() { - select { case m.deployCh <- struct{}{}: default: @@ -397,26 +384,20 @@ func (m *monitorT) kickDeploy() { } // Subscribe creates a new subscription for a policy update. -func (m *monitorT) Subscribe(agentID string, policyID string, revisionIdx int64, coordinatorIdx int64) (Subscription, error) { +func (m *monitorT) Subscribe(agentID string, policyID string, revisionIdx int64) (Subscription, error) { if revisionIdx < 0 { return nil, errors.New("revisionIdx must be greater than or equal to 0") } - if coordinatorIdx < 0 { - return nil, errors.New("coordinatorIdx must be greater than or equal to 0") - } - m.log.Debug(). Str(logger.AgentID, agentID). Str(logger.PolicyID, policyID). Int64("rev", revisionIdx). - Int64("coord", coordinatorIdx). Msg("subscribed to policy monitor") s := NewSub( policyID, agentID, revisionIdx, - coordinatorIdx, ) m.mut.Lock() @@ -464,7 +445,6 @@ func (m *monitorT) Unsubscribe(sub Subscription) error { Str(logger.AgentID, s.agentID). Str(logger.PolicyID, s.policyID). Int64("rev", s.revIdx). - Int64("coord", s.coordIdx). 
Msg("unsubscribe") return nil diff --git a/internal/pkg/policy/monitor_integration_test.go b/internal/pkg/policy/monitor_integration_test.go index 380dc5b1d..801b62928 100644 --- a/internal/pkg/policy/monitor_integration_test.go +++ b/internal/pkg/policy/monitor_integration_test.go @@ -77,17 +77,16 @@ func TestMonitor_Integration(t *testing.T) { agentID := uuid.Must(uuid.NewV4()).String() policyID := uuid.Must(uuid.NewV4()).String() - s, err := m.Subscribe(agentID, policyID, 0, 0) + s, err := m.Subscribe(agentID, policyID, 0) defer m.Unsubscribe(s) //nolint:errcheck // defered function if err != nil { t.Fatal(err) } policy := model.Policy{ - PolicyID: policyID, - CoordinatorIdx: 1, - Data: &intPolData, - RevisionIdx: 1, + PolicyID: policyID, + Data: &intPolData, + RevisionIdx: 1, } ch := make(chan error, 1) go func() { @@ -102,7 +101,7 @@ func TestMonitor_Integration(t *testing.T) { select { case subPolicy := <-s.Output(): tm.Stop() - if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 1 && subPolicy.Policy.CoordinatorIdx != 1 { + if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 1 { t.Fatal("failed to get the expected updated policy") } case <-tm.C: diff --git a/internal/pkg/policy/monitor_test.go b/internal/pkg/policy/monitor_test.go index db3b68a7e..a34c77791 100644 --- a/internal/pkg/policy/monitor_test.go +++ b/internal/pkg/policy/monitor_test.go @@ -69,7 +69,7 @@ func TestMonitor_NewPolicy(t *testing.T) { agentId := uuid.Must(uuid.NewV4()).String() policyID := uuid.Must(uuid.NewV4()).String() - s, err := monitor.Subscribe(agentId, policyID, 0, 0) + s, err := monitor.Subscribe(agentId, policyID, 0) defer monitor.Unsubscribe(s) require.NoError(t, err) @@ -80,10 +80,9 @@ func TestMonitor_NewPolicy(t *testing.T) { Version: 1, SeqNo: 1, }, - PolicyID: policyID, - CoordinatorIdx: 1, - Data: policyDataDefault, - RevisionIdx: 1, + PolicyID: policyID, + Data: policyDataDefault, + RevisionIdx: 1, } policyData, err := json.Marshal(&policy) require.NoError(t, err) @@ -149,7 +148,7 @@ func TestMonitor_SamePolicy(t *testing.T) { agentId := uuid.Must(uuid.NewV4()).String() policyId := uuid.Must(uuid.NewV4()).String() - s, err := monitor.Subscribe(agentId, policyId, 1, 1) + s, err := monitor.Subscribe(agentId, policyId, 1) defer monitor.Unsubscribe(s) require.NoError(t, err) @@ -160,10 +159,9 @@ func TestMonitor_SamePolicy(t *testing.T) { Version: 1, SeqNo: 1, }, - PolicyID: policyId, - CoordinatorIdx: 1, - Data: policyDataDefault, - RevisionIdx: 1, + PolicyID: policyId, + Data: policyDataDefault, + RevisionIdx: 1, } policyData, err := json.Marshal(&policy) require.NoError(t, err) @@ -189,85 +187,7 @@ func TestMonitor_SamePolicy(t *testing.T) { if merr != nil && merr != context.Canceled { t.Fatal(merr) } - require.False(t, gotPolicy, "got policy update when it was the same rev/coord idx") - ms.AssertExpectations(t) - mm.AssertExpectations(t) -} - -func TestMonitor_NewPolicyUncoordinated(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - ctx = testlog.SetLogger(t).WithContext(ctx) - - chHitT := make(chan []es.HitT, 1) - defer close(chHitT) - ms := mmock.NewMockSubscription() - ms.On("Output").Return((<-chan []es.HitT)(chHitT)) - mm := mmock.NewMockMonitor() - mm.On("Subscribe").Return(ms).Once() - mm.On("Unsubscribe", mock.Anything).Return().Once() - bulker := ftesting.NewMockBulk() - - monitor := NewMonitor(bulker, mm, 0) - pm := monitor.(*monitorT) - pm.policyF = func(ctx context.Context, bulker bulk.Bulk, 
opt ...dl.Option) ([]model.Policy, error) { - return []model.Policy{}, nil - } - - var merr error - var mwg sync.WaitGroup - mwg.Add(1) - go func() { - defer mwg.Done() - merr = monitor.Run(ctx) - }() - - err := monitor.(*monitorT).waitStart(ctx) - require.NoError(t, err) - - agentId := uuid.Must(uuid.NewV4()).String() - policyId := uuid.Must(uuid.NewV4()).String() - s, err := monitor.Subscribe(agentId, policyId, 1, 1) - defer monitor.Unsubscribe(s) - require.NoError(t, err) - - rId := xid.New().String() - policy := model.Policy{ - ESDocument: model.ESDocument{ - Id: rId, - Version: 1, - SeqNo: 1, - }, - PolicyID: policyId, - CoordinatorIdx: 0, - Data: policyDataDefault, - RevisionIdx: 2, - } - policyData, err := json.Marshal(&policy) - require.NoError(t, err) - - chHitT <- []es.HitT{{ - ID: rId, - SeqNo: 1, - Version: 1, - Source: policyData, - }} - - gotPolicy := false - tm := time.NewTimer(1 * time.Second) - defer tm.Stop() - select { - case <-s.Output(): - gotPolicy = true - case <-tm.C: - } - - cancel() - mwg.Wait() - if merr != nil && merr != context.Canceled { - t.Fatal(merr) - } - require.False(t, gotPolicy, "got policy update when it had coordinator_idx set to 0") + require.False(t, gotPolicy, "got policy update when it was the same rev idx") ms.AssertExpectations(t) mm.AssertExpectations(t) } @@ -318,10 +238,9 @@ func runTestMonitor_NewPolicyExists(t *testing.T, delay time.Duration) { Version: 1, SeqNo: 1, }, - PolicyID: policyId, - CoordinatorIdx: 1, - Data: policyDataDefault, - RevisionIdx: 2, + PolicyID: policyId, + Data: policyDataDefault, + RevisionIdx: 2, } pm.policyF = func(ctx context.Context, bulker bulk.Bulk, opt ...dl.Option) ([]model.Policy, error) { @@ -340,7 +259,7 @@ func runTestMonitor_NewPolicyExists(t *testing.T, delay time.Duration) { err := monitor.(*monitorT).waitStart(ctx) require.NoError(t, err) - s, err := monitor.Subscribe(agentId, policyId, 1, 1) + s, err := monitor.Subscribe(agentId, policyId, 1) defer monitor.Unsubscribe(s) require.NoError(t, err) diff --git a/internal/pkg/policy/revision.go b/internal/pkg/policy/revision.go index 68d6768a7..dc3d29eb0 100644 --- a/internal/pkg/policy/revision.go +++ b/internal/pkg/policy/revision.go @@ -14,24 +14,25 @@ import ( // Revision is a policy revision that is sent as an action ID to an agent. type Revision struct { - PolicyID string - RevisionIdx int64 - CoordinatorIdx int64 + PolicyID string + RevisionIdx int64 } // RevisionFromPolicy creates the revision from the policy. func RevisionFromPolicy(policy model.Policy) Revision { return Revision{ - PolicyID: policy.PolicyID, - RevisionIdx: policy.RevisionIdx, - CoordinatorIdx: policy.CoordinatorIdx, + PolicyID: policy.PolicyID, + RevisionIdx: policy.RevisionIdx, } } // RevisionFromString converts the string to a policy revision. func RevisionFromString(actionID string) (Revision, bool) { split := strings.Split(actionID, ":") - if len(split) != 4 { + // NOTE: len 3 is expected for any policy change generated by fleet server v8.13+ + // len 4 includes the previously used coordinator_idx value that has been deprecated. + // If we receive a actionID with the coordinator_idx ignore the coordinator_idx value. 
+ if len(split) < 3 || len(split) > 4 { return Revision{}, false } if split[0] != "policy" { @@ -41,18 +42,13 @@ func RevisionFromString(actionID string) (Revision, bool) { if err != nil { return Revision{}, false } - coordIdx, err := strconv.ParseInt(split[3], 10, 64) - if err != nil { - return Revision{}, false - } return Revision{ - PolicyID: split[1], - RevisionIdx: revIdx, - CoordinatorIdx: coordIdx, + PolicyID: split[1], + RevisionIdx: revIdx, }, true } // String returns the ID string for the policy revision. func (a *Revision) String() string { - return fmt.Sprintf("policy:%s:%d:%d", a.PolicyID, a.RevisionIdx, a.CoordinatorIdx) + return fmt.Sprintf("policy:%s:%d", a.PolicyID, a.RevisionIdx) } diff --git a/internal/pkg/policy/self_test.go b/internal/pkg/policy/self_test.go index dd1f4a865..c59714487 100644 --- a/internal/pkg/policy/self_test.go +++ b/internal/pkg/policy/self_test.go @@ -90,7 +90,6 @@ func TestSelfMonitor_DefaultPolicy(t *testing.T) { SeqNo: 1, }, PolicyID: policyID, - CoordinatorIdx: 1, Data: &pData, RevisionIdx: 1, DefaultFleetServer: true, @@ -131,7 +130,6 @@ func TestSelfMonitor_DefaultPolicy(t *testing.T) { SeqNo: 1, }, PolicyID: policyID, - CoordinatorIdx: 1, Data: &pData, RevisionIdx: 2, DefaultFleetServer: true, @@ -247,7 +245,6 @@ func TestSelfMonitor_DefaultPolicy_Degraded(t *testing.T) { SeqNo: 1, }, PolicyID: policyID, - CoordinatorIdx: 1, Data: &pData, RevisionIdx: 1, DefaultFleetServer: true, @@ -387,7 +384,6 @@ func TestSelfMonitor_SpecificPolicy(t *testing.T) { SeqNo: 1, }, PolicyID: policyID, - CoordinatorIdx: 1, Data: &pData, RevisionIdx: 2, DefaultFleetServer: true, @@ -428,7 +424,6 @@ func TestSelfMonitor_SpecificPolicy(t *testing.T) { SeqNo: 2, }, PolicyID: policyID, - CoordinatorIdx: 1, Data: &pData, RevisionIdx: 1, DefaultFleetServer: true, @@ -543,7 +538,6 @@ func TestSelfMonitor_SpecificPolicy_Degraded(t *testing.T) { SeqNo: 1, }, PolicyID: policyID, - CoordinatorIdx: 1, Data: &pData, RevisionIdx: 1, DefaultFleetServer: true, diff --git a/internal/pkg/policy/sub.go b/internal/pkg/policy/sub.go index 86c60a5ce..e767c35f2 100644 --- a/internal/pkg/policy/sub.go +++ b/internal/pkg/policy/sub.go @@ -12,7 +12,6 @@ type subT struct { policyID string agentID string // not logically necessary; cached for logging revIdx int64 - coordIdx int64 next *subT prev *subT @@ -20,12 +19,11 @@ type subT struct { ch chan *ParsedPolicy } -func NewSub(policyID, agentID string, revIdx, coordIdx int64) *subT { +func NewSub(policyID, agentID string, revIdx int64) *subT { return &subT{ policyID: policyID, agentID: agentID, revIdx: revIdx, - coordIdx: coordIdx, ch: make(chan *ParsedPolicy, 1), } } @@ -77,11 +75,9 @@ func (n *subT) isEmpty() bool { } func (n *subT) isUpdate(policy *model.Policy) bool { - pRevIdx := policy.RevisionIdx - pCoordIdx := policy.CoordinatorIdx - return (pRevIdx > n.revIdx && pCoordIdx > 0) || (pRevIdx == n.revIdx && pCoordIdx > n.coordIdx) + return pRevIdx > n.revIdx } // Output returns a new policy that needs to be sent based on the current subscription. 
diff --git a/internal/pkg/policy/sub_test.go b/internal/pkg/policy/sub_test.go index d756df964..40e70ba13 100644 --- a/internal/pkg/policy/sub_test.go +++ b/internal/pkg/policy/sub_test.go @@ -35,7 +35,7 @@ func TestSub_PushBackN(t *testing.T) { nodes := make([]*subT, 0, n) for i := 0; i < n; i++ { name := fmt.Sprintf("policy%d", i) - nn := NewSub(name, "", 0, 0) + nn := NewSub(name, "", 0) head.pushBack(nn) nodes = append(nodes, nn) } @@ -82,7 +82,7 @@ func TestSub_PushFrontN(t *testing.T) { nodes := make([]*subT, 0, n) for i := 0; i < n; i++ { name := fmt.Sprintf("policy%d", i) - nn := NewSub(name, "", 0, 0) + nn := NewSub(name, "", 0) head.pushFront(nn) nodes = append(nodes, nn) } @@ -126,7 +126,7 @@ func TestSub_PushRandom(t *testing.T) { nodes := make([]*subT, 0, N) for i := 0; i < N; i++ { name := fmt.Sprintf("policy%d", i) - nn := NewSub(name, "", 0, 0) + nn := NewSub(name, "", 0) if rand.Intn(2) == 1 { head.pushBack(nn) @@ -162,7 +162,7 @@ func TestSub_UnlinkRandomN(t *testing.T) { nodes := make([]*subT, 0, N) for i := 0; i < N; i++ { name := fmt.Sprintf("policy%d", i) - nn := NewSub(name, "", 0, 0) + nn := NewSub(name, "", 0) head.pushBack(nn) nodes = append(nodes, nn) } @@ -195,7 +195,7 @@ func TestSub_UnlinkRandomN(t *testing.T) { func BenchmarkSubsSimple(b *testing.B) { head := makeHead() - nn := NewSub("", "", 0, 0) + nn := NewSub("", "", 0) for i := 0; i < b.N; i++ { head.pushBack(nn) head.popFront() @@ -219,7 +219,7 @@ func BenchmarkSubs(b *testing.B) { for i := 0; i < max; i++ { name := fmt.Sprintf("policy%d", i) - nn := NewSub(name, "", 0, 0) + nn := NewSub(name, "", 0) subs = append(subs, nn) } diff --git a/internal/pkg/server/fleet.go b/internal/pkg/server/fleet.go index 95b08b9b7..066f7577b 100644 --- a/internal/pkg/server/fleet.go +++ b/internal/pkg/server/fleet.go @@ -26,7 +26,6 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/cache" "github.com/elastic/fleet-server/v7/internal/pkg/checkin" "github.com/elastic/fleet-server/v7/internal/pkg/config" - "github.com/elastic/fleet-server/v7/internal/pkg/coordinator" "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/gc" @@ -464,7 +463,7 @@ func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgro return err } - // Coordinator policy monitor + // Policy index monitor pim, err := monitor.New(dl.FleetPolicies, esCli, monCli, monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize), monitor.WithPollTimeout(cfg.Inputs[0].Monitor.PollTimeout), @@ -473,10 +472,7 @@ func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgro if err != nil { return err } - g.Go(loggedRunFunc(ctx, "Policy index monitor", pim.Run)) - cord := coordinator.NewMonitor(cfg.Fleet, f.bi.Version, bulker, pim, coordinator.NewCoordinatorZero) - g.Go(loggedRunFunc(ctx, "Coordinator policy monitor", cord.Run)) // Policy monitor pm := policy.NewMonitor(bulker, pim, cfg.Inputs[0].Server.Limits.PolicyThrottle) diff --git a/internal/pkg/server/fleet_integration_test.go b/internal/pkg/server/fleet_integration_test.go index b43979e8a..255717bda 100644 --- a/internal/pkg/server/fleet_integration_test.go +++ b/internal/pkg/server/fleet_integration_test.go @@ -547,7 +547,7 @@ func TestServerInstrumentation(t *testing.T) { } } - // Force a transaction (fleet-server should be sending tranactions from the coordinator and monitor) + // Force a transaction 
(fleet-server should be sending tranactions from monitor) callCheckinFunc() // Verify the APM tracer does not connect to the mocked APM Server. diff --git a/model/schema.json b/model/schema.json index 9857da512..41865c21e 100644 --- a/model/schema.json +++ b/model/schema.json @@ -332,6 +332,7 @@ "type": "integer" }, "coordinator_idx": { + "deprecated": true, "description": "The coordinator index of the policy", "type": "integer" }, @@ -350,7 +351,6 @@ "required": [ "policy_id", "revision_idx", - "coordinator_idx", "data", "default_fleet_server" ] @@ -524,6 +524,7 @@ "type": "integer" }, "policy_coordinator_idx": { + "deprecated": true, "description": "The current policy coordinator for the Elastic Agent", "type": "integer" }, From 521f9e785f51b30fbdac1cdfc8fda4679c428124 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Wed, 13 Dec 2023 17:43:59 -0600 Subject: [PATCH 2/8] Fix linter --- internal/pkg/api/handleAck.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index b9355b62a..c985dff68 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -17,6 +17,8 @@ import ( "time" "github.com/rs/zerolog" + "go.elastic.co/apm/module/apmhttp/v2" + "go.elastic.co/apm/v2" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/cache" @@ -26,8 +28,6 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/policy" "github.com/elastic/fleet-server/v7/internal/pkg/smap" - "go.elastic.co/apm/module/apmhttp/v2" - "go.elastic.co/apm/v2" ) const ( From 43078e0e327584d6829a7e82c794eb17a1c93a34 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Wed, 13 Dec 2023 17:55:22 -0600 Subject: [PATCH 3/8] Update linter --- .github/workflows/golangci-lint.yml | 2 +- Makefile | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 2f34a327f..65841aae2 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -38,7 +38,7 @@ jobs: uses: golangci/golangci-lint-action@v2 with: # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version - version: v1.51.2 + version: v1.55.2 # Give the job more time to execute. 
# Regarding `--whole-files`, the linter is supposed to support linting of changed a patch only but,
diff --git a/Makefile b/Makefile
index 0db7f090a..8e1c0ffc0 100644
--- a/Makefile
+++ b/Makefile
@@ -119,7 +119,7 @@ check-headers: ## - Check copyright headers
 
 .PHONY: check-go
 check-go: ## - Run golangci-lint
-	@curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/d58dbde584c801091e74a00940e11ff18c6c68bd/install.sh | sh -s v1.51.1
+	@curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/d58dbde584c801091e74a00940e11ff18c6c68bd/install.sh | sh -s v1.55.2
 	@./bin/golangci-lint run -v
 
 .PHONY: notice

From 1ecd3367130d2dac92ac52f15069767ba95adde6 Mon Sep 17 00:00:00 2001
From: michel-laterman
Date: Tue, 19 Dec 2023 11:44:10 -0600
Subject: [PATCH 4/8] Add issue to changelog

---
 changelog/fragments/1701298362-Remove-policy-coordinator.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/changelog/fragments/1701298362-Remove-policy-coordinator.yaml b/changelog/fragments/1701298362-Remove-policy-coordinator.yaml
index dba733837..dc8a1356c 100644
--- a/changelog/fragments/1701298362-Remove-policy-coordinator.yaml
+++ b/changelog/fragments/1701298362-Remove-policy-coordinator.yaml
@@ -29,4 +29,4 @@ pr: 3131
 
 # Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
 # If not present is automatically filled by the tooling with the issue linked to the PR number.
-#issue: https://github.com/owner/repo/1234
+issue: 1704

From 76a2334d27677e3e0baa5e38958c570944836b3d Mon Sep 17 00:00:00 2001
From: michel-laterman
Date: Wed, 17 Apr 2024 09:52:10 -0700
Subject: [PATCH 5/8] Change how output keys are regenerated for v8.5 migration

The v8.5 migration used to generate new output API keys for an agent by
incrementing the policy's coordinator_idx, forcing the policy outputs to be
prepared again. The behaviour was changed to instead detect, at checkin,
whether any of the agent's outputs has an empty api_key value and, if so,
subscribe to the policy monitor with a revision of 0.
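For illustration, the checkin path now applies roughly the following logic
(a sketch that mirrors the internal/pkg/api/handleCheckin.go hunk below, not a
standalone snippet):

    // If any of the agent's outputs is missing its API key, subscribe with
    // revision_idx=0 so the policy monitor emits a policy change and the
    // API keys are regenerated.
    revID := agent.PolicyRevisionIdx
    for _, output := range agent.Outputs {
        if output.APIKey == "" {
            revID = 0
            break
        }
    }
    sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, revID)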
---
 internal/pkg/api/handleCheckin.go             |  12 +-
 internal/pkg/dl/migration.go                  |  41 +----
 internal/pkg/dl/migration_integration_test.go |  33 ----
 .../pkg/policy/monitor_integration_test.go    |  48 +++---
 internal/pkg/server/fleet_integration_test.go | 158 +++++++++++++++++-
 .../remote_es_output_integration_test.go      |   6 +-
 6 files changed, 194 insertions(+), 104 deletions(-)

diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go
index 9daebd985..a63b51532 100644
--- a/internal/pkg/api/handleCheckin.go
+++ b/internal/pkg/api/handleCheckin.go
@@ -275,8 +275,18 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
 	defer ct.ad.Unsubscribe(aSub)
 	actCh := aSub.Ch()
 
+	// use revision_idx=0 if any of the agent's outputs has no API key defined
+	// This will force the policy monitor to emit a new policy to regenerate API keys
+	revID := agent.PolicyRevisionIdx
+	for _, output := range agent.Outputs {
+		if output.APIKey == "" {
+			revID = 0
+			break
+		}
+	}
+
 	// Subscribe to policy manager for changes on PolicyId > policyRev
-	sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, agent.PolicyRevisionIdx)
+	sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, revID)
 	if err != nil {
 		return fmt.Errorf("subscribe policy monitor: %w", err)
 	}
diff --git a/internal/pkg/dl/migration.go b/internal/pkg/dl/migration.go
index 30f9c3c39..493069148 100644
--- a/internal/pkg/dl/migration.go
+++ b/internal/pkg/dl/migration.go
@@ -186,21 +186,15 @@ func migrateAgentMetadata() (string, string, []byte, error) {
 func migrateToV8_5(ctx context.Context, bulker bulk.Bulk) error {
 	zerolog.Ctx(ctx).Debug().Msg("applying migration to v8.5.0")
 
-	migrated, err := migrate(ctx, bulker, migrateAgentOutputs)
+	_, err := migrate(ctx, bulker, migrateAgentOutputs)
 	if err != nil {
 		return fmt.Errorf("v8.5.0 data migration failed: %w", err)
 	}
-
-	// The migration was necessary and indeed run, thus we need to regenerate
-	// the API keys for all agents. In order to do so, we increase the policy
-	// revision index to force a policy update.
-	if migrated > 0 {
-		_, err := migrate(ctx, bulker, migratePolicyRevisionIdx)
-		if err != nil {
-			return fmt.Errorf("v8.5.0 data migration failed: %w", err)
-		}
-	}
-
+	// NOTE(michel-laterman): We use lazy output migration now that the
+	// coordinator is removed. Migration completes once an agent checks in
+	// with an output whose API key is empty. The agent will
+	// subscribe to the policy monitor with revision_idx=0 to force a policy
+	// change to occur.
 	return nil
 }
 
@@ -273,26 +267,3 @@ ctx._source.policy_output_permissions_hash=null;
 
 	return migrationName, FleetAgents, body, nil
 }
-
-// migratePolicyRevisionIdx increases the policy's RevisionIdx to force
-// a policy update ensuring the output data will be migrated to the new
-// Agent.Outputs field. See migrateAgentOutputs and https://github.com/elastic/fleet-server/issues/1672
-// for details.
-// NOTE(michel-laterman): Updated to increment the RevisionIdx instead of the CoordinatorrIdx as the coordinator is being removed.
-func migratePolicyRevisionIdx() (string, string, []byte, error) {
-	const migrationName = "PolicyRevisionIdx"
-
-	query := dsl.NewRoot()
-	query.Query().MatchAll()
-	painless := `ctx._source.revision_idx++;`
-	query.Param("script", painless)
-
-	body, err := query.MarshalJSON()
-	if err != nil {
-		zerolog.Ctx(context.TODO()).Debug().Str("painlessScript", painless).
- Msgf("%s: failed painless script", migrationName) - return migrationName, FleetPolicies, nil, fmt.Errorf("could not marshal ES query: %w", err) - } - - return migrationName, FleetPolicies, body, nil -} diff --git a/internal/pkg/dl/migration_integration_test.go b/internal/pkg/dl/migration_integration_test.go index fd0629b2a..78a470f8e 100644 --- a/internal/pkg/dl/migration_integration_test.go +++ b/internal/pkg/dl/migration_integration_test.go @@ -114,39 +114,6 @@ func createSomePolicies(ctx context.Context, t *testing.T, n int, index string, return created } -func TestPolicyRevisionIdx(t *testing.T) { - ctx, cn := context.WithCancel(context.Background()) - defer cn() - ctx = testlog.SetLogger(t).WithContext(ctx) - - index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetPolicies) - - docIDs := createSomePolicies(ctx, t, 25, index, bulker) - - migrated, err := migrate(ctx, bulker, migratePolicyRevisionIdx) - require.NoError(t, err) - - require.Equal(t, len(docIDs), migrated) - - for i := range docIDs { - policies, err := QueryLatestPolicies( - ctx, bulker, WithIndexName(index)) - if err != nil { - assert.NoError(t, err, "failed to query latest policies") // we want to continue even if a single agent fails - continue - } - - var got model.Policy - for _, p := range policies { - if p.PolicyID == fmt.Sprint(i) { - got = p - } - } - - assert.Equal(t, int64(2), got.RevisionIdx, "expected migration to increment revision_idx value by one") - } -} - func TestMigrateOutputs_withDefaultAPIKeyHistory(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() diff --git a/internal/pkg/policy/monitor_integration_test.go b/internal/pkg/policy/monitor_integration_test.go index 55f09e053..2ce96544c 100644 --- a/internal/pkg/policy/monitor_integration_test.go +++ b/internal/pkg/policy/monitor_integration_test.go @@ -175,17 +175,16 @@ func TestMonitor_Debounce_Integration(t *testing.T) { agentID := uuid.Must(uuid.NewV4()).String() policyID := uuid.Must(uuid.NewV4()).String() - s, err := m.Subscribe(agentID, policyID, 0, 0) + s, err := m.Subscribe(agentID, policyID, 0) if err != nil { t.Fatal(err) } defer m.Unsubscribe(s) //nolint:errcheck // defered function policy := model.Policy{ - PolicyID: policyID, - CoordinatorIdx: 1, - Data: &intPolData, - RevisionIdx: 1, + PolicyID: policyID, + Data: &intPolData, + RevisionIdx: 1, } ch := make(chan error, 1) go func() { @@ -245,7 +244,7 @@ func TestMonitor_Debounce_Integration(t *testing.T) { ts = time.Now() tm.Stop() t.Log("received initial policy from subsciption") - if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 1 && subPolicy.Policy.CoordinatorIdx != 1 { + if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 1 { t.Fatal("failed to get the expected updated policy") } case <-tm.C: @@ -257,7 +256,7 @@ func TestMonitor_Debounce_Integration(t *testing.T) { } // Make new subscription to replicate agent checking in again. - s2, err := m.Subscribe(agentID, policyID, 1, 1) + s2, err := m.Subscribe(agentID, policyID, 1) if err != nil { t.Fatal(err) } @@ -274,7 +273,7 @@ func TestMonitor_Debounce_Integration(t *testing.T) { t.Fatalf("Expected subscription to take at least 1s to update, time was: %s", dur) } // 2nd version of policy should be skipped, 3rd should be read. 
- if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 3 && subPolicy.Policy.CoordinatorIdx != 1 { + if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 3 { t.Fatal("failed to get the expected updated policy") } case <-tm.C: @@ -282,7 +281,7 @@ func TestMonitor_Debounce_Integration(t *testing.T) { } - s3, err := m.Subscribe(agentID, policyID, 3, 1) + s3, err := m.Subscribe(agentID, policyID, 3) if err != nil { t.Fatal(err) } @@ -294,7 +293,7 @@ func TestMonitor_Debounce_Integration(t *testing.T) { tm.Stop() t.Logf("received third policy from subsciption, rev %d", subPolicy.Policy.RevisionIdx) // 2nd version of policy should be skipped, 3rd should be read. - if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 4 && subPolicy.Policy.CoordinatorIdx != 1 { + if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 4 { t.Fatal("failed to get the expected updated policy") } case <-tm.C: @@ -369,10 +368,9 @@ func TestMonitor_Revisions(t *testing.T) { policyID := uuid.Must(uuid.NewV4()).String() policy := model.Policy{ - PolicyID: policyID, - CoordinatorIdx: 1, - Data: &intPolData, - RevisionIdx: 1, + PolicyID: policyID, + Data: &intPolData, + RevisionIdx: 1, } _, err = dl.CreatePolicy(ctx, bulker, policy, dl.WithIndexName(index)) if err != nil { @@ -385,14 +383,14 @@ func TestMonitor_Revisions(t *testing.T) { t.Fatal(err) } - s, err := m.Subscribe(agentID, policyID, 1, 1) + s, err := m.Subscribe(agentID, policyID, 1) if err != nil { t.Fatal(err) } defer m.Unsubscribe(s) //nolint:errcheck // defered function agent2 := uuid.Must(uuid.NewV4()).String() - s2, err := m.Subscribe(agent2, policyID, 1, 1) + s2, err := m.Subscribe(agent2, policyID, 1) if err != nil { t.Fatal(err) } @@ -406,7 +404,6 @@ func TestMonitor_Revisions(t *testing.T) { // policy should be ignored as coordinator_idx is 0 policy.RevisionIdx = 4 - policy.CoordinatorIdx = 0 _, err = dl.CreatePolicy(ctx, bulker, policy, dl.WithIndexName(index)) if err != nil { t.Fatal(err) @@ -416,7 +413,7 @@ func TestMonitor_Revisions(t *testing.T) { select { case subPolicy := <-s.Output(): tm.Stop() - if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 3 && subPolicy.Policy.CoordinatorIdx != 1 { + if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 3 { t.Fatalf("failed to get the expected updated policy, policy revision: %d", subPolicy.Policy.RevisionIdx) } case <-tm.C: @@ -427,7 +424,7 @@ func TestMonitor_Revisions(t *testing.T) { select { case subPolicy := <-s2.Output(): tm.Stop() - if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 3 && subPolicy.Policy.CoordinatorIdx != 1 { + if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 3 { t.Fatalf("failed to get the expected updated policy, policy revision: %d", subPolicy.Policy.RevisionIdx) } case <-tm.C: @@ -493,10 +490,9 @@ func TestMonitor_KickDeploy(t *testing.T) { policyID := uuid.Must(uuid.NewV4()).String() policy := model.Policy{ - PolicyID: policyID, - CoordinatorIdx: 1, - Data: &intPolData, - RevisionIdx: 1, + PolicyID: policyID, + Data: &intPolData, + RevisionIdx: 1, } _, err = dl.CreatePolicy(ctx, bulker, policy, dl.WithIndexName(index)) if err != nil { @@ -509,14 +505,14 @@ func TestMonitor_KickDeploy(t *testing.T) { t.Fatal(err) } - s, err := m.Subscribe(agentID, policyID, 1, 1) + s, err := m.Subscribe(agentID, policyID, 1) if err != nil { t.Fatal(err) } defer m.Unsubscribe(s) //nolint:errcheck // defered 
function // Force a new policy load so that the kickLoad() func runs - s2, err := m.Subscribe("test", "test", 1, 1) + s2, err := m.Subscribe("test", "test", 1) if err != nil { t.Fatal(err) } @@ -526,7 +522,7 @@ func TestMonitor_KickDeploy(t *testing.T) { select { case subPolicy := <-s.Output(): tm.Stop() - if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 2 && subPolicy.Policy.CoordinatorIdx != 1 { + if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 2 { t.Fatalf("failed to get the expected updated policy, policy revision: %d", subPolicy.Policy.RevisionIdx) } case <-tm.C: diff --git a/internal/pkg/server/fleet_integration_test.go b/internal/pkg/server/fleet_integration_test.go index 151423228..42158a26b 100644 --- a/internal/pkg/server/fleet_integration_test.go +++ b/internal/pkg/server/fleet_integration_test.go @@ -72,6 +72,7 @@ type tserver struct { g *errgroup.Group srv *Fleet enrollKey string + policyID string bulker bulk.Bulk outputReloadSuccess atomic.Int32 @@ -219,7 +220,7 @@ func startTestServer(t *testing.T, ctx context.Context, policyD model.PolicyData // Since we start the server in agent mode we need a way to detect if the policy monitor has reloaded the output // NOTE: This code is brittle as it depends on a log string message match - tsrv := &tserver{cfg: cfg, g: g, srv: srv, enrollKey: key.Token(), bulker: bulker} + tsrv := &tserver{cfg: cfg, g: g, srv: srv, enrollKey: key.Token(), policyID: policyID, bulker: bulker} ctx = testlog.SetLogger(t).Hook(zerolog.HookFunc(func(e *zerolog.Event, level zerolog.Level, message string) { if level == zerolog.InfoLevel && message == "Using output from policy" { tsrv.outputReloadSuccess.Add(1) @@ -779,7 +780,7 @@ func Test_SmokeTest_Agent_Calls(t *testing.T) { srv.waitExit() //nolint:errcheck // test case } -func EnrollAgent(enrollBody string, t *testing.T, ctx context.Context, srv *tserver) (string, string) { +func EnrollAgent(t *testing.T, ctx context.Context, srv *tserver, enrollBody string) (string, string) { req, err := http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/enroll", strings.NewReader(enrollBody)) require.NoError(t, err) req.Header.Set("Authorization", "ApiKey "+srv.enrollKey) @@ -833,10 +834,10 @@ func Test_Agent_Enrollment_Id(t *testing.T) { require.NoError(t, err) t.Log("Enroll the first agent with enrollment_id") - firstAgentID, _ := EnrollAgent(enrollBodyWEnrollmentID, t, ctx, srv) + firstAgentID, _ := EnrollAgent(t, ctx, srv, enrollBodyWEnrollmentID) t.Log("Enroll the second agent with the same enrollment_id") - secondAgentID, _ := EnrollAgent(enrollBodyWEnrollmentID, t, ctx, srv) + secondAgentID, _ := EnrollAgent(t, ctx, srv, enrollBodyWEnrollmentID) // cleanup defer func() { @@ -882,7 +883,7 @@ func Test_Agent_Enrollment_Id_Invalidated_API_key(t *testing.T) { require.NoError(t, err) t.Log("Enroll the first agent with enrollment_id") - firstAgentID, _ := EnrollAgent(enrollBodyWEnrollmentID, t, ctx, srv) + firstAgentID, _ := EnrollAgent(t, ctx, srv, enrollBodyWEnrollmentID) agent, err := dl.FindAgent(ctx, srv.bulker, dl.QueryAgentByID, dl.FieldID, firstAgentID) if err != nil { @@ -897,7 +898,7 @@ func Test_Agent_Enrollment_Id_Invalidated_API_key(t *testing.T) { } t.Log("Enroll the second agent with the same enrollment_id") - secondAgentID, _ := EnrollAgent(enrollBodyWEnrollmentID, t, ctx, srv) + secondAgentID, _ := EnrollAgent(t, ctx, srv, enrollBodyWEnrollmentID) // cleanup defer func() { @@ -1370,3 +1371,148 @@ func 
Test_SmokeTest_CheckinPollShutdown(t *testing.T) { cancel() srv.waitExit() //nolint:errcheck // test case } + +// Test_SmokeTest_Verify_v85Migrate will ensure that the policy regenerates o +func Test_SmokeTest_Verify_v85Migrate(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start test server + srv, err := startTestServer(t, ctx, policyData) + require.NoError(t, err) + + cli := cleanhttp.DefaultClient() + + // enroll an agent + enrollBody := `{ + "type": "PERMANENT", + "shared_id": "", + "metadata": { + "user_provided": {}, + "local": {}, + "tags": [] + } + }` + t.Log("Enroll an agent") + id, key := EnrollAgent(t, ctx, srv, enrollBody) + + // checkin + t.Logf("Fake a checkin for agent %s", id) + req, err := http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/"+id+"/checkin", strings.NewReader(checkinBody)) + require.NoError(t, err) + req.Header.Set("Authorization", "ApiKey "+key) + req.Header.Set("User-Agent", "elastic agent "+serverVersion) + req.Header.Set("Content-Type", "application/json") + res, err := cli.Do(req) + require.NoError(t, err) + + require.Equal(t, http.StatusOK, res.StatusCode) + t.Log("Checkin successful, verify body") + p, _ := io.ReadAll(res.Body) + res.Body.Close() + var obj map[string]interface{} + err = json.Unmarshal(p, &obj) + require.NoError(t, err) + + at, ok := obj["ack_token"] + require.True(t, ok, "expected ack_token in response") + _, ok = at.(string) + require.True(t, ok, "ack_token is not a string") + + actionsRaw, ok := obj["actions"] + require.True(t, ok, "expected actions is missing") + actions, ok := actionsRaw.([]interface{}) + require.True(t, ok, "expected actions to be an array") + require.Greater(t, len(actions), 0, "expected at least 1 action") + action, ok := actions[0].(map[string]interface{}) + require.True(t, ok, "expected action to be an object") + aIDRaw, ok := action["id"] + require.True(t, ok, "expected action id attribute missing") + aID, ok := aIDRaw.(string) + require.True(t, ok, "expected action id to be string") + aAgentIDRaw, ok := action["agent_id"] + require.True(t, ok, "expected action agent_id attribute missing") + aAgentID, ok := aAgentIDRaw.(string) + require.True(t, ok, "expected action agent_id to be string") + require.Equal(t, id, aAgentID) + + body := fmt.Sprintf(`{ + "events": [{ + "action_id": "%s", + "agent_id": "%s", + "message": "test-message", + "type": "ACTION_RESULT", + "subtype": "ACKNOWLEDGED" + }] + }`, aID, id) + t.Logf("Fake an ack for action %s for agent %s", aID, id) + req, err = http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/"+id+"/acks", strings.NewReader(body)) + require.NoError(t, err) + req.Header.Set("Authorization", "ApiKey "+key) + req.Header.Set("Content-Type", "application/json") + res, err = cli.Do(req) + require.NoError(t, err) + + require.Equal(t, http.StatusOK, res.StatusCode) + t.Log("Ack successful, verify body") + p, _ = io.ReadAll(res.Body) + res.Body.Close() + var ackObj map[string]interface{} + err = json.Unmarshal(p, &ackObj) + require.NoError(t, err) + + // NOTE the checkin response will only have the errors attribute if it's set to true in the response. 
+ // When decoding to a (typed) struct, the default will implicitly be false if it's missing + _, ok = ackObj["errors"] + require.Falsef(t, ok, "expected response to have no errors attribute, errors are present: %+v", ackObj) + + // Update agent doc to have output key == "" + agent, err := dl.FindAgent(ctx, srv.bulker, dl.QueryAgentByID, dl.FieldID, id) + require.NoError(t, err) + outputNames := make([]string, 0, len(agent.Outputs)) + for name := range agent.Outputs { + outputNames = append(outputNames, name) + } + require.Len(t, outputNames, 1) + p = []byte(fmt.Sprintf(`{"script":{"lang": "painless", "source": "ctx._source['outputs'][params.output].api_key == ''; ctx._source['outputs'][params.output].api_key_id == '';", "params": {"output": "%s"}}}`, outputNames[0])) + t.Logf("Attempting to remove api_key attribute from: %s, body: %s", id, string(p)) + err = srv.bulker.Update( + ctx, + dl.FleetAgents, + id, + p, + bulk.WithRefresh(), + bulk.WithRetryOnConflict(3), + ) + require.NoError(t, err) + + // Checkin again to get policy change action and new keys + req, err = http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/"+id+"/checkin", strings.NewReader(checkinBody)) + require.NoError(t, err) + req.Header.Set("Authorization", "ApiKey "+key) + req.Header.Set("User-Agent", "elastic agent "+serverVersion) + req.Header.Set("Content-Type", "application/json") + res, err = cli.Do(req) + require.NoError(t, err) + + require.Equal(t, http.StatusOK, res.StatusCode) + t.Log("Checkin successful, verify body") + p, _ = io.ReadAll(res.Body) + res.Body.Close() + err = json.Unmarshal(p, &obj) + require.NoError(t, err) + + at, ok = obj["ack_token"] + require.True(t, ok, "expected ack_token in response") + _, ok = at.(string) + require.True(t, ok, "ack_token is not a string") + + actionsRaw, ok = obj["actions"] + require.True(t, ok, "expected actions is missing") + actions, ok = actionsRaw.([]interface{}) + require.True(t, ok, "expected actions to be an array") + require.Greater(t, len(actions), 0, "expected at least 1 action") + + cancel() + srv.waitExit() //nolint:errcheck // test case +} diff --git a/internal/pkg/server/remote_es_output_integration_test.go b/internal/pkg/server/remote_es_output_integration_test.go index 330686f20..1b64bb075 100644 --- a/internal/pkg/server/remote_es_output_integration_test.go +++ b/internal/pkg/server/remote_es_output_integration_test.go @@ -202,7 +202,7 @@ func Test_Agent_Remote_ES_Output(t *testing.T) { t.Log("Enroll agent") srvCopy := srv srvCopy.enrollKey = newKey.Token() - agentID, key := EnrollAgent(enrollBody, t, ctx, srvCopy) + agentID, key := EnrollAgent(t, ctx, srvCopy, enrollBody) // cleanup defer func() { @@ -350,7 +350,7 @@ func Test_Agent_Remote_ES_Output_ForceUnenroll(t *testing.T) { t.Log("Enroll agent") srvCopy := srv srvCopy.enrollKey = newKey.Token() - agentID, key := EnrollAgent(enrollBody, t, ctx, srvCopy) + agentID, key := EnrollAgent(t, ctx, srvCopy, enrollBody) // cleanup defer func() { @@ -471,7 +471,7 @@ func Test_Agent_Remote_ES_Output_Unenroll(t *testing.T) { t.Log("Enroll agent") srvCopy := srv srvCopy.enrollKey = newKey.Token() - agentID, key := EnrollAgent(enrollBody, t, ctx, srvCopy) + agentID, key := EnrollAgent(t, ctx, srvCopy, enrollBody) // cleanup defer func() { From 70a26f75d652de3004ad8fb36f3881366e56ff67 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Wed, 17 Apr 2024 10:30:38 -0700 Subject: [PATCH 6/8] fix integration test --- internal/pkg/server/fleet_integration_test.go | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/internal/pkg/server/fleet_integration_test.go b/internal/pkg/server/fleet_integration_test.go index 42158a26b..f5f915fae 100644 --- a/internal/pkg/server/fleet_integration_test.go +++ b/internal/pkg/server/fleet_integration_test.go @@ -1474,7 +1474,7 @@ func Test_SmokeTest_Verify_v85Migrate(t *testing.T) { outputNames = append(outputNames, name) } require.Len(t, outputNames, 1) - p = []byte(fmt.Sprintf(`{"script":{"lang": "painless", "source": "ctx._source['outputs'][params.output].api_key == ''; ctx._source['outputs'][params.output].api_key_id == '';", "params": {"output": "%s"}}}`, outputNames[0])) + p = []byte(fmt.Sprintf(`{"script":{"lang": "painless", "source": "ctx._source['outputs'][params.output].api_key = ''; ctx._source['outputs'][params.output].api_key_id = '';", "params": {"output": "%s"}}}`, outputNames[0])) t.Logf("Attempting to remove api_key attribute from: %s, body: %s", id, string(p)) err = srv.bulker.Update( ctx, From 79aec50fcf055775f2e68a7593a00d25a77dccb6 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Wed, 17 Apr 2024 10:51:45 -0700 Subject: [PATCH 7/8] Fix linter, cleanup codesmells, add deprecations to model/schema.json --- internal/pkg/dl/migration_integration_test.go | 42 ------------------- internal/pkg/policy/revision.go | 2 +- internal/pkg/server/fleet_integration_test.go | 5 +-- model/schema.json | 2 + 4 files changed, 5 insertions(+), 46 deletions(-) diff --git a/internal/pkg/dl/migration_integration_test.go b/internal/pkg/dl/migration_integration_test.go index 78a470f8e..142b02cf2 100644 --- a/internal/pkg/dl/migration_integration_test.go +++ b/internal/pkg/dl/migration_integration_test.go @@ -72,48 +72,6 @@ func createSomeAgents(ctx context.Context, t *testing.T, n int, apiKey bulk.APIK return createdAgents } -func createSomePolicies(ctx context.Context, t *testing.T, n int, index string, bulker bulk.Bulk) []string { - t.Helper() - - var created []string - - var policyData = model.PolicyData{ - Outputs: map[string]map[string]interface{}{ - "default": { - "type": "elasticsearch", - }, - }, - OutputPermissions: json.RawMessage(`{"default": {}}`), - Inputs: []map[string]interface{}{}, - } - - for i := 0; i < n; i++ { - now := time.Now().UTC() - nowStr := now.Format(time.RFC3339) - - policyModel := model.Policy{ - ESDocument: model.ESDocument{}, - Data: &policyData, - DefaultFleetServer: false, - PolicyID: fmt.Sprint(i), - RevisionIdx: 1, - Timestamp: nowStr, - UnenrollTimeout: 0, - } - - body, err := json.Marshal(policyModel) - require.NoError(t, err) - - policyDocID, err := bulker.Create( - ctx, index, "", body, bulk.WithRefresh()) - require.NoError(t, err) - - created = append(created, policyDocID) - } - - return created -} - func TestMigrateOutputs_withDefaultAPIKeyHistory(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() diff --git a/internal/pkg/policy/revision.go b/internal/pkg/policy/revision.go index dc3d29eb0..0b84a3576 100644 --- a/internal/pkg/policy/revision.go +++ b/internal/pkg/policy/revision.go @@ -29,7 +29,7 @@ func RevisionFromPolicy(policy model.Policy) Revision { // RevisionFromString converts the string to a policy revision. 
func RevisionFromString(actionID string) (Revision, bool) { split := strings.Split(actionID, ":") - // NOTE: len 3 is expected for any policy change generated by fleet server v8.13+ + // NOTE: len 3 is expected for any policy change generated by fleet server v8.15+ // len 4 includes the previously used coordinator_idx value that has been deprecated. // If we receive a actionID with the coordinator_idx ignore the coordinator_idx value. if len(split) < 3 || len(split) > 4 { diff --git a/internal/pkg/server/fleet_integration_test.go b/internal/pkg/server/fleet_integration_test.go index f5f915fae..d2ffa8ee3 100644 --- a/internal/pkg/server/fleet_integration_test.go +++ b/internal/pkg/server/fleet_integration_test.go @@ -72,7 +72,6 @@ type tserver struct { g *errgroup.Group srv *Fleet enrollKey string - policyID string bulker bulk.Bulk outputReloadSuccess atomic.Int32 @@ -220,7 +219,7 @@ func startTestServer(t *testing.T, ctx context.Context, policyD model.PolicyData // Since we start the server in agent mode we need a way to detect if the policy monitor has reloaded the output // NOTE: This code is brittle as it depends on a log string message match - tsrv := &tserver{cfg: cfg, g: g, srv: srv, enrollKey: key.Token(), policyID: policyID, bulker: bulker} + tsrv := &tserver{cfg: cfg, g: g, srv: srv, enrollKey: key.Token(), bulker: bulker} ctx = testlog.SetLogger(t).Hook(zerolog.HookFunc(func(e *zerolog.Event, level zerolog.Level, message string) { if level == zerolog.InfoLevel && message == "Using output from policy" { tsrv.outputReloadSuccess.Add(1) @@ -1372,7 +1371,7 @@ func Test_SmokeTest_CheckinPollShutdown(t *testing.T) { srv.waitExit() //nolint:errcheck // test case } -// Test_SmokeTest_Verify_v85Migrate will ensure that the policy regenerates o +// Test_SmokeTest_Verify_v85Migrate will ensure that the policy regenerates output keys when the agent doc contains an empty key func Test_SmokeTest_Verify_v85Migrate(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/model/schema.json b/model/schema.json index 94f748633..9b2769d0a 100644 --- a/model/schema.json +++ b/model/schema.json @@ -292,6 +292,7 @@ }, "server": { + "deprecated": true, "title": "Server", "description": "A Fleet Server", "type": "object", @@ -357,6 +358,7 @@ }, "policy-leader": { + "deprecated": true, "title": "Policy Leader", "description": "The current leader Fleet Server for a policy", "type": "object", From fdbcc676db0d4888c13f85d0e2eb3d384099ec82 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Tue, 18 Jun 2024 16:24:07 -0700 Subject: [PATCH 8/8] Fix integration test --- internal/pkg/server/namespaces_integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/server/namespaces_integration_test.go b/internal/pkg/server/namespaces_integration_test.go index 29a617066..4ad77c8ce 100644 --- a/internal/pkg/server/namespaces_integration_test.go +++ b/internal/pkg/server/namespaces_integration_test.go @@ -221,7 +221,7 @@ func Test_Agent_Namespace_test1(t *testing.T) { t.Log("Enroll agent") srvCopy := srv srvCopy.enrollKey = newKey.Token() - agentID, key := EnrollAgent(enrollBody, t, ctx, srvCopy) + agentID, key := EnrollAgent(t, ctx, srvCopy, enrollBody) AssertAgentDocContainNamespace(t, ctx, srv, agentID, testNamespace) // cleanup