Skip to content

Commit

Permalink
Change how output keys are regenerated for v8.5 migration
Browse files Browse the repository at this point in the history
The v8.5 migration generated new output keys for an agent by forcing the
policy outputs to be prepared by incrementing the coordinator_idx for
the policy. Behaviour was changed instead to detect if a single output
has no api_key value (empty string) and subscribe with a revision of 0.
  • Loading branch information
michel-laterman committed Apr 17, 2024
1 parent 9f3a479 commit af2552a
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 104 deletions.
12 changes: 11 additions & 1 deletion internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,18 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
defer ct.ad.Unsubscribe(aSub)
actCh := aSub.Ch()

// use revision_idx=0 if the agent has a single output where no API key is defined
// This will force the policy monitor to emit a new policy to regerate API keys
revID := agent.PolicyRevisionIdx
for _, output := range agent.Outputs {
if output.APIKey == "" {
revID = 0
break
}
}

// Subscribe to policy manager for changes on PolicyId > policyRev
sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, agent.PolicyRevisionIdx)
sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, revID)
if err != nil {
return fmt.Errorf("subscribe policy monitor: %w", err)
}
Expand Down
41 changes: 6 additions & 35 deletions internal/pkg/dl/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,21 +186,15 @@ func migrateAgentMetadata() (string, string, []byte, error) {

func migrateToV8_5(ctx context.Context, bulker bulk.Bulk) error {
zerolog.Ctx(ctx).Debug().Msg("applying migration to v8.5.0")
migrated, err := migrate(ctx, bulker, migrateAgentOutputs)
_, err := migrate(ctx, bulker, migrateAgentOutputs)
if err != nil {
return fmt.Errorf("v8.5.0 data migration failed: %w", err)
}

// The migration was necessary and indeed run, thus we need to regenerate
// the API keys for all agents. In order to do so, we increase the policy
// revision index to force a policy update.
if migrated > 0 {
_, err := migrate(ctx, bulker, migratePolicyRevisionIdx)
if err != nil {
return fmt.Errorf("v8.5.0 data migration failed: %w", err)
}
}

// NOTE(michel-laterman): We use lazy output migration now that the
// coordinator is removed. Migration completes once an agent checks in
// and an API key associated with an output is empty. The agent will
// subscribe to the policy monitor with revision_idx=0 to force a policy
// change to occur.
return nil
}

Expand Down Expand Up @@ -273,26 +267,3 @@ ctx._source.policy_output_permissions_hash=null;

return migrationName, FleetAgents, body, nil
}

// migratePolicyRevisionIdx increases the policy's RevisionIdx to force
// a policy update ensuring the output data will be migrated to the new
// Agent.Outputs field. See migrateAgentOutputs and https://github.com/elastic/fleet-server/issues/1672
// for details.
// NOTE(michel-laterman): Updated to increment the RevisionIdx instead of the CoordinatorrIdx as the coordinator is being removed.
func migratePolicyRevisionIdx() (string, string, []byte, error) {
const migrationName = "PolicyRevisionIdx"

query := dsl.NewRoot()
query.Query().MatchAll()
painless := `ctx._source.revision_idx++;`
query.Param("script", painless)

body, err := query.MarshalJSON()
if err != nil {
zerolog.Ctx(context.TODO()).Debug().Str("painlessScript", painless).
Msgf("%s: failed painless script", migrationName)
return migrationName, FleetPolicies, nil, fmt.Errorf("could not marshal ES query: %w", err)
}

return migrationName, FleetPolicies, body, nil
}
33 changes: 0 additions & 33 deletions internal/pkg/dl/migration_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,39 +114,6 @@ func createSomePolicies(ctx context.Context, t *testing.T, n int, index string,
return created
}

func TestPolicyRevisionIdx(t *testing.T) {
ctx, cn := context.WithCancel(context.Background())
defer cn()
ctx = testlog.SetLogger(t).WithContext(ctx)

index, bulker := ftesting.SetupCleanIndex(ctx, t, FleetPolicies)

docIDs := createSomePolicies(ctx, t, 25, index, bulker)

migrated, err := migrate(ctx, bulker, migratePolicyRevisionIdx)
require.NoError(t, err)

require.Equal(t, len(docIDs), migrated)

for i := range docIDs {
policies, err := QueryLatestPolicies(
ctx, bulker, WithIndexName(index))
if err != nil {
assert.NoError(t, err, "failed to query latest policies") // we want to continue even if a single agent fails
continue
}

var got model.Policy
for _, p := range policies {
if p.PolicyID == fmt.Sprint(i) {
got = p
}
}

assert.Equal(t, int64(2), got.RevisionIdx, "expected migration to increment revision_idx value by one")
}
}

func TestMigrateOutputs_withDefaultAPIKeyHistory(t *testing.T) {
ctx, cn := context.WithCancel(context.Background())
defer cn()
Expand Down
48 changes: 22 additions & 26 deletions internal/pkg/policy/monitor_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,16 @@ func TestMonitor_Debounce_Integration(t *testing.T) {

agentID := uuid.Must(uuid.NewV4()).String()
policyID := uuid.Must(uuid.NewV4()).String()
s, err := m.Subscribe(agentID, policyID, 0, 0)
s, err := m.Subscribe(agentID, policyID, 0)
if err != nil {
t.Fatal(err)
}
defer m.Unsubscribe(s) //nolint:errcheck // defered function

policy := model.Policy{
PolicyID: policyID,
CoordinatorIdx: 1,
Data: &intPolData,
RevisionIdx: 1,
PolicyID: policyID,
Data: &intPolData,
RevisionIdx: 1,
}
ch := make(chan error, 1)
go func() {
Expand Down Expand Up @@ -245,7 +244,7 @@ func TestMonitor_Debounce_Integration(t *testing.T) {
ts = time.Now()
tm.Stop()
t.Log("received initial policy from subsciption")
if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 1 && subPolicy.Policy.CoordinatorIdx != 1 {
if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 1 {
t.Fatal("failed to get the expected updated policy")
}
case <-tm.C:
Expand All @@ -257,7 +256,7 @@ func TestMonitor_Debounce_Integration(t *testing.T) {
}

// Make new subscription to replicate agent checking in again.
s2, err := m.Subscribe(agentID, policyID, 1, 1)
s2, err := m.Subscribe(agentID, policyID, 1)
if err != nil {
t.Fatal(err)
}
Expand All @@ -274,15 +273,15 @@ func TestMonitor_Debounce_Integration(t *testing.T) {
t.Fatalf("Expected subscription to take at least 1s to update, time was: %s", dur)
}
// 2nd version of policy should be skipped, 3rd should be read.
if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 3 && subPolicy.Policy.CoordinatorIdx != 1 {
if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 3 {
t.Fatal("failed to get the expected updated policy")
}
case <-tm.C:
timedout = true

}

s3, err := m.Subscribe(agentID, policyID, 3, 1)
s3, err := m.Subscribe(agentID, policyID, 3)
if err != nil {
t.Fatal(err)
}
Expand All @@ -294,7 +293,7 @@ func TestMonitor_Debounce_Integration(t *testing.T) {
tm.Stop()
t.Logf("received third policy from subsciption, rev %d", subPolicy.Policy.RevisionIdx)
// 2nd version of policy should be skipped, 3rd should be read.
if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 4 && subPolicy.Policy.CoordinatorIdx != 1 {
if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 4 {
t.Fatal("failed to get the expected updated policy")
}
case <-tm.C:
Expand Down Expand Up @@ -369,10 +368,9 @@ func TestMonitor_Revisions(t *testing.T) {
policyID := uuid.Must(uuid.NewV4()).String()

policy := model.Policy{
PolicyID: policyID,
CoordinatorIdx: 1,
Data: &intPolData,
RevisionIdx: 1,
PolicyID: policyID,
Data: &intPolData,
RevisionIdx: 1,
}
_, err = dl.CreatePolicy(ctx, bulker, policy, dl.WithIndexName(index))
if err != nil {
Expand All @@ -385,14 +383,14 @@ func TestMonitor_Revisions(t *testing.T) {
t.Fatal(err)
}

s, err := m.Subscribe(agentID, policyID, 1, 1)
s, err := m.Subscribe(agentID, policyID, 1)
if err != nil {
t.Fatal(err)
}
defer m.Unsubscribe(s) //nolint:errcheck // defered function

agent2 := uuid.Must(uuid.NewV4()).String()
s2, err := m.Subscribe(agent2, policyID, 1, 1)
s2, err := m.Subscribe(agent2, policyID, 1)
if err != nil {
t.Fatal(err)
}
Expand All @@ -406,7 +404,6 @@ func TestMonitor_Revisions(t *testing.T) {

// policy should be ignored as coordinator_idx is 0
policy.RevisionIdx = 4
policy.CoordinatorIdx = 0
_, err = dl.CreatePolicy(ctx, bulker, policy, dl.WithIndexName(index))
if err != nil {
t.Fatal(err)
Expand All @@ -416,7 +413,7 @@ func TestMonitor_Revisions(t *testing.T) {
select {
case subPolicy := <-s.Output():
tm.Stop()
if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 3 && subPolicy.Policy.CoordinatorIdx != 1 {
if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 3 {
t.Fatalf("failed to get the expected updated policy, policy revision: %d", subPolicy.Policy.RevisionIdx)
}
case <-tm.C:
Expand All @@ -427,7 +424,7 @@ func TestMonitor_Revisions(t *testing.T) {
select {
case subPolicy := <-s2.Output():
tm.Stop()
if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 3 && subPolicy.Policy.CoordinatorIdx != 1 {
if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 3 {
t.Fatalf("failed to get the expected updated policy, policy revision: %d", subPolicy.Policy.RevisionIdx)
}
case <-tm.C:
Expand Down Expand Up @@ -493,10 +490,9 @@ func TestMonitor_KickDeploy(t *testing.T) {
policyID := uuid.Must(uuid.NewV4()).String()

policy := model.Policy{
PolicyID: policyID,
CoordinatorIdx: 1,
Data: &intPolData,
RevisionIdx: 1,
PolicyID: policyID,
Data: &intPolData,
RevisionIdx: 1,
}
_, err = dl.CreatePolicy(ctx, bulker, policy, dl.WithIndexName(index))
if err != nil {
Expand All @@ -509,14 +505,14 @@ func TestMonitor_KickDeploy(t *testing.T) {
t.Fatal(err)
}

s, err := m.Subscribe(agentID, policyID, 1, 1)
s, err := m.Subscribe(agentID, policyID, 1)
if err != nil {
t.Fatal(err)
}
defer m.Unsubscribe(s) //nolint:errcheck // defered function

// Force a new policy load so that the kickLoad() func runs
s2, err := m.Subscribe("test", "test", 1, 1)
s2, err := m.Subscribe("test", "test", 1)
if err != nil {
t.Fatal(err)
}
Expand All @@ -526,7 +522,7 @@ func TestMonitor_KickDeploy(t *testing.T) {
select {
case subPolicy := <-s.Output():
tm.Stop()
if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 2 && subPolicy.Policy.CoordinatorIdx != 1 {
if subPolicy.Policy.PolicyID != policyID && subPolicy.Policy.RevisionIdx != 2 {
t.Fatalf("failed to get the expected updated policy, policy revision: %d", subPolicy.Policy.RevisionIdx)
}
case <-tm.C:
Expand Down
Loading

0 comments on commit af2552a

Please sign in to comment.