From f691382bb96751201ccff20ac805e2ef2437c7c7 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Wed, 29 Nov 2023 11:36:41 -0800 Subject: [PATCH] Remove the coordinator Remove the policy coordinator and policy leader election mechanisms from fleet-server. Deprecate the coordinator_idx value in fleet-server's json schema and remove coordinator_idx references when processing policies. --- internal/pkg/api/handleAck.go | 24 +- internal/pkg/api/handleAck_test.go | 7 +- internal/pkg/api/handleCheckin.go | 3 +- internal/pkg/coordinator/coordinator.go | 33 -- internal/pkg/coordinator/monitor.go | 440 ------------------ .../coordinator/monitor_integration_test.go | 215 --------- internal/pkg/coordinator/v0.go | 87 ---- internal/pkg/coordinator/v0_test.go | 92 ---- internal/pkg/dl/constants.go | 4 - internal/pkg/dl/migration.go | 13 +- internal/pkg/dl/migration_integration_test.go | 7 +- internal/pkg/dl/policies.go | 1 - internal/pkg/dl/policies_integration_test.go | 1 - internal/pkg/dl/policies_leader.go | 154 ------ .../dl/policies_leader_integration_test.go | 176 ------- internal/pkg/dl/servers.go | 59 --- internal/pkg/dl/servers_integration_test.go | 58 --- internal/pkg/model/schema.go | 2 +- internal/pkg/policy/monitor.go | 24 +- .../pkg/policy/monitor_integration_test.go | 11 +- internal/pkg/policy/monitor_test.go | 107 +---- internal/pkg/policy/revision.go | 26 +- internal/pkg/policy/self_test.go | 6 - internal/pkg/policy/sub.go | 8 +- internal/pkg/policy/sub_test.go | 12 +- internal/pkg/server/fleet.go | 6 +- internal/pkg/server/fleet_integration_test.go | 2 +- model/schema.json | 3 +- 28 files changed, 63 insertions(+), 1518 deletions(-) delete mode 100644 internal/pkg/coordinator/coordinator.go delete mode 100644 internal/pkg/coordinator/monitor.go delete mode 100644 internal/pkg/coordinator/monitor_integration_test.go delete mode 100644 internal/pkg/coordinator/v0.go delete mode 100644 internal/pkg/coordinator/v0_test.go delete mode 100644 internal/pkg/dl/policies_leader.go delete mode 100644 internal/pkg/dl/policies_leader_integration_test.go delete mode 100644 internal/pkg/dl/servers.go delete mode 100644 internal/pkg/dl/servers_integration_test.go diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index 489aefba5b..9e03943a92 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -384,11 +384,10 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag defer span.End() // If more than one, pick the winner; // 0) Correct policy id - // 1) Highest revision/coordinator number + // 1) Highest revision number found := false currRev := agent.PolicyRevisionIdx - currCoord := agent.PolicyCoordinatorIdx vSpan, _ := apm.StartSpan(ctx, "checkPolicyActions", "validate") for _, a := range actionIds { rev, ok := policy.RevisionFromString(a) @@ -396,18 +395,13 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag zlog.Debug(). Str("agent.policyId", agent.PolicyID). Int64("agent.revisionIdx", currRev). - Int64("agent.coordinatorIdx", currCoord). Str("rev.policyId", rev.PolicyID). Int64("rev.revisionIdx", rev.RevisionIdx). - Int64("rev.coordinatorIdx", rev.CoordinatorIdx). 
Msg("ack policy revision") - if ok && rev.PolicyID == agent.PolicyID && - (rev.RevisionIdx > currRev || - (rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) { + if ok && rev.PolicyID == agent.PolicyID && rev.RevisionIdx > currRev { found = true currRev = rev.RevisionIdx - currCoord = rev.CoordinatorIdx } } @@ -432,7 +426,7 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag err := ack.updateAgentDoc(ctx, zlog, agent.Id, - currRev, currCoord, + currRev, agent.PolicyID) if err != nil { return err @@ -506,7 +500,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context, func (ack *AckT) updateAgentDoc(ctx context.Context, zlog zerolog.Logger, agentID string, - currRev, currCoord int64, + currRev int64, policyID string, ) error { span, ctx := apm.StartSpan(ctx, "updateAgentDoc", "update") @@ -514,7 +508,6 @@ func (ack *AckT) updateAgentDoc(ctx context.Context, body := makeUpdatePolicyBody( policyID, currRev, - currCoord, ) err := ack.bulk.Update( @@ -529,7 +522,6 @@ func (ack *AckT) updateAgentDoc(ctx context.Context, zlog.Err(err). Str(LogPolicyID, policyID). Int64("policyRevision", currRev). - Int64("policyCoordinator", currCoord). Msg("ack policy") if err != nil { @@ -708,7 +700,7 @@ func isAgentActive(ctx context.Context, zlog zerolog.Logger, bulk bulk.Bulk, age // has not changed underneath us by an upstream process (Kibana or otherwise). // We have a race condition where a user could have assigned a new policy to // an agent while we were busy updating the old one. A blind update to the -// agent record without a check could set the revision and coordIdx for the wrong +// agent record without a check could set the revision for the wrong // policy. This script should be coupled with a "retry_on_conflict" parameter // to allow for *other* changes to the agent record while we running the script. 
// (For example, say the background bulk check-in timestamp update task fires) @@ -718,12 +710,10 @@ func isAgentActive(ctx context.Context, zlog zerolog.Logger, bulk bulk.Bulk, age const kUpdatePolicyPrefix = `{"script":{"lang":"painless","source":"if (ctx._source.policy_id == params.id) {ctx._source.remove('default_api_key_history');ctx._source.` + dl.FieldPolicyRevisionIdx + ` = params.rev;ctx._source.` + - dl.FieldPolicyCoordinatorIdx + - `= params.coord;ctx._source.` + dl.FieldUpdatedAt + ` = params.ts;} else {ctx.op = \"noop\";}","params": {"id":"` -func makeUpdatePolicyBody(policyID string, newRev, coordIdx int64) []byte { +func makeUpdatePolicyBody(policyID string, newRev int64) []byte { var buf bytes.Buffer buf.Grow(410) @@ -732,8 +722,6 @@ func makeUpdatePolicyBody(policyID string, newRev, coordIdx int64) []byte { buf.WriteString(policyID) buf.WriteString(`","rev":`) buf.WriteString(strconv.FormatInt(newRev, 10)) - buf.WriteString(`,"coord":`) - buf.WriteString(strconv.FormatInt(coordIdx, 10)) buf.WriteString(`,"ts":"`) buf.WriteString(time.Now().UTC().Format(time.RFC3339)) buf.WriteString(`"}}}`) diff --git a/internal/pkg/api/handleAck_test.go b/internal/pkg/api/handleAck_test.go index 45839a9165..5fdccfdb6d 100644 --- a/internal/pkg/api/handleAck_test.go +++ b/internal/pkg/api/handleAck_test.go @@ -32,20 +32,17 @@ func BenchmarkMakeUpdatePolicyBody(b *testing.B) { const policyID = "ed110be4-c2a0-42b8-adc0-94c2f0569207" const newRev = 2 - const coord = 1 for n := 0; n < b.N; n++ { - makeUpdatePolicyBody(policyID, newRev, coord) + makeUpdatePolicyBody(policyID, newRev) } } func TestMakeUpdatePolicyBody(t *testing.T) { - const policyID = "ed110be4-c2a0-42b8-adc0-94c2f0569207" const newRev = 2 - const coord = 1 - data := makeUpdatePolicyBody(policyID, newRev, coord) + data := makeUpdatePolicyBody(policyID, newRev) var i interface{} err := json.Unmarshal(data, &i) diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 8512a637c1..0b4c330633 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -246,7 +246,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r actCh := aSub.Ch() // Subscribe to policy manager for changes on PolicyId > policyRev - sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, agent.PolicyRevisionIdx, agent.PolicyCoordinatorIdx) + sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, agent.PolicyRevisionIdx) if err != nil { return fmt.Errorf("subscribe policy monitor: %w", err) } @@ -730,7 +730,6 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a zlog = zlog.With(). Str("fleet.ctx", "processPolicy"). Int64("fleet.policyRevision", pp.Policy.RevisionIdx). - Int64("fleet.policyCoordinator", pp.Policy.CoordinatorIdx). Str(LogPolicyID, pp.Policy.PolicyID). Logger() diff --git a/internal/pkg/coordinator/coordinator.go b/internal/pkg/coordinator/coordinator.go deleted file mode 100644 index c251147d31..0000000000 --- a/internal/pkg/coordinator/coordinator.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -// Package coordinator handles the deployment of Elastic Agent policies between Fleet Server node. 
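Before the deleted coordinator package begins: with the coordinator removed, the ack path above updates the agent document with only the policy revision. Below is a minimal, hypothetical sketch of what the trimmed makeUpdatePolicyBody now emits — the painless source is elided as "..." (in the patch it is the kUpdatePolicyPrefix constant), and the field names it writes (policy_revision_idx, updated_at) are assumed renderings of the dl constants, not verified output:

```go
// Hedged sketch of the trimmed update body; not the fleet-server source.
package main

import (
	"bytes"
	"fmt"
	"strconv"
	"time"
)

func makeUpdatePolicyBody(policyID string, newRev int64) []byte {
	var buf bytes.Buffer
	buf.Grow(410)

	// The real prefix is kUpdatePolicyPrefix: a painless script that guards
	// on policy_id, then sets the revision and updated_at fields. The
	// "coord" parameter is gone from both the script and its params.
	buf.WriteString(`{"script":{"lang":"painless","source":"...","params":{"id":"`)
	buf.WriteString(policyID)
	buf.WriteString(`","rev":`)
	buf.WriteString(strconv.FormatInt(newRev, 10))
	buf.WriteString(`,"ts":"`)
	buf.WriteString(time.Now().UTC().Format(time.RFC3339))
	buf.WriteString(`"}}}`)
	return buf.Bytes()
}

func main() {
	fmt.Println(string(makeUpdatePolicyBody("ed110be4-c2a0-42b8-adc0-94c2f0569207", 2)))
}
```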
-package coordinator - -import ( - "context" - - "github.com/elastic/fleet-server/v7/internal/pkg/model" -) - -// Factory creates a new coordinator for a policy. -type Factory func(policy model.Policy) (Coordinator, error) - -// Coordinator processes a policy and produces a new policy. -type Coordinator interface { - // Name is the name of the coordinator - Name() string - - // Run runs the coordinator for the policy. - Run(ctx context.Context) error - - // Update called to signal a new policy revision has been defined. - // - // This should not block as its called by the main loop in the coordinator manager. The implemented coordinator - // should push it over a channel for the work to be done in another go routine. - Update(ctx context.Context, policy model.Policy) error - - // Output is the output channel for updated coordinated policies. - Output() <-chan model.Policy -} diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go deleted file mode 100644 index 692292fa28..0000000000 --- a/internal/pkg/coordinator/monitor.go +++ /dev/null @@ -1,440 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package coordinator - -import ( - "context" - "errors" - "fmt" - "net" - "os" - "runtime" - "sync" - "time" - - "github.com/rs/zerolog" - "go.elastic.co/apm/v2" - - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" - "github.com/elastic/fleet-server/v7/internal/pkg/config" - "github.com/elastic/fleet-server/v7/internal/pkg/dl" - "github.com/elastic/fleet-server/v7/internal/pkg/es" - "github.com/elastic/fleet-server/v7/internal/pkg/model" - "github.com/elastic/fleet-server/v7/internal/pkg/monitor" - "github.com/elastic/fleet-server/v7/internal/pkg/sleep" -) - -const ( - defaultCheckInterval = 20 * time.Second // check for valid leaders every 20 seconds - defaultLeaderInterval = 30 * time.Second // become leader for at least 30 seconds - defaultMetadataInterval = 5 * time.Minute // update metadata every 5 minutes - defaultCoordinatorRestartDelay = 5 * time.Second // delay in restarting coordinator on failure -) - -// Monitor monitors the leader election of policies and routes managed policies to the coordinator. -type Monitor interface { - // Run runs the monitor. - Run(context.Context) error -} - -type policyT struct { - id string - cord Coordinator - cordCanceller context.CancelFunc -} - -type monitorT struct { - bulker bulk.Bulk - monitor monitor.Monitor - factory Factory - - fleet config.Fleet - version string - agentMetadata model.AgentMetadata - hostMetadata model.HostMetadata - - checkInterval time.Duration - leaderInterval time.Duration - metadataInterval time.Duration - coordRestartDelay time.Duration - - serversIndex string - policiesIndex string - leadersIndex string - agentsIndex string - - policies map[string]policyT - - muPoliciesCanceller sync.Mutex - policiesCanceller map[string]context.CancelFunc -} - -// NewMonitor creates a new coordinator policy monitor. 
-func NewMonitor(fleet config.Fleet, version string, bulker bulk.Bulk, monitor monitor.Monitor, factory Factory) Monitor { - return &monitorT{ - version: version, - fleet: fleet, - bulker: bulker, - monitor: monitor, - factory: factory, - checkInterval: defaultCheckInterval, - leaderInterval: defaultLeaderInterval, - metadataInterval: defaultMetadataInterval, - coordRestartDelay: defaultCoordinatorRestartDelay, - serversIndex: dl.FleetServers, - policiesIndex: dl.FleetPolicies, - leadersIndex: dl.FleetPoliciesLeader, - agentsIndex: dl.FleetAgents, - policies: make(map[string]policyT), - policiesCanceller: make(map[string]context.CancelFunc), - } -} - -// Run runs the monitor. -func (m *monitorT) Run(ctx context.Context) (err error) { - log := zerolog.Ctx(ctx).With().Str("ctx", "policy leader manager").Logger() - // When ID of the Agent is not provided to Fleet Server then the Agent - // has not enrolled. The Fleet Server cannot become a leader until the - // Agent it is running under has been enrolled. - m.calcMetadata(ctx) - if m.agentMetadata.ID == "" { - log.Warn().Msg("missing config fleet.agent.id; acceptable until Elastic Agent has enrolled") - <-ctx.Done() - return ctx.Err() - } - - // Start timer loop to ensure leadership - lT := time.NewTimer(m.checkInterval) - defer lT.Stop() - - // Ensure leadership on startup - for { - err = m.ensureLeadership(ctx) - if err != nil { - log.Warn().Err(err).Msg("error ensuring leadership, will retry") - select { - case <-lT.C: - lT.Reset(m.checkInterval) - continue - case <-ctx.Done(): - m.releaseLeadership() - return ctx.Err() - } - } - break - } - - // Subscribe to the monitor for policies - s := m.monitor.Subscribe() - defer m.monitor.Unsubscribe(s) - - // Start timer to update metadata (mainly for updated IP addresses of the host) - mT := time.NewTimer(m.metadataInterval) - defer mT.Stop() - - // Keep track of errored statuses - erroredOnLastRequest := false - numFailedRequests := 0 - for { - select { - case hits := <-s.Output(): - err = m.handlePolicies(ctx, hits) - if err != nil { - erroredOnLastRequest = true - numFailedRequests++ - log.Warn().Err(err).Msgf("Encountered an error while policy leadership changes; continuing to retry.") - } - case <-mT.C: - m.calcMetadata(ctx) - mT.Reset(m.metadataInterval) - case <-lT.C: - err = m.ensureLeadership(ctx) - if err != nil { - erroredOnLastRequest = true - numFailedRequests++ - log.Warn().Err(err).Msgf("Encountered an error while checking/assigning policy leaders; continuing to retry.") - } - lT.Reset(m.checkInterval) - case <-ctx.Done(): - m.releaseLeadership() - return ctx.Err() - } - if err == nil && erroredOnLastRequest { - erroredOnLastRequest = false - log.Info().Msgf("Policy leader monitor successfully recovered after %d attempts", numFailedRequests) - numFailedRequests = 0 - } - } -} - -// handlePolicies handles new policies or policy changes. 
-func (m *monitorT) handlePolicies(ctx context.Context, hits []es.HitT) error { - log := zerolog.Ctx(ctx).With().Str("ctx", "policy leader manager").Logger() - new := false - for _, hit := range hits { - var policy model.Policy - err := hit.Unmarshal(&policy) - if err != nil { - log.Debug().Err(err).Msg("Failed to deserialize policy json") - return err - } - if policy.CoordinatorIdx != 0 { - // policy revision was inserted by coordinator so this monitor ignores it - continue - } - p, ok := m.policies[policy.PolicyID] - if ok { - // not a new policy - if p.cord != nil { - // current leader send to its coordinator - err = p.cord.Update(ctx, policy) - if err != nil { - log.Info().Err(err).Msg("Failed to update policy leader") - return err - } - } - } else { - new = true - } - } - if new { - // new policy discovered; leadership needs to be performed - err := m.ensureLeadership(ctx) - if err != nil { - return err - } - } - return nil -} - -// ensureLeadership ensures leadership is held or needs to be taken over. -func (m *monitorT) ensureLeadership(ctx context.Context) error { - if m.bulker.HasTracer() { - trans := m.bulker.StartTransaction("Ensure leadership", "bulker") - ctx = apm.ContextWithTransaction(ctx, trans) - defer trans.End() - } - zerolog.Ctx(ctx).Debug().Str("ctx", "policy leader manager").Msg("ensuring leadership of policies") - err := dl.EnsureServer(ctx, m.bulker, m.version, m.agentMetadata, m.hostMetadata, dl.WithIndexName(m.serversIndex)) - - if err != nil { - return fmt.Errorf("failed to check server status on Elasticsearch (%s): %w", m.hostMetadata.Name, err) - } - - // fetch current policies and leaders - leaders := map[string]model.PolicyLeader{} - policies, err := dl.QueryLatestPolicies(ctx, m.bulker, dl.WithIndexName(m.policiesIndex)) - if err != nil { - if errors.Is(err, es.ErrIndexNotFound) { - zerolog.Ctx(ctx).Debug().Str("ctx", "policy leader manager").Str("index", m.policiesIndex).Msg(es.ErrIndexNotFound.Error()) - return nil - } - return fmt.Errorf("encountered error while querying policies: %w", err) - } - if len(policies) > 0 { - ids := make([]string, len(policies)) - for i, p := range policies { - ids[i] = p.PolicyID - } - leaders, err = dl.SearchPolicyLeaders(ctx, m.bulker, ids, dl.WithIndexName(m.leadersIndex)) - if err != nil { - if !errors.Is(err, es.ErrIndexNotFound) { - return fmt.Errorf("encountered error while fetching policy leaders: %w", err) - } - } - } - - // determine the policies that lead needs to be taken - var lead []model.Policy - now := time.Now().UTC() - for _, policy := range policies { - leader, ok := leaders[policy.PolicyID] - if !ok { - // new policy want to try to take leadership - lead = append(lead, policy) - continue - } - t, err := leader.Time() - if err != nil { - return err - } - if now.Sub(t) > m.leaderInterval || leader.Server.ID == m.agentMetadata.ID { - // policy needs a new leader or already leader - lead = append(lead, policy) - } - } - - // take/keep leadership and start new coordinators - res := make(chan policyT) - for _, p := range lead { - pt := m.policies[p.PolicyID] - pt.id = p.PolicyID - go func(p model.Policy, pt policyT) { - defer func() { - res <- pt - }() - - l := zerolog.Ctx(ctx).With().Str("ctx", "policy leader manager").Str(dl.FieldPolicyID, pt.id).Logger() - err := dl.TakePolicyLeadership(ctx, m.bulker, pt.id, m.agentMetadata.ID, m.version, dl.WithIndexName(m.leadersIndex)) - if err != nil { - l.Warn().Err(err).Msg("monitor.ensureLeadership: failed to take ownership") - if pt.cord != nil { - pt.cord = nil - } 
- if pt.cordCanceller != nil { - pt.cordCanceller() - pt.cordCanceller = nil - } - return - } - if pt.cord == nil { - cord, err := m.factory(p) - if err != nil { - l.Err(err).Msg("failed to start coordinator") - err = dl.ReleasePolicyLeadership(ctx, m.bulker, pt.id, m.agentMetadata.ID, m.leaderInterval, dl.WithIndexName(m.leadersIndex)) - if err != nil { - l.Err(err).Msg("failed to release policy leadership") - } - return - } - - cordCtx, canceller := context.WithCancel(ctx) - go runCoordinator(cordCtx, cord, l, m.coordRestartDelay) - go runCoordinatorOutput(cordCtx, cord, m.bulker, l, m.policiesIndex) - pt.cord = cord - pt.cordCanceller = canceller - } else { - err = pt.cord.Update(ctx, p) - if err != nil { - l.Err(err).Msg("failed to update coordinator") - } - } - }(p, pt) - } - for range lead { - r := <-res - if r.cord == nil { - // either failed to take leadership or lost leadership - delete(m.policies, r.id) - - m.muPoliciesCanceller.Lock() - delete(m.policiesCanceller, r.id) - m.muPoliciesCanceller.Unlock() - } else { - m.policies[r.id] = r - } - } - return nil -} - -// releaseLeadership releases current leadership -func (m *monitorT) releaseLeadership() { - var wg sync.WaitGroup - wg.Add(len(m.policies)) - for _, pt := range m.policies { - go func(pt policyT) { - if pt.cord != nil { - pt.cordCanceller() - } - // uses a background context, because the context for the - // monitor will be cancelled at this point in the code - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - err := dl.ReleasePolicyLeadership(ctx, m.bulker, pt.id, m.agentMetadata.ID, m.leaderInterval, dl.WithIndexName(m.leadersIndex)) - if err != nil { - l := zerolog.Ctx(ctx).With().Str("ctx", "policy leader manager").Str(dl.FieldPolicyID, pt.id).Logger() - l.Warn().Err(err).Msg("monitor.releaseLeadership: failed to release leadership") - } - wg.Done() - }(pt) - } - wg.Wait() -} - -func (m *monitorT) calcMetadata(ctx context.Context) { - m.agentMetadata = model.AgentMetadata{ - ID: m.fleet.Agent.ID, - Version: m.fleet.Agent.Version, - } - hostname := m.fleet.Host.Name - if hostname == "" { - h, err := os.Hostname() - if err != nil { - zerolog.Ctx(ctx).Error().Str("ctx", "policy leader manager").Err(err).Msg("failed to get hostname") - } - hostname = h - } - ips, err := m.getIPs() - if err != nil { - zerolog.Ctx(ctx).Error().Str("ctx", "policy leader manager").Err(err).Msg("failed to get ip addresses") - } - m.hostMetadata = model.HostMetadata{ - ID: m.fleet.Host.ID, - Name: hostname, - Architecture: runtime.GOOS, - Ip: ips, - } -} - -func (m *monitorT) getIPs() ([]string, error) { - ifaces, err := net.Interfaces() - if err != nil { - return nil, err - } - ips := []string{} - for _, i := range ifaces { - addrs, err := i.Addrs() - if err != nil { - continue - } - for _, addr := range addrs { - var ip net.IP - switch v := addr.(type) { - case *net.IPNet: - ip = v.IP - case *net.IPAddr: - ip = v.IP - } - if ip != nil { - ips = append(ips, ip.String()) - } - } - } - return ips, nil -} - -func runCoordinator(ctx context.Context, cord Coordinator, l zerolog.Logger, d time.Duration) { - cnt := 0 - for { - l.Info().Int("count", cnt).Str("coordinator", cord.Name()).Msg("Starting policy coordinator") - err := cord.Run(ctx) - if !errors.Is(err, context.Canceled) { - l.Err(err).Msg("Policy coordinator failed and stopped") - if errors.Is(sleep.WithContext(ctx, d), context.Canceled) { - break - } - } else { - break - } - cnt += 1 - } -} - -func runCoordinatorOutput(ctx context.Context, cord 
Coordinator, bulker bulk.Bulk, l zerolog.Logger, policiesIndex string) { - for { - select { - case p := <-cord.Output(): - s := l.With().Int64(dl.FieldRevisionIdx, p.RevisionIdx).Int64(dl.FieldCoordinatorIdx, p.CoordinatorIdx).Logger() - _, err := dl.CreatePolicy(ctx, bulker, p, dl.WithIndexName(policiesIndex)) - if err != nil { - s.Err(err).Msg("Policy coordinator failed to add a new policy revision") - } else { - s.Info().Int64("revision_id", p.RevisionIdx).Msg("Policy coordinator added a new policy revision") - } - case <-ctx.Done(): - return - } - } -} diff --git a/internal/pkg/coordinator/monitor_integration_test.go b/internal/pkg/coordinator/monitor_integration_test.go deleted file mode 100644 index 8fa29b8eea..0000000000 --- a/internal/pkg/coordinator/monitor_integration_test.go +++ /dev/null @@ -1,215 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build integration - -package coordinator - -import ( - "context" - "encoding/json" - "errors" - "testing" - "time" - - "github.com/gofrs/uuid" - "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" - - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" - "github.com/elastic/fleet-server/v7/internal/pkg/config" - "github.com/elastic/fleet-server/v7/internal/pkg/dl" - "github.com/elastic/fleet-server/v7/internal/pkg/model" - "github.com/elastic/fleet-server/v7/internal/pkg/monitor" - ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" - testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" -) - -func TestMonitorLeadership(t *testing.T) { - parentCtx := context.Background() - parentCtx = testlog.SetLogger(t).WithContext(parentCtx) - bulkCtx, bulkCn := context.WithCancel(parentCtx) - defer bulkCn() - ctx, cn := context.WithCancel(parentCtx) - defer cn() - - // flush bulker on every operation - bulker := ftesting.SetupBulk(bulkCtx, t, bulk.WithFlushThresholdCount(1)) - - serversIndex := ftesting.CleanIndex(ctx, t, bulker, dl.FleetServers) - policiesIndex := ftesting.CleanIndex(ctx, t, bulker, dl.FleetPolicies) - leadersIndex := ftesting.CleanIndex(ctx, t, bulker, dl.FleetPoliciesLeader) - - pim, err := monitor.New(policiesIndex, bulker.Client(), bulker.Client()) - if err != nil { - t.Fatal(err) - } - cfg := makeFleetConfig() - pm := NewMonitor(cfg, "1.0.0", bulker, pim, NewCoordinatorZero) - pm.(*monitorT).serversIndex = serversIndex - pm.(*monitorT).leadersIndex = leadersIndex - pm.(*monitorT).policiesIndex = policiesIndex - - // start with 1 initial policy - policy1Id := uuid.Must(uuid.NewV4()).String() - policy1 := model.Policy{ - PolicyID: policy1Id, - CoordinatorIdx: 0, - Data: nil, - RevisionIdx: 1, - } - _, err = dl.CreatePolicy(ctx, bulker, policy1, dl.WithIndexName(policiesIndex)) - if err != nil { - t.Fatal(err) - } - - // start the monitors - g, _ := errgroup.WithContext(context.Background()) - g.Go(func() error { - err := pim.Run(ctx) - if err != nil && !errors.Is(err, context.Canceled) { - return err - } - return nil - }) - g.Go(func() error { - err := pm.Run(ctx) - if err != nil && !errors.Is(err, context.Canceled) { - return err - } - return nil - }) - - // wait 500ms to ensure everything is running; then create a new policy - <-time.After(500 * time.Millisecond) - policy2Id := 
uuid.Must(uuid.NewV4()).String() - policy2 := model.Policy{ - PolicyID: policy2Id, - CoordinatorIdx: 0, - Data: nil, - RevisionIdx: 1, - } - _, err = dl.CreatePolicy(ctx, bulker, policy2, dl.WithIndexName(policiesIndex)) - if err != nil { - t.Fatal(err) - } - - // wait 2 seconds so the index monitor notices the new policy - <-time.After(2 * time.Second) - ensureServer(ctx, t, bulker, cfg, serversIndex) - ensureLeadership(ctx, t, bulker, cfg, leadersIndex, policy1Id) - ensureLeadership(ctx, t, bulker, cfg, leadersIndex, policy2Id) - ensurePolicy(ctx, t, bulker, policiesIndex, policy1Id, 1, 1) - ensurePolicy(ctx, t, bulker, policiesIndex, policy2Id, 1, 1) - - // stop the monitors - cn() - err = g.Wait() - require.NoError(t, err) - - // ensure leadership was released - ensureLeadershipReleased(bulkCtx, t, bulker, cfg, leadersIndex, policy1Id) - ensureLeadershipReleased(bulkCtx, t, bulker, cfg, leadersIndex, policy2Id) -} - -func makeFleetConfig() config.Fleet { - id := uuid.Must(uuid.NewV4()).String() - return config.Fleet{ - Agent: config.Agent{ - ID: id, - Version: "1.0.0", - }, - Host: config.Host{ - ID: id, - }, - } -} - -func ensureServer(ctx context.Context, t *testing.T, bulker bulk.Bulk, cfg config.Fleet, index string) { - t.Helper() - var srv model.Server - data, err := bulker.Read(ctx, index, cfg.Agent.ID, bulk.WithRefresh()) - if err != nil { - t.Fatal(err) - } - err = json.Unmarshal(data, &srv) - if err != nil { - t.Fatal(err) - } - if srv.Agent.ID != cfg.Agent.ID { - t.Fatal("agent.id should match from configuration") - } -} - -func ensureLeadership(ctx context.Context, t *testing.T, bulker bulk.Bulk, cfg config.Fleet, index string, policyID string) { - t.Helper() - var leader model.PolicyLeader - data, err := bulker.Read(ctx, index, policyID) - if err != nil { - t.Fatal(err) - } - err = json.Unmarshal(data, &leader) - if err != nil { - t.Fatal(err) - } - if leader.Server.ID != cfg.Agent.ID { - t.Fatal("server.id should match from configuration") - } - lt, err := leader.Time() - if err != nil { - t.Fatal(err) - } - if time.Now().UTC().Sub(lt) >= 5*time.Second { - t.Fatal("@timestamp should be with in 5 seconds") - } -} - -func ensurePolicy(ctx context.Context, t *testing.T, bulker bulk.Bulk, index string, policyID string, revisionIdx, coordinatorIdx int64) { - t.Helper() - policies, err := dl.QueryLatestPolicies(ctx, bulker, dl.WithIndexName(index)) - if err != nil { - t.Fatal(err) - } - var found *model.Policy - for i := range policies { - p := policies[i] - if p.PolicyID == policyID { - found = &p - break - } - } - if found == nil { - t.Fatal("policy not found") - } - if found.RevisionIdx != revisionIdx { - t.Fatal("revision_idx does not match") - } - if found.CoordinatorIdx != coordinatorIdx { - t.Fatal("coordinator_idx does not match") - } -} - -func ensureLeadershipReleased(ctx context.Context, t *testing.T, bulker bulk.Bulk, cfg config.Fleet, index string, policyID string) { - t.Helper() - var leader model.PolicyLeader - data, err := bulker.Read(ctx, index, policyID) - if err != nil { - t.Fatal(err) - } - err = json.Unmarshal(data, &leader) - if err != nil { - t.Fatal(err) - } - if leader.Server.ID != cfg.Agent.ID { - t.Fatal("server.id should match from configuration") - } - lt, err := leader.Time() - if err != nil { - t.Fatal(err) - } - diff := time.Now().UTC().Sub(lt).Seconds() - if diff < (30 * time.Second).Seconds() { - t.Fatalf("@timestamp different should be more than 30 seconds; instead its %.0f secs", diff) - } -} diff --git a/internal/pkg/coordinator/v0.go 
b/internal/pkg/coordinator/v0.go deleted file mode 100644 index dcf3431681..0000000000 --- a/internal/pkg/coordinator/v0.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package coordinator - -import ( - "context" - - "github.com/elastic/fleet-server/v7/internal/pkg/model" - - "github.com/rs/zerolog" -) - -// coordinatorZeroT is V0 coordinator that just takes a subscribed policy and outputs the same policy. -type coordinatorZeroT struct { - policy model.Policy - in chan model.Policy - out chan model.Policy -} - -// NewCoordinatorZero creates a V0 coordinator. -func NewCoordinatorZero(policy model.Policy) (Coordinator, error) { - return &coordinatorZeroT{ - policy: policy, - in: make(chan model.Policy), - out: make(chan model.Policy), - }, nil -} - -// Name returns the "v0" name. -func (c *coordinatorZeroT) Name() string { - return "v0" -} - -// Run runs the coordinator for the policy. -func (c *coordinatorZeroT) Run(ctx context.Context) error { - err := c.updatePolicy(c.policy) - if err != nil { - zerolog.Ctx(ctx).Error().Err(err).Msg("failed to handle policy") - } - - for { - select { - case p := <-c.in: - err = c.updatePolicy(p) - if err != nil { - zerolog.Ctx(ctx).Error().Err(err).Msg("failed to handle policy") - continue - } - case <-ctx.Done(): - return ctx.Err() - } - } -} - -// Update called to signal a new policy revision has been defined. -func (c *coordinatorZeroT) Update(_ context.Context, policy model.Policy) error { - c.in <- policy - return nil -} - -// Output is the output channel for updated coordinated policies. -func (c *coordinatorZeroT) Output() <-chan model.Policy { - return c.out -} - -// updatePolicy performs the working of incrementing the coordinator idx. -func (c *coordinatorZeroT) updatePolicy(p model.Policy) error { - _, err := c.handlePolicy(p.Data) - if err != nil { - return err - } - if p.CoordinatorIdx == 0 { - p.CoordinatorIdx += 1 - c.policy = p - c.out <- p - } - return nil -} - -// handlePolicy performs the actual work of coordination. -// -// Does nothing at the moment. -func (c *coordinatorZeroT) handlePolicy(data *model.PolicyData) (*model.PolicyData, error) { - return data, nil -} diff --git a/internal/pkg/coordinator/v0_test.go b/internal/pkg/coordinator/v0_test.go deleted file mode 100644 index d00ec096ec..0000000000 --- a/internal/pkg/coordinator/v0_test.go +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. 
- -//go:build !integration - -package coordinator - -import ( - "context" - "testing" - "time" - - "github.com/elastic/fleet-server/v7/internal/pkg/model" - "github.com/gofrs/uuid" -) - -func TestCoordinatorZero(t *testing.T) { - ctx, cn := context.WithCancel(context.Background()) - defer cn() - - policyId := uuid.Must(uuid.NewV4()).String() - policy := model.Policy{ - PolicyID: policyId, - CoordinatorIdx: 0, - Data: nil, - RevisionIdx: 1, - } - coord, err := NewCoordinatorZero(policy) - if err != nil { - t.Fatal(err) - } - - go func() { - if err := coord.Run(ctx); err != nil && err != context.Canceled { - t.Error(err) - return - } - }() - - // should get a new policy on start up - select { - case newPolicy := <-coord.Output(): - if newPolicy.RevisionIdx != 1 { - t.Fatalf("revision_idx should be set to 1, it was set to %d", newPolicy.RevisionIdx) - } - if newPolicy.CoordinatorIdx != 1 { - t.Fatalf("coordinator_idx should be set to 1, it was set to %d", newPolicy.CoordinatorIdx) - } - case <-time.After(500 * time.Millisecond): - t.Fatal("never receive a new policy") - } - - // send a new policy revision; should get a new policy - policy = model.Policy{ - PolicyID: policyId, - CoordinatorIdx: 0, - Data: nil, - RevisionIdx: 2, - } - if err := coord.Update(ctx, policy); err != nil { - t.Fatal(err) - } - select { - case newPolicy := <-coord.Output(): - if newPolicy.RevisionIdx != 2 { - t.Fatalf("revision_idx should be set to 2, it was set to %d", newPolicy.RevisionIdx) - } - if newPolicy.CoordinatorIdx != 1 { - t.Fatalf("coordinator_idx should be set to 1, it was set to %d", newPolicy.CoordinatorIdx) - } - case <-time.After(500 * time.Millisecond): - t.Fatal("never receive a new policy") - } - - // send policy with already set coordinator_idx, v0 does nothing - policy = model.Policy{ - PolicyID: policyId, - CoordinatorIdx: 1, - Data: nil, - RevisionIdx: 2, - } - if err := coord.Update(ctx, policy); err != nil { - t.Fatal(err) - } - select { - case <-coord.Output(): - t.Fatal("should not have got a new policy") - case <-time.After(500 * time.Millisecond): - break - } -} diff --git a/internal/pkg/dl/constants.go b/internal/pkg/dl/constants.go index 703117bcac..185167afe1 100644 --- a/internal/pkg/dl/constants.go +++ b/internal/pkg/dl/constants.go @@ -14,8 +14,6 @@ const ( FleetArtifacts = ".fleet-artifacts" FleetEnrollmentAPIKeys = ".fleet-enrollment-api-keys" FleetPolicies = ".fleet-policies" - FleetPoliciesLeader = ".fleet-policies-leader" - FleetServers = ".fleet-servers" ) // Query fields @@ -30,13 +28,11 @@ const ( FieldActionID = "action_id" FieldAgent = "agent" FieldAgentVersion = "version" - FieldCoordinatorIdx = "coordinator_idx" FieldLastCheckin = "last_checkin" FieldLastCheckinStatus = "last_checkin_status" FieldLastCheckinMessage = "last_checkin_message" FieldLocalMetadata = "local_metadata" FieldComponents = "components" - FieldPolicyCoordinatorIdx = "policy_coordinator_idx" FieldPolicyID = "policy_id" FieldPolicyOutputAPIKey = "api_key" FieldPolicyOutputAPIKeyID = "api_key_id" diff --git a/internal/pkg/dl/migration.go b/internal/pkg/dl/migration.go index 82e20a67b4..30f9c3c39f 100644 --- a/internal/pkg/dl/migration.go +++ b/internal/pkg/dl/migration.go @@ -193,9 +193,9 @@ func migrateToV8_5(ctx context.Context, bulker bulk.Bulk) error { // The migration was necessary and indeed run, thus we need to regenerate // the API keys for all agents. In order to do so, we increase the policy - // coordinator index to force a policy update. 
+ // revision index to force a policy update. if migrated > 0 { - _, err := migrate(ctx, bulker, migratePolicyCoordinatorIdx) + _, err := migrate(ctx, bulker, migratePolicyRevisionIdx) if err != nil { return fmt.Errorf("v8.5.0 data migration failed: %w", err) } @@ -274,16 +274,17 @@ ctx._source.policy_output_permissions_hash=null; return migrationName, FleetAgents, body, nil } -// migratePolicyCoordinatorIdx increases the policy's CoordinatorIdx to force +// migratePolicyRevisionIdx increases the policy's RevisionIdx to force // a policy update ensuring the output data will be migrated to the new // Agent.Outputs field. See migrateAgentOutputs and https://github.com/elastic/fleet-server/issues/1672 // for details. -func migratePolicyCoordinatorIdx() (string, string, []byte, error) { - const migrationName = "PolicyCoordinatorIdx" +// NOTE(michel-laterman): Updated to increment the RevisionIdx instead of the CoordinatorIdx as the coordinator is being removed. +func migratePolicyRevisionIdx() (string, string, []byte, error) { + const migrationName = "PolicyRevisionIdx" query := dsl.NewRoot() query.Query().MatchAll() - painless := `ctx._source.coordinator_idx++;` + painless := `ctx._source.revision_idx++;` query.Param("script", painless) body, err := query.MarshalJSON() diff --git a/internal/pkg/dl/migration_integration_test.go b/internal/pkg/dl/migration_integration_test.go index 3bd5a20a2f..96d25ec33f 100644 --- a/internal/pkg/dl/migration_integration_test.go +++ b/internal/pkg/dl/migration_integration_test.go @@ -92,7 +92,6 @@ func createSomePolicies(ctx context.Context, t *testing.T, n int, index string, policyModel := model.Policy{ ESDocument: model.ESDocument{}, - CoordinatorIdx: int64(i), Data: &policyData, DefaultFleetServer: false, PolicyID: fmt.Sprint(i), @@ -114,7 +113,7 @@ func createSomePolicies(ctx context.Context, t *testing.T, n int, index string, return created } -func TestPolicyCoordinatorIdx(t *testing.T) { +func TestPolicyRevisionIdx(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() ctx = testlog.SetLogger(t).WithContext(ctx) @@ -123,7 +122,7 @@ func TestPolicyCoordinatorIdx(t *testing.T) { docIDs := createSomePolicies(ctx, t, 25, index, bulker) - migrated, err := migrate(ctx, bulker, migratePolicyCoordinatorIdx) + migrated, err := migrate(ctx, bulker, migratePolicyRevisionIdx) require.NoError(t, err) require.Equal(t, len(docIDs), migrated) @@ -143,7 +142,7 @@ func TestPolicyCoordinatorIdx(t *testing.T) { } } - assert.Equal(t, int64(i+1), got.CoordinatorIdx) + assert.Equal(t, int64(2), got.RevisionIdx, "expected migration to increment revision_idx value by one") } } diff --git a/internal/pkg/dl/policies.go b/internal/pkg/dl/policies.go index 258a0e7336..29dfc67794 100644 --- a/internal/pkg/dl/policies.go +++ b/internal/pkg/dl/policies.go @@ -32,7 +32,6 @@ func prepareQueryLatestPolicies() []byte { revisionIdx.Size(1) rSort := revisionIdx.Sort() rSort.SortOrder(FieldRevisionIdx, dsl.SortDescend) - rSort.SortOrder(FieldCoordinatorIdx, dsl.SortDescend) return root.MustMarshalJSON() } diff --git a/internal/pkg/dl/policies_integration_test.go b/internal/pkg/dl/policies_integration_test.go index ceb4fa9ee3..fecf1b3cf9 100644 --- a/internal/pkg/dl/policies_integration_test.go +++ b/internal/pkg/dl/policies_integration_test.go @@ -36,7 +36,6 @@ func createRandomPolicy(id string, revisionIdx int) model.Policy { return model.Policy{ PolicyID: id, RevisionIdx: int64(revisionIdx), - CoordinatorIdx: 0, Data: &policyData, DefaultFleetServer: false, 
Timestamp: now.Format(time.RFC3339), diff --git a/internal/pkg/dl/policies_leader.go b/internal/pkg/dl/policies_leader.go deleted file mode 100644 index 3fe83602b3..0000000000 --- a/internal/pkg/dl/policies_leader.go +++ /dev/null @@ -1,154 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package dl - -import ( - "context" - "encoding/json" - "errors" - "sync" - "time" - - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" - "github.com/elastic/fleet-server/v7/internal/pkg/dsl" - "github.com/elastic/fleet-server/v7/internal/pkg/es" - "github.com/elastic/fleet-server/v7/internal/pkg/model" - "github.com/rs/zerolog" -) - -var ( - tmplSearchPolicyLeaders *dsl.Tmpl - initSearchPolicyLeadersOnce sync.Once -) - -func prepareSearchPolicyLeaders() (*dsl.Tmpl, error) { - tmpl := dsl.NewTmpl() - root := dsl.NewRoot() - root.Query().Terms(FieldID, tmpl.Bind(FieldID), nil) - - err := tmpl.Resolve(root) - if err != nil { - return nil, err - } - return tmpl, nil -} - -// SearchPolicyLeaders returns all the leaders for the provided policies -func SearchPolicyLeaders(ctx context.Context, bulker bulk.Bulk, ids []string, opt ...Option) (leaders map[string]model.PolicyLeader, err error) { - initSearchPolicyLeadersOnce.Do(func() { - tmplSearchPolicyLeaders, err = prepareSearchPolicyLeaders() - if err != nil { - return - } - }) - - o := newOption(FleetPoliciesLeader, opt...) - data, err := tmplSearchPolicyLeaders.RenderOne(FieldID, ids) - if err != nil { - return - } - res, err := bulker.Search(ctx, o.indexName, data) - if err != nil { - if errors.Is(err, es.ErrIndexNotFound) { - zerolog.Ctx(ctx).Debug().Str("index", o.indexName).Msg(es.ErrIndexNotFound.Error()) - err = nil - } - return - } - - leaders = map[string]model.PolicyLeader{} - for _, hit := range res.Hits { - var l model.PolicyLeader - err = hit.Unmarshal(&l) - if err != nil { - return - } - leaders[hit.ID] = l - } - return leaders, nil -} - -// TakePolicyLeadership tries to take leadership of a policy -func TakePolicyLeadership(ctx context.Context, bulker bulk.Bulk, policyID, serverID, version string, opt ...Option) error { - o := newOption(FleetPoliciesLeader, opt...) 
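The dl/migration.go hunk above swaps the forced-update painless script from coordinator_idx++ to revision_idx++. A rough sketch of the update-by-query body that migratePolicyRevisionIdx builds, assuming the dsl helpers render the match_all query and the script param as plain JSON (target index, migration bookkeeping, and error handling omitted):

```go
// Hedged sketch of the migration body; field rendering is an assumption.
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Equivalent of: query.Query().MatchAll(); query.Param("script", painless)
	body := map[string]any{
		"query":  map[string]any{"match_all": map[string]any{}},
		"script": "ctx._source.revision_idx++;", // previously ctx._source.coordinator_idx++;
	}
	b, err := json.Marshal(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```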
- data, err := bulker.Read(ctx, o.indexName, policyID, bulk.WithRefresh()) - if err != nil && !errors.Is(err, es.ErrElasticNotFound) { - return err - } - var l model.PolicyLeader - found := false - if !errors.Is(err, es.ErrElasticNotFound) { - found = true - err = json.Unmarshal(data, &l) - if err != nil { - return err - } - } - if l.Server == nil { - l.Server = &model.ServerMetadata{} - } - l.Server.ID = serverID - l.Server.Version = version - l.SetTime(time.Now().UTC()) - if found { - data, err = json.Marshal(&struct { - Doc model.PolicyLeader `json:"doc"` - }{ - Doc: l, - }) - if err != nil { - return err - } - err = bulker.Update(ctx, o.indexName, policyID, data, bulk.WithRefresh()) - } else { - data, err = json.Marshal(&l) - if err != nil { - return err - } - _, err = bulker.Create(ctx, o.indexName, policyID, data, bulk.WithRefresh()) - } - if err != nil { - return err - } - return nil -} - -// ReleasePolicyLeadership releases leadership of a policy -func ReleasePolicyLeadership(ctx context.Context, bulker bulk.Bulk, policyID, serverID string, releaseInterval time.Duration, opt ...Option) error { - o := newOption(FleetPoliciesLeader, opt...) - data, err := bulker.Read(ctx, o.indexName, policyID, bulk.WithRefresh()) - if errors.Is(err, es.ErrElasticNotFound) { - // nothing to do - return nil - } - if err != nil { - return err - } - var l model.PolicyLeader - err = json.Unmarshal(data, &l) - if err != nil { - return err - } - if l.Server.ID != serverID { - // not leader anymore; nothing to do - return nil - } - released := time.Now().UTC().Add(-releaseInterval) - l.SetTime(released) - data, err = json.Marshal(&struct { - Doc model.PolicyLeader `json:"doc"` - }{ - Doc: l, - }) - if err != nil { - return err - } - err = bulker.Update(ctx, o.indexName, policyID, data, bulk.WithRefresh()) - if errors.Is(err, es.ErrElasticVersionConflict) { - // another leader took over; nothing to worry about - return nil - } - return err -} diff --git a/internal/pkg/dl/policies_leader_integration_test.go b/internal/pkg/dl/policies_leader_integration_test.go deleted file mode 100644 index 73194b6c33..0000000000 --- a/internal/pkg/dl/policies_leader_integration_test.go +++ /dev/null @@ -1,176 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. 
- -//go:build integration - -package dl - -import ( - "context" - "encoding/json" - "fmt" - "testing" - "time" - - "github.com/elastic/fleet-server/v7/internal/pkg/model" - ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" - testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" - - "github.com/gofrs/uuid" -) - -const testVer = "1.0.0" - -func TestSearchPolicyLeaders(t *testing.T) { - ctx, cn := context.WithCancel(context.Background()) - defer cn() - ctx = testlog.SetLogger(t).WithContext(ctx) - - index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetPoliciesLeader) - - // insert a policy leaders to search for - serverID := uuid.Must(uuid.NewV4()).String() - policyIds := make([]string, 3) - for i := 0; i < 3; i++ { - policyID := uuid.Must(uuid.NewV4()).String() - version := testVer - err := TakePolicyLeadership(ctx, bulker, policyID, serverID, version, WithIndexName(index)) - if err != nil { - t.Fatal(err) - } - policyIds[i] = policyID - } - - // possible that a search will not produce 3 directly after write - // so we try 3 times to ensure - ftesting.Retry(t, ctx, func(ctx context.Context) error { - leaders, err := SearchPolicyLeaders(ctx, bulker, policyIds, WithIndexName(index)) - if err != nil { - return err - } - if len(leaders) != 3 { - return fmt.Errorf("must have found 3 leaders: only found %v", len(leaders)) - } - return nil - }, ftesting.RetryCount(3)) -} - -func TestTakePolicyLeadership(t *testing.T) { - ctx, cn := context.WithCancel(context.Background()) - defer cn() - ctx = testlog.SetLogger(t).WithContext(ctx) - - index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetPoliciesLeader) - - serverID := uuid.Must(uuid.NewV4()).String() - policyID := uuid.Must(uuid.NewV4()).String() - version := testVer - err := TakePolicyLeadership(ctx, bulker, policyID, serverID, version, WithIndexName(index)) - if err != nil { - t.Fatal(err) - } - - data, err := bulker.Read(ctx, index, policyID) - if err != nil { - t.Fatal(err) - } - var leader model.PolicyLeader - err = json.Unmarshal(data, &leader) - if err != nil { - t.Fatal(err) - } - if leader.Server.ID != serverID || leader.Server.Version != version { - t.Fatal("server.id and server.version should match") - } - lt, err := leader.Time() - if err != nil { - t.Fatal(err) - } - if time.Now().UTC().Sub(lt) >= 5*time.Second { - t.Fatal("@timestamp should be with in 5 seconds") - } -} - -func TestReleasePolicyLeadership(t *testing.T) { - ctx, cn := context.WithCancel(context.Background()) - defer cn() - ctx = testlog.SetLogger(t).WithContext(ctx) - - index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetPoliciesLeader) - - serverID := uuid.Must(uuid.NewV4()).String() - policyID := uuid.Must(uuid.NewV4()).String() - version := testVer - err := TakePolicyLeadership(ctx, bulker, policyID, serverID, version, WithIndexName(index)) - if err != nil { - t.Fatal(err) - } - err = ReleasePolicyLeadership(ctx, bulker, policyID, serverID, 30*time.Second, WithIndexName(index)) - if err != nil { - t.Fatal(err) - } - - data, err := bulker.Read(ctx, index, policyID) - if err != nil { - t.Fatal(err) - } - var leader model.PolicyLeader - err = json.Unmarshal(data, &leader) - if err != nil { - t.Fatal(err) - } - if leader.Server.ID != serverID || leader.Server.Version != version { - t.Fatal("server.id and server.version should match") - } - lt, err := leader.Time() - if err != nil { - t.Fatal(err) - } - diff := time.Now().UTC().Sub(lt).Seconds() - if diff < (30 * time.Second).Seconds() { - 
t.Fatalf("@timestamp different should be more than 30 seconds; instead its %.0f secs", diff) - } -} - -func TestReleasePolicyLeadership_NothingIfNotLeader(t *testing.T) { - ctx, cn := context.WithCancel(context.Background()) - defer cn() - ctx = testlog.SetLogger(t).WithContext(ctx) - - index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetPoliciesLeader) - - serverID := uuid.Must(uuid.NewV4()).String() - policyID := uuid.Must(uuid.NewV4()).String() - version := testVer - err := TakePolicyLeadership(ctx, bulker, policyID, serverID, version, WithIndexName(index)) - if err != nil { - t.Fatal(err) - } - otherServerID := uuid.Must(uuid.NewV4()).String() - err = ReleasePolicyLeadership(ctx, bulker, policyID, otherServerID, 30*time.Second, WithIndexName(index)) - if err != nil { - t.Fatal(err) - } - - data, err := bulker.Read(ctx, index, policyID) - if err != nil { - t.Fatal(err) - } - var leader model.PolicyLeader - err = json.Unmarshal(data, &leader) - if err != nil { - t.Fatal(err) - } - if leader.Server.ID != serverID || leader.Server.Version != version { - t.Fatal("server.id and server.version should match") - } - lt, err := leader.Time() - if err != nil { - t.Fatal(err) - } - diff := time.Now().UTC().Sub(lt).Seconds() - if diff >= (5 * time.Second).Seconds() { - t.Fatalf("@timestamp different should less than 5 seconds; instead its %.0f secs", diff) - } -} diff --git a/internal/pkg/dl/servers.go b/internal/pkg/dl/servers.go deleted file mode 100644 index 2686bf5609..0000000000 --- a/internal/pkg/dl/servers.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package dl - -import ( - "context" - "encoding/json" - "errors" - "time" - - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" - "github.com/elastic/fleet-server/v7/internal/pkg/es" - "github.com/elastic/fleet-server/v7/internal/pkg/model" -) - -// EnsureServer ensures that this server is written in the index. -func EnsureServer(ctx context.Context, bulker bulk.Bulk, version string, agent model.AgentMetadata, host model.HostMetadata, opts ...Option) error { - var server model.Server - o := newOption(FleetServers, opts...) 
- data, err := bulker.Read(ctx, o.indexName, agent.ID) - if err != nil && !errors.Is(err, es.ErrElasticNotFound) { - return err - } - if errors.Is(err, es.ErrElasticNotFound) { - server.Agent = &agent - server.Host = &host - server.Server = &model.ServerMetadata{ - ID: agent.ID, - Version: version, - } - server.SetTime(time.Now().UTC()) - data, err = json.Marshal(&server) - if err != nil { - return err - } - _, err = bulker.Create(ctx, o.indexName, agent.ID, data) - return err - } - err = json.Unmarshal(data, &server) - if err != nil { - return err - } - server.Agent = &agent - server.Host = &host - server.Server = &model.ServerMetadata{ - ID: agent.ID, - Version: version, - } - server.SetTime(time.Now().UTC()) - data, err = json.Marshal(&struct { - Doc model.Server `json:"doc"` - }{server}) - if err != nil { - return err - } - return bulker.Update(ctx, o.indexName, agent.ID, data, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)) -} diff --git a/internal/pkg/dl/servers_integration_test.go b/internal/pkg/dl/servers_integration_test.go deleted file mode 100644 index da25234807..0000000000 --- a/internal/pkg/dl/servers_integration_test.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build integration - -package dl - -import ( - "context" - "encoding/json" - "runtime" - "testing" - - "github.com/gofrs/uuid" - - "github.com/elastic/fleet-server/v7/internal/pkg/model" - ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" - testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" -) - -func TestEnsureServer(t *testing.T) { - ctx, cn := context.WithCancel(context.Background()) - defer cn() - ctx = testlog.SetLogger(t).WithContext(ctx) - - index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetServers) - - agentID := uuid.Must(uuid.NewV4()).String() - agent := model.AgentMetadata{ - ID: agentID, - Version: "1.0.0", - } - host := model.HostMetadata{ - Architecture: runtime.GOOS, - ID: agentID, - Ip: []string{"::1"}, - Name: "testing-host", - } - - err := EnsureServer(ctx, bulker, "1.0.0", agent, host, WithIndexName(index)) - if err != nil { - t.Fatal(err) - } - - var srv model.Server - data, err := bulker.Read(ctx, index, agentID) - if err != nil { - t.Fatal(err) - } - err = json.Unmarshal(data, &srv) - if err != nil { - t.Fatal(err) - } - if srv.Agent.ID != agentID { - t.Fatal("agent.id should match agentId") - } -} diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index ecb71f4aa4..fa5444d0a2 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -347,7 +347,7 @@ type Policy struct { ESDocument // The coordinator index of the policy - CoordinatorIdx int64 `json:"coordinator_idx"` + CoordinatorIdx int64 `json:"coordinator_idx,omitempty"` Data *PolicyData `json:"data"` // True when this policy is the default policy to start Fleet Server diff --git a/internal/pkg/policy/monitor.go b/internal/pkg/policy/monitor.go index 187b8ab67f..b89f19230b 100644 --- a/internal/pkg/policy/monitor.go +++ b/internal/pkg/policy/monitor.go @@ -56,7 +56,7 @@ type Monitor interface { Run(ctx context.Context) error // Subscribe creates a new subscription for a policy update. 
- Subscribe(agentID string, policyID string, revisionIdx int64, coordinatorIdx int64) (Subscription, error) + Subscribe(agentID string, policyID string, revisionIdx int64) (Subscription, error) // Unsubscribe removes the current subscription. Unsubscribe(sub Subscription) error @@ -230,7 +230,6 @@ func (m *monitorT) dispatchPending() bool { Str(logger.AgentID, s.agentID). Str(logger.PolicyID, s.policyID). Int64("rev", s.revIdx). - Int64("coord", s.coordIdx). Msg("dispatch") default: // Should never block on a channel; we created a channel of size one. @@ -294,9 +293,6 @@ func groupByLatest(policies []model.Policy) map[string]model.Policy { } if policy.RevisionIdx > curr.RevisionIdx { latest[policy.PolicyID] = policy - continue - } else if policy.RevisionIdx == curr.RevisionIdx && policy.CoordinatorIdx > curr.CoordinatorIdx { - latest[policy.PolicyID] = policy } } return latest @@ -312,14 +308,8 @@ func (m *monitorT) updatePolicy(pp *ParsedPolicy) bool { zlog := m.log.With(). Str(logger.PolicyID, newPolicy.PolicyID). Int64("rev", newPolicy.RevisionIdx). - Int64("coord", newPolicy.CoordinatorIdx). Logger() - if newPolicy.CoordinatorIdx <= 0 { - zlog.Info().Str(logger.PolicyID, newPolicy.PolicyID).Msg("Ignore policy that has not passed through coordinator") - return false - } - m.mut.Lock() defer m.mut.Unlock() @@ -371,7 +361,6 @@ func (m *monitorT) updatePolicy(pp *ParsedPolicy) bool { zlog.Info(). Int64("oldRev", oldPolicy.RevisionIdx). - Int64("oldCoord", oldPolicy.CoordinatorIdx). Int("nQueued", nQueued). Str(logger.PolicyID, newPolicy.PolicyID). Msg("New revision of policy received and added to the queue") @@ -380,7 +369,6 @@ func (m *monitorT) updatePolicy(pp *ParsedPolicy) bool { } func (m *monitorT) kickLoad() { - select { case m.kickCh <- struct{}{}: default: @@ -389,7 +377,6 @@ func (m *monitorT) kickLoad() { } func (m *monitorT) kickDeploy() { - select { case m.deployCh <- struct{}{}: default: @@ -397,26 +384,20 @@ func (m *monitorT) kickDeploy() { } // Subscribe creates a new subscription for a policy update. -func (m *monitorT) Subscribe(agentID string, policyID string, revisionIdx int64, coordinatorIdx int64) (Subscription, error) { +func (m *monitorT) Subscribe(agentID string, policyID string, revisionIdx int64) (Subscription, error) { if revisionIdx < 0 { return nil, errors.New("revisionIdx must be greater than or equal to 0") } - if coordinatorIdx < 0 { - return nil, errors.New("coordinatorIdx must be greater than or equal to 0") - } - m.log.Debug(). Str(logger.AgentID, agentID). Str(logger.PolicyID, policyID). Int64("rev", revisionIdx). - Int64("coord", coordinatorIdx). Msg("subscribed to policy monitor") s := NewSub( policyID, agentID, revisionIdx, - coordinatorIdx, ) m.mut.Lock() @@ -464,7 +445,6 @@ func (m *monitorT) Unsubscribe(sub Subscription) error { Str(logger.AgentID, s.agentID). Str(logger.PolicyID, s.policyID). Int64("rev", s.revIdx). - Int64("coord", s.coordIdx). 
Msg("unsubscribe") return nil diff --git a/internal/pkg/policy/monitor_integration_test.go b/internal/pkg/policy/monitor_integration_test.go index 380dc5b1d3..801b62928c 100644 --- a/internal/pkg/policy/monitor_integration_test.go +++ b/internal/pkg/policy/monitor_integration_test.go @@ -77,17 +77,16 @@ func TestMonitor_Integration(t *testing.T) { agentID := uuid.Must(uuid.NewV4()).String() policyID := uuid.Must(uuid.NewV4()).String() - s, err := m.Subscribe(agentID, policyID, 0, 0) + s, err := m.Subscribe(agentID, policyID, 0) defer m.Unsubscribe(s) //nolint:errcheck // defered function if err != nil { t.Fatal(err) } policy := model.Policy{ - PolicyID: policyID, - CoordinatorIdx: 1, - Data: &intPolData, - RevisionIdx: 1, + PolicyID: policyID, + Data: &intPolData, + RevisionIdx: 1, } ch := make(chan error, 1) go func() { @@ -102,7 +101,7 @@ func TestMonitor_Integration(t *testing.T) { select { case subPolicy := <-s.Output(): tm.Stop() - if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 1 && subPolicy.Policy.CoordinatorIdx != 1 { + if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 1 { t.Fatal("failed to get the expected updated policy") } case <-tm.C: diff --git a/internal/pkg/policy/monitor_test.go b/internal/pkg/policy/monitor_test.go index db3b68a7e0..a34c777918 100644 --- a/internal/pkg/policy/monitor_test.go +++ b/internal/pkg/policy/monitor_test.go @@ -69,7 +69,7 @@ func TestMonitor_NewPolicy(t *testing.T) { agentId := uuid.Must(uuid.NewV4()).String() policyID := uuid.Must(uuid.NewV4()).String() - s, err := monitor.Subscribe(agentId, policyID, 0, 0) + s, err := monitor.Subscribe(agentId, policyID, 0) defer monitor.Unsubscribe(s) require.NoError(t, err) @@ -80,10 +80,9 @@ func TestMonitor_NewPolicy(t *testing.T) { Version: 1, SeqNo: 1, }, - PolicyID: policyID, - CoordinatorIdx: 1, - Data: policyDataDefault, - RevisionIdx: 1, + PolicyID: policyID, + Data: policyDataDefault, + RevisionIdx: 1, } policyData, err := json.Marshal(&policy) require.NoError(t, err) @@ -149,7 +148,7 @@ func TestMonitor_SamePolicy(t *testing.T) { agentId := uuid.Must(uuid.NewV4()).String() policyId := uuid.Must(uuid.NewV4()).String() - s, err := monitor.Subscribe(agentId, policyId, 1, 1) + s, err := monitor.Subscribe(agentId, policyId, 1) defer monitor.Unsubscribe(s) require.NoError(t, err) @@ -160,10 +159,9 @@ func TestMonitor_SamePolicy(t *testing.T) { Version: 1, SeqNo: 1, }, - PolicyID: policyId, - CoordinatorIdx: 1, - Data: policyDataDefault, - RevisionIdx: 1, + PolicyID: policyId, + Data: policyDataDefault, + RevisionIdx: 1, } policyData, err := json.Marshal(&policy) require.NoError(t, err) @@ -189,85 +187,7 @@ func TestMonitor_SamePolicy(t *testing.T) { if merr != nil && merr != context.Canceled { t.Fatal(merr) } - require.False(t, gotPolicy, "got policy update when it was the same rev/coord idx") - ms.AssertExpectations(t) - mm.AssertExpectations(t) -} - -func TestMonitor_NewPolicyUncoordinated(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - ctx = testlog.SetLogger(t).WithContext(ctx) - - chHitT := make(chan []es.HitT, 1) - defer close(chHitT) - ms := mmock.NewMockSubscription() - ms.On("Output").Return((<-chan []es.HitT)(chHitT)) - mm := mmock.NewMockMonitor() - mm.On("Subscribe").Return(ms).Once() - mm.On("Unsubscribe", mock.Anything).Return().Once() - bulker := ftesting.NewMockBulk() - - monitor := NewMonitor(bulker, mm, 0) - pm := monitor.(*monitorT) - pm.policyF = func(ctx context.Context, bulker 
bulk.Bulk, opt ...dl.Option) ([]model.Policy, error) { - return []model.Policy{}, nil - } - - var merr error - var mwg sync.WaitGroup - mwg.Add(1) - go func() { - defer mwg.Done() - merr = monitor.Run(ctx) - }() - - err := monitor.(*monitorT).waitStart(ctx) - require.NoError(t, err) - - agentId := uuid.Must(uuid.NewV4()).String() - policyId := uuid.Must(uuid.NewV4()).String() - s, err := monitor.Subscribe(agentId, policyId, 1, 1) - defer monitor.Unsubscribe(s) - require.NoError(t, err) - - rId := xid.New().String() - policy := model.Policy{ - ESDocument: model.ESDocument{ - Id: rId, - Version: 1, - SeqNo: 1, - }, - PolicyID: policyId, - CoordinatorIdx: 0, - Data: policyDataDefault, - RevisionIdx: 2, - } - policyData, err := json.Marshal(&policy) - require.NoError(t, err) - - chHitT <- []es.HitT{{ - ID: rId, - SeqNo: 1, - Version: 1, - Source: policyData, - }} - - gotPolicy := false - tm := time.NewTimer(1 * time.Second) - defer tm.Stop() - select { - case <-s.Output(): - gotPolicy = true - case <-tm.C: - } - - cancel() - mwg.Wait() - if merr != nil && merr != context.Canceled { - t.Fatal(merr) - } - require.False(t, gotPolicy, "got policy update when it had coordinator_idx set to 0") + require.False(t, gotPolicy, "got policy update when it was the same rev idx") ms.AssertExpectations(t) mm.AssertExpectations(t) } @@ -318,10 +238,9 @@ func runTestMonitor_NewPolicyExists(t *testing.T, delay time.Duration) { Version: 1, SeqNo: 1, }, - PolicyID: policyId, - CoordinatorIdx: 1, - Data: policyDataDefault, - RevisionIdx: 2, + PolicyID: policyId, + Data: policyDataDefault, + RevisionIdx: 2, } pm.policyF = func(ctx context.Context, bulker bulk.Bulk, opt ...dl.Option) ([]model.Policy, error) { @@ -340,7 +259,7 @@ func runTestMonitor_NewPolicyExists(t *testing.T, delay time.Duration) { err := monitor.(*monitorT).waitStart(ctx) require.NoError(t, err) - s, err := monitor.Subscribe(agentId, policyId, 1, 1) + s, err := monitor.Subscribe(agentId, policyId, 1) defer monitor.Unsubscribe(s) require.NoError(t, err) diff --git a/internal/pkg/policy/revision.go b/internal/pkg/policy/revision.go index 68d6768a79..09a93dc126 100644 --- a/internal/pkg/policy/revision.go +++ b/internal/pkg/policy/revision.go @@ -14,24 +14,25 @@ import ( // Revision is a policy revision that is sent as an action ID to an agent. type Revision struct { - PolicyID string - RevisionIdx int64 - CoordinatorIdx int64 + PolicyID string + RevisionIdx int64 } // RevisionFromPolicy creates the revision from the policy. func RevisionFromPolicy(policy model.Policy) Revision { return Revision{ - PolicyID: policy.PolicyID, - RevisionIdx: policy.RevisionIdx, - CoordinatorIdx: policy.CoordinatorIdx, + PolicyID: policy.PolicyID, + RevisionIdx: policy.RevisionIdx, } } // RevisionFromString converts the string to a policy revision. func RevisionFromString(actionID string) (Revision, bool) { split := strings.Split(actionID, ":") - if len(split) != 4 { + // NOTE: len 3 is expected for any policy change generated by fleet-server v8.13+. + // len 4 includes the previously used coordinator_idx value, which has been deprecated. + // If we receive an actionID that includes a coordinator_idx, that value is ignored. 
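+ // For illustration, two hypothetical action IDs (placeholder policy IDs, not taken from a real deployment):
+ //   "policy:<policy_id>:3" parses to Revision{PolicyID: "<policy_id>", RevisionIdx: 3}.
+ //   "policy:<policy_id>:3:1" parses to the same Revision; the trailing coordinator_idx of 1 is dropped.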
+ if len(split) < 3 || len(split) > 4 { return Revision{}, false } if split[0] != "policy" { @@ -41,18 +42,13 @@ if err != nil { return Revision{}, false } - coordIdx, err := strconv.ParseInt(split[3], 10, 64) - if err != nil { - return Revision{}, false - } return Revision{ - PolicyID: split[1], - RevisionIdx: revIdx, - CoordinatorIdx: coordIdx, + PolicyID: split[1], + RevisionIdx: revIdx, }, true } // String returns the ID string for the policy revision. func (a *Revision) String() string { - return fmt.Sprintf("policy:%s:%d:%d", a.PolicyID, a.RevisionIdx, a.CoordinatorIdx) + return fmt.Sprintf("policy:%s:%d", a.PolicyID, a.RevisionIdx) } diff --git a/internal/pkg/policy/self_test.go b/internal/pkg/policy/self_test.go index dd1f4a865f..c59714487a 100644 --- a/internal/pkg/policy/self_test.go +++ b/internal/pkg/policy/self_test.go @@ -90,7 +90,6 @@ func TestSelfMonitor_DefaultPolicy(t *testing.T) { SeqNo: 1, }, PolicyID: policyID, - CoordinatorIdx: 1, Data: &pData, RevisionIdx: 1, DefaultFleetServer: true, @@ -131,7 +130,6 @@ func TestSelfMonitor_DefaultPolicy(t *testing.T) { SeqNo: 1, }, PolicyID: policyID, - CoordinatorIdx: 1, Data: &pData, RevisionIdx: 2, DefaultFleetServer: true, @@ -247,7 +245,6 @@ func TestSelfMonitor_DefaultPolicy_Degraded(t *testing.T) { SeqNo: 1, }, PolicyID: policyID, - CoordinatorIdx: 1, Data: &pData, RevisionIdx: 1, DefaultFleetServer: true, @@ -387,7 +384,6 @@ func TestSelfMonitor_SpecificPolicy(t *testing.T) { SeqNo: 1, }, PolicyID: policyID, - CoordinatorIdx: 1, Data: &pData, RevisionIdx: 2, DefaultFleetServer: true, @@ -428,7 +424,6 @@ func TestSelfMonitor_SpecificPolicy(t *testing.T) { SeqNo: 2, }, PolicyID: policyID, - CoordinatorIdx: 1, Data: &pData, RevisionIdx: 1, DefaultFleetServer: true, @@ -543,7 +538,6 @@ func TestSelfMonitor_SpecificPolicy_Degraded(t *testing.T) { SeqNo: 1, }, PolicyID: policyID, - CoordinatorIdx: 1, Data: &pData, RevisionIdx: 1, DefaultFleetServer: true, diff --git a/internal/pkg/policy/sub.go b/internal/pkg/policy/sub.go index 86c60a5ce6..e767c35f29 100644 --- a/internal/pkg/policy/sub.go +++ b/internal/pkg/policy/sub.go @@ -12,7 +12,6 @@ type subT struct { policyID string agentID string // not logically necessary; cached for logging revIdx int64 - coordIdx int64 next *subT prev *subT @@ -20,12 +19,11 @@ type subT struct { ch chan *ParsedPolicy } -func NewSub(policyID, agentID string, revIdx, coordIdx int64) *subT { +func NewSub(policyID, agentID string, revIdx int64) *subT { return &subT{ policyID: policyID, agentID: agentID, revIdx: revIdx, - coordIdx: coordIdx, ch: make(chan *ParsedPolicy, 1), } } @@ -77,11 +75,9 @@ func (n *subT) isEmpty() bool { } func (n *subT) isUpdate(policy *model.Policy) bool { - pRevIdx := policy.RevisionIdx - pCoordIdx := policy.CoordinatorIdx - return (pRevIdx > n.revIdx && pCoordIdx > 0) || (pRevIdx == n.revIdx && pCoordIdx > n.coordIdx) + // A policy counts as an update only when its revision is newer than the subscription's. + return policy.RevisionIdx > n.revIdx } // Output returns a new policy that needs to be sent based on the current subscription. 
diff --git a/internal/pkg/policy/sub_test.go b/internal/pkg/policy/sub_test.go index d756df964e..40e70ba137 100644 --- a/internal/pkg/policy/sub_test.go +++ b/internal/pkg/policy/sub_test.go @@ -35,7 +35,7 @@ func TestSub_PushBackN(t *testing.T) { nodes := make([]*subT, 0, n) for i := 0; i < n; i++ { name := fmt.Sprintf("policy%d", i) - nn := NewSub(name, "", 0, 0) + nn := NewSub(name, "", 0) head.pushBack(nn) nodes = append(nodes, nn) } @@ -82,7 +82,7 @@ func TestSub_PushFrontN(t *testing.T) { nodes := make([]*subT, 0, n) for i := 0; i < n; i++ { name := fmt.Sprintf("policy%d", i) - nn := NewSub(name, "", 0, 0) + nn := NewSub(name, "", 0) head.pushFront(nn) nodes = append(nodes, nn) } @@ -126,7 +126,7 @@ func TestSub_PushRandom(t *testing.T) { nodes := make([]*subT, 0, N) for i := 0; i < N; i++ { name := fmt.Sprintf("policy%d", i) - nn := NewSub(name, "", 0, 0) + nn := NewSub(name, "", 0) if rand.Intn(2) == 1 { head.pushBack(nn) @@ -162,7 +162,7 @@ func TestSub_UnlinkRandomN(t *testing.T) { nodes := make([]*subT, 0, N) for i := 0; i < N; i++ { name := fmt.Sprintf("policy%d", i) - nn := NewSub(name, "", 0, 0) + nn := NewSub(name, "", 0) head.pushBack(nn) nodes = append(nodes, nn) } @@ -195,7 +195,7 @@ func TestSub_UnlinkRandomN(t *testing.T) { func BenchmarkSubsSimple(b *testing.B) { head := makeHead() - nn := NewSub("", "", 0, 0) + nn := NewSub("", "", 0) for i := 0; i < b.N; i++ { head.pushBack(nn) head.popFront() @@ -219,7 +219,7 @@ func BenchmarkSubs(b *testing.B) { for i := 0; i < max; i++ { name := fmt.Sprintf("policy%d", i) - nn := NewSub(name, "", 0, 0) + nn := NewSub(name, "", 0) subs = append(subs, nn) } diff --git a/internal/pkg/server/fleet.go b/internal/pkg/server/fleet.go index 95b08b9b7e..066f7577b5 100644 --- a/internal/pkg/server/fleet.go +++ b/internal/pkg/server/fleet.go @@ -26,7 +26,6 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/cache" "github.com/elastic/fleet-server/v7/internal/pkg/checkin" "github.com/elastic/fleet-server/v7/internal/pkg/config" - "github.com/elastic/fleet-server/v7/internal/pkg/coordinator" "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/gc" @@ -464,7 +463,7 @@ func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgro return err } - // Coordinator policy monitor + // Policy index monitor pim, err := monitor.New(dl.FleetPolicies, esCli, monCli, monitor.WithFetchSize(cfg.Inputs[0].Monitor.FetchSize), monitor.WithPollTimeout(cfg.Inputs[0].Monitor.PollTimeout), @@ -473,10 +472,7 @@ func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgro if err != nil { return err } - g.Go(loggedRunFunc(ctx, "Policy index monitor", pim.Run)) - cord := coordinator.NewMonitor(cfg.Fleet, f.bi.Version, bulker, pim, coordinator.NewCoordinatorZero) - g.Go(loggedRunFunc(ctx, "Coordinator policy monitor", cord.Run)) // Policy monitor pm := policy.NewMonitor(bulker, pim, cfg.Inputs[0].Server.Limits.PolicyThrottle) diff --git a/internal/pkg/server/fleet_integration_test.go b/internal/pkg/server/fleet_integration_test.go index b43979e8a2..255717bda8 100644 --- a/internal/pkg/server/fleet_integration_test.go +++ b/internal/pkg/server/fleet_integration_test.go @@ -547,7 +547,7 @@ func TestServerInstrumentation(t *testing.T) { } } - // Force a transaction (fleet-server should be sending tranactions from the coordinator and monitor) + // Force a transaction 
(fleet-server should be sending transactions from the monitor) callCheckinFunc() // Verify the APM tracer does not connect to the mocked APM Server. diff --git a/model/schema.json b/model/schema.json index 9857da5120..41865c21ee 100644 --- a/model/schema.json +++ b/model/schema.json @@ -332,6 +332,7 @@ "type": "integer" }, "coordinator_idx": { + "deprecated": true, "description": "The coordinator index of the policy", "type": "integer" }, @@ -350,7 +351,6 @@ "required": [ "policy_id", "revision_idx", - "coordinator_idx", "data", "default_fleet_server" ] @@ -524,6 +524,7 @@ "type": "integer" }, "policy_coordinator_idx": { + "deprecated": true, "description": "The current policy coordinator for the Elastic Agent", "type": "integer" },