From b51bf97530f988882ba3886be7919003042a9d56 Mon Sep 17 00:00:00 2001 From: Michel Laterman <82832767+michel-laterman@users.noreply.github.com> Date: Mon, 17 Oct 2022 12:14:59 -0700 Subject: [PATCH] Don't send POLICY_CHANGE actions retrieved from index to agent. (#1963) * Don't send POLICY_CHANGE actions retrieved from index to agent. The fleet-server should not send any policy change actions that are written to the actions index to an agent on checkin. The server will remove these actions in the convert method and emit a warning message. The ack token that is used is not altered in this case. Policy change actions are dynamically generated by the fleet-server when it detects that the agent is not running an up to date version of the policy. * move filtering to its own method * Fix linter, tests, fix file name --- CHANGELOG.next.asciidoc | 3 +- internal/pkg/api/handleCheckin.go | 18 +++++ internal/pkg/api/handleCheckin_test.go | 103 ++++++++++++++++++++++++ internal/pkg/api/handleChecking_test.go | 39 --------- 4 files changed, 123 insertions(+), 40 deletions(-) create mode 100644 internal/pkg/api/handleCheckin_test.go delete mode 100644 internal/pkg/api/handleChecking_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 49affcaac..1a0c0cdbc 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -15,6 +15,7 @@ - LoadServerLimits will not overwrite specified limits when loading default/agent number specified values. {issue}1841[1841] {pull}1912[1912] - Use seperate rate limiters for internal and external API listeners. {issue}1859[1859] {pull}1904[1904] - Fix fleet.migration.total log key overlap {pull}1951[1951] +- Remove POLICY_CHANGE actions from list retrieved from actions index before sending actions to agent on Checkin. {issue}1773[1773] {pull}1963[1963] ==== New Features @@ -25,4 +26,4 @@ - Fleet Server now allows setting global labels on APM instrumentation. {pull}1649[1649] - Fleet Server now allows setting transaction sample rate on APM instrumentation {pull}1681[1681] - Log redacted config when config updates. {issue}1626[1626] {pull}1668[1668] -- Storing checkin message in last_checkin_message {pull}1932[1932] \ No newline at end of file +- Storing checkin message in last_checkin_message {pull}1932[1932] diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 62bd80fd9..6a8901b72 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -225,6 +225,7 @@ func (ct *CheckinT) processRequest(zlog zerolog.Logger, w http.ResponseWriter, r if err != nil { return err } + pendingActions = filterActions(agent.Id, pendingActions) actions, ackToken = convertActions(agent.Id, pendingActions) if len(actions) == 0 { @@ -235,6 +236,7 @@ func (ct *CheckinT) processRequest(zlog zerolog.Logger, w http.ResponseWriter, r return ctx.Err() case acdocs := <-actCh: var acs []ActionResp + acdocs = filterActions(agent.Id, acdocs) acs, ackToken = convertActions(agent.Id, acdocs) actions = append(actions, acs...) break LOOP @@ -370,6 +372,22 @@ func (ct *CheckinT) fetchAgentPendingActions(ctx context.Context, seqno sqn.SeqN return actions, err } +// filterActions removes the POLICY_CHANGE action from the passed list. +// The source of this list are documents from the fleet actions index. +// The POLICY_CHANGE action that the agent receives are generated by the fleet-server when it detects a different policy in processRequest() +func filterActions(agentID string, actions []model.Action) []model.Action { + resp := make([]model.Action, 0, len(actions)) + for _, action := range actions { + if action.Type == TypePolicyChange { + log.Info().Str("agent_id", agentID).Str("action_id", action.ActionID).Msg("Removing POLICY_CHANGE action found in index from check in response") + continue + } + resp = append(resp, action) + } + return resp + +} + func convertActions(agentID string, actions []model.Action) ([]ActionResp, string) { var ackToken string sz := len(actions) diff --git a/internal/pkg/api/handleCheckin_test.go b/internal/pkg/api/handleCheckin_test.go new file mode 100644 index 000000000..569963cbb --- /dev/null +++ b/internal/pkg/api/handleCheckin_test.go @@ -0,0 +1,103 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build !integration +// +build !integration + +package api + +import ( + "encoding/json" + "testing" + + "github.com/elastic/fleet-server/v7/internal/pkg/model" + "github.com/stretchr/testify/assert" +) + +func TestConvertActions(t *testing.T) { + tests := []struct { + name string + actions []model.Action + resp []ActionResp + token string + }{{ + name: "empty actions", + actions: nil, + resp: []ActionResp{}, + token: "", + }, { + name: "single action", + actions: []model.Action{{ActionID: "1234"}}, + resp: []ActionResp{{ + AgentID: "agent-id", + ID: "1234", + Data: json.RawMessage(nil), + }}, + token: "", + }, { + name: "multiple actions", + actions: []model.Action{ + {ActionID: "1234"}, + {ActionID: "5678"}, + }, + resp: []ActionResp{{ + AgentID: "agent-id", + ID: "1234", + Data: json.RawMessage(nil), + }, { + AgentID: "agent-id", + ID: "5678", + Data: json.RawMessage(nil), + }}, + token: "", + }} + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + resp, token := convertActions("agent-id", tc.actions) + assert.Equal(t, tc.resp, resp) + assert.Equal(t, tc.token, token) + }) + } +} + +func TestFilterActions(t *testing.T) { + tests := []struct { + name string + actions []model.Action + resp []model.Action + }{{ + name: "empty list", + actions: []model.Action{}, + resp: []model.Action{}, + }, { + name: "nothing filtered", + actions: []model.Action{{ + ActionID: "1234", + }, { + ActionID: "5678", + }}, + resp: []model.Action{{ + ActionID: "1234", + }, { + ActionID: "5678", + }}, + }, { + name: "filter POLICY_CHANGE action", + actions: []model.Action{{ + ActionID: "1234", + Type: TypePolicyChange, + }, { + ActionID: "5678", + }}, + resp: []model.Action{{ + ActionID: "5678", + }}, + }} + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + resp := filterActions("agent-id", tc.actions) + assert.Equal(t, tc.resp, resp) + }) + } +} diff --git a/internal/pkg/api/handleChecking_test.go b/internal/pkg/api/handleChecking_test.go deleted file mode 100644 index 151f56055..000000000 --- a/internal/pkg/api/handleChecking_test.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build !integration -// +build !integration - -package api - -import ( - "encoding/json" - "testing" - - "github.com/elastic/fleet-server/v7/internal/pkg/model" - "github.com/stretchr/testify/assert" -) - -func TestConvertActionsEmpty(t *testing.T) { - resp, token := convertActions("1234", nil) - assert.Equal(t, resp, []ActionResp{}) - assert.Equal(t, token, "") -} - -func TestConvertActions(t *testing.T) { - actions := []model.Action{ - { - ActionID: "1234", - }, - } - resp, token := convertActions("agent-id", actions) - assert.Equal(t, resp, []ActionResp{ - { - AgentID: "agent-id", - ID: "1234", - Data: json.RawMessage(nil), - }, - }) - assert.Equal(t, token, "") -}