Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and reintroduce "Allow multiple ES outputs as long as they are the same ES" #1879

Merged
merged 6 commits into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions cmd/fleet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,17 +821,21 @@ func (f *FleetServer) runSubsystems(ctx context.Context, cfg *config.Config, g *
remoteVersion, err := ver.CheckCompatibility(ctx, esCli, f.bi.Version)
if err != nil {
if len(remoteVersion) != 0 {
return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w", f.bi.Version, remoteVersion, err)
return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w",
f.bi.Version, remoteVersion, err)
}
return fmt.Errorf("failed version compatibility check with elasticsearch: %w", err)
}

// Run migrations; current safe to do in background. That may change in the future.
g.Go(loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error {
// Run migrations
loggedMigration := loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error {
return dl.Migrate(ctx, bulker)
}))
})
if err = loggedMigration(); err != nil {
return fmt.Errorf("failed to run subsystems: %w", err)
}

// Run schduler for periodic GC/cleanup
// Run scheduler for periodic GC/cleanup
gcCfg := cfg.Inputs[0].Server.GC
sched, err := scheduler.New(gc.Schedules(bulker, gcCfg.ScheduleInterval, gcCfg.CleanupAfterExpiredInterval))
if err != nil {
Expand Down
111 changes: 65 additions & 46 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import (
"strings"
"time"

"github.com/julienschmidt/httprouter"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
Expand All @@ -25,11 +30,6 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
"github.com/elastic/fleet-server/v7/internal/pkg/smap"
"github.com/pkg/errors"

"github.com/julienschmidt/httprouter"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

type HTTPError struct {
Expand Down Expand Up @@ -338,8 +338,9 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
Int64("rev.coordinatorIdx", rev.CoordinatorIdx).
Msg("ack policy revision")

if ok && rev.PolicyID == agent.PolicyID && (rev.RevisionIdx > currRev ||
(rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) {
if ok && rev.PolicyID == agent.PolicyID &&
(rev.RevisionIdx > currRev ||
(rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) {
found = true
currRev = rev.RevisionIdx
currCoord = rev.CoordinatorIdx
Expand All @@ -350,69 +351,81 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
return nil
}

if agent.DefaultAPIKeyID != "" {
res, err := ack.bulk.APIKeyRead(ctx, agent.DefaultAPIKeyID, true)
for _, output := range agent.Outputs {
if output.Type != policy.OutputTypeElasticsearch {
continue
}

err := ack.updateAPIKey(ctx,
zlog,
agent.Id,
currRev, currCoord,
agent.PolicyID,
output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds)
if err != nil {
return err
}
}

return nil

}

func (ack *AckT) updateAPIKey(ctx context.Context,
zlog zerolog.Logger,
agentID string,
currRev, currCoord int64,
policyID, apiKeyID, permissionHash string,
toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems) error {

if apiKeyID != "" {
res, err := ack.bulk.APIKeyRead(ctx, apiKeyID, true)
if err != nil {
zlog.Error().
Err(err).
Str(LogAPIKeyID, agent.DefaultAPIKeyID).
Str(LogAPIKeyID, apiKeyID).
Msg("Failed to read API Key roles")
} else {
clean, removedRolesCount, err := cleanRoles(res.RoleDescriptors)
if err != nil {
zlog.Error().
Err(err).
RawJSON("roles", res.RoleDescriptors).
Str(LogAPIKeyID, agent.DefaultAPIKeyID).
Str(LogAPIKeyID, apiKeyID).
Msg("Failed to cleanup roles")
} else if removedRolesCount > 0 {
if err := ack.bulk.APIKeyUpdate(ctx, agent.DefaultAPIKeyID, agent.PolicyOutputPermissionsHash, clean); err != nil {
zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, agent.DefaultAPIKeyID).Msg("Failed to update API Key")
if err := ack.bulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil {
zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, apiKeyID).Msg("Failed to update API Key")
} else {
zlog.Debug().
Str("hash.sha256", agent.PolicyOutputPermissionsHash).
Str(LogAPIKeyID, agent.DefaultAPIKeyID).
Str("hash.sha256", permissionHash).
Str(LogAPIKeyID, apiKeyID).
RawJSON("roles", clean).
Int("removedRoles", removedRolesCount).
Msg("Updating agent record to pick up reduced roles.")
}
}
}
}

sz := len(agent.DefaultAPIKeyHistory)
if sz > 0 {
ids := make([]string, sz)
for i := 0; i < sz; i++ {
if agent.DefaultAPIKeyHistory[i].ID == agent.DefaultAPIKeyID {
// already updated
continue
}
ids[i] = agent.DefaultAPIKeyHistory[i].ID
}
log.Info().Strs("ids", ids).Msg("Invalidate old API keys")
if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil {
log.Warn().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
}
ack.invalidateAPIKeys(ctx, toRetireAPIKeyIDs, apiKeyID)
}

body := makeUpdatePolicyBody(
agent.PolicyID,
policyID,
currRev,
currCoord,
)

err := ack.bulk.Update(
ctx,
dl.FleetAgents,
agent.Id,
agentID,
body,
bulk.WithRefresh(),
bulk.WithRetryOnConflict(3),
)

zlog.Err(err).
Str(LogPolicyID, agent.PolicyID).
Str(LogPolicyID, policyID).
Int64("policyRevision", currRev).
Int64("policyCoordinator", currCoord).
Msg("ack policy")
Expand Down Expand Up @@ -445,8 +458,25 @@ func cleanRoles(roles json.RawMessage) (json.RawMessage, int, error) {
return r, len(keys), errors.Wrap(err, "failed to marshal resulting role definition")
}

func (ack *AckT) invalidateAPIKeys(ctx context.Context, toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, skip string) {
ids := make([]string, 0, len(toRetireAPIKeyIDs))
for _, k := range toRetireAPIKeyIDs {
if k.ID == skip || k.ID == "" {
continue
}
ids = append(ids, k.ID)
}

if len(ids) > 0 {
log.Info().Strs("fleet.policy.apiKeyIDsToRetire", ids).Msg("Invalidate old API keys")
if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil {
log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
}
}
}

func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent *model.Agent) error {
apiKeys := _getAPIKeyIDs(agent)
apiKeys := agent.APIKeyIDs()
if len(apiKeys) > 0 {
zlog = zlog.With().Strs(LogAPIKeyID, apiKeys).Logger()

Expand Down Expand Up @@ -501,17 +531,6 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent *
return nil
}

func _getAPIKeyIDs(agent *model.Agent) []string {
keys := make([]string, 0, 1)
if agent.AccessAPIKeyID != "" {
keys = append(keys, agent.AccessAPIKeyID)
}
if agent.DefaultAPIKeyID != "" {
keys = append(keys, agent.DefaultAPIKeyID)
}
return keys
}

// Generate an update script that validates that the policy_id
// has not changed underneath us by an upstream process (Kibana or otherwise).
// We have a race condition where a user could have assigned a new policy to
Expand Down
55 changes: 54 additions & 1 deletion internal/pkg/api/handleAck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ import (
"net/http"
"testing"

"github.com/google/go-cmp/cmp"

"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing"
testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log"
"github.com/google/go-cmp/cmp"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -439,3 +440,55 @@ func TestHandleAckEvents(t *testing.T) {
})
}
}

func TestInvalidateAPIKeys(t *testing.T) {
toRetire1 := []model.ToRetireAPIKeyIdsItems{{
ID: "toRetire1",
}}
toRetire2 := []model.ToRetireAPIKeyIdsItems{{
ID: "toRetire2_0",
}, {
ID: "toRetire2_1",
}}
var toRetire3 []model.ToRetireAPIKeyIdsItems

skips := map[string]string{
"1": "toRetire1",
"2": "toRetire2_0",
"3": "",
}
wants := map[string][]string{
"1": {},
"2": {"toRetire2_1"},
"3": {},
}

agent := model.Agent{
Outputs: map[string]*model.PolicyOutput{
"1": {ToRetireAPIKeyIds: toRetire1},
"2": {ToRetireAPIKeyIds: toRetire2},
"3": {ToRetireAPIKeyIds: toRetire3},
},
}

for i, out := range agent.Outputs {
skip := skips[i]
want := wants[i]

bulker := ftesting.NewMockBulk()
if len(want) > 0 {
bulker.On("APIKeyInvalidate",
context.Background(), mock.MatchedBy(func(ids []string) bool {
// if A contains B and B contains A => A = B
return assert.Subset(t, ids, want) &&
assert.Subset(t, want, ids)
})).
Return(nil)
}

ack := &AckT{bulk: bulker}
ack.invalidateAPIKeys(context.Background(), out.ToRetireAPIKeyIds, skip)

bulker.AssertExpectations(t)
}
}
15 changes: 7 additions & 8 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"reflect"
Expand Down Expand Up @@ -60,7 +61,6 @@ func (rt Router) handleCheckin(w http.ResponseWriter, r *http.Request, ps httpro
Logger()

err := rt.ct.handleCheckin(&zlog, w, r, id)

if err != nil {
cntCheckin.IncError(err)
resp := NewHTTPErrResp(err)
Expand Down Expand Up @@ -430,13 +430,13 @@ func convertActions(agentID string, actions []model.Action) ([]ActionResp, strin
//
func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, agentID string, pp *policy.ParsedPolicy) (*ActionResp, error) {
zlog = zlog.With().
Str("ctx", "processPolicy").
Int64("policyRevision", pp.Policy.RevisionIdx).
Int64("policyCoordinator", pp.Policy.CoordinatorIdx).
Str("fleet.ctx", "processPolicy").
Int64("fleet.policyRevision", pp.Policy.RevisionIdx).
Int64("fleet.policyCoordinator", pp.Policy.CoordinatorIdx).
Str(LogPolicyID, pp.Policy.PolicyID).
Logger()

// Repull and decode the agent object. Do not trust the cache.
// Repull and decode the agent object. Do not trust the cache.
agent, err := dl.FindAgent(ctx, bulker, dl.QueryAgentByID, dl.FieldID, agentID)
if err != nil {
zlog.Error().Err(err).Msg("fail find agent record")
Expand All @@ -446,7 +446,6 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
// Parse the outputs maps in order to prepare the outputs
const outputsProperty = "outputs"
outputs, err := smap.Parse(pp.Fields[outputsProperty])

if err != nil {
return nil, err
}
Expand All @@ -458,9 +457,9 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
// Iterate through the policy outputs and prepare them
for _, policyOutput := range pp.Outputs {
err = policyOutput.Prepare(ctx, zlog, bulker, &agent, outputs)

if err != nil {
return nil, err
return nil, fmt.Errorf("failed to prepare output %q: %w",
policyOutput.Name, err)
}
}

Expand Down
11 changes: 8 additions & 3 deletions internal/pkg/api/handleEnroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ type EnrollerT struct {
}

func NewEnrollerT(verCon version.Constraints, cfg *config.Server, bulker bulk.Bulk, c cache.Cache) (*EnrollerT, error) {

log.Info().
Interface("limits", cfg.Limits.EnrollLimit).
Msg("Setting config enroll_limit")
Expand Down Expand Up @@ -187,7 +186,13 @@ func (et *EnrollerT) processRequest(rb *rollback.Rollback, zlog zerolog.Logger,
return et._enroll(r.Context(), rb, zlog, req, erec.PolicyID, ver)
}

func (et *EnrollerT) _enroll(ctx context.Context, rb *rollback.Rollback, zlog zerolog.Logger, req *EnrollRequest, policyID, ver string) (*EnrollResponse, error) {
func (et *EnrollerT) _enroll(
ctx context.Context,
rb *rollback.Rollback,
zlog zerolog.Logger,
req *EnrollRequest,
policyID,
ver string) (*EnrollResponse, error) {

if req.SharedID != "" {
// TODO: Support pre-existing install
Expand Down Expand Up @@ -427,7 +432,7 @@ func generateAccessAPIKey(ctx context.Context, bulk bulk.Bulk, agentID string) (
agentID,
"",
[]byte(kFleetAccessRolesJSON),
apikey.NewMetadata(agentID, apikey.TypeAccess),
apikey.NewMetadata(agentID, "", apikey.TypeAccess),
)
}

Expand Down
Loading