Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and reintroduce "Allow multiple ES outputs as long as they are the same ES" #1879

Merged
merged 6 commits into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions cmd/fleet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,17 +821,21 @@ func (f *FleetServer) runSubsystems(ctx context.Context, cfg *config.Config, g *
remoteVersion, err := ver.CheckCompatibility(ctx, esCli, f.bi.Version)
if err != nil {
if len(remoteVersion) != 0 {
return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w", f.bi.Version, remoteVersion, err)
return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w",
f.bi.Version, remoteVersion, err)
}
return fmt.Errorf("failed version compatibility check with elasticsearch: %w", err)
}

// Run migrations; current safe to do in background. That may change in the future.
g.Go(loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error {
// Run migrations
loggedMigration := loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error {
return dl.Migrate(ctx, bulker)
}))
})
if err = loggedMigration(); err != nil {
return fmt.Errorf("failed to run subsystems: %w", err)
}

// Run schduler for periodic GC/cleanup
// Run scheduler for periodic GC/cleanup
gcCfg := cfg.Inputs[0].Server.GC
sched, err := scheduler.New(gc.Schedules(bulker, gcCfg.ScheduleInterval, gcCfg.CleanupAfterExpiredInterval))
if err != nil {
Expand Down
49 changes: 23 additions & 26 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"strings"
"time"

"github.com/pkg/errors"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
Expand All @@ -24,7 +26,6 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
"github.com/pkg/errors"

"github.com/julienschmidt/httprouter"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -337,8 +338,9 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
Int64("rev.coordinatorIdx", rev.CoordinatorIdx).
Msg("ack policy revision")

if ok && rev.PolicyID == agent.PolicyID && (rev.RevisionIdx > currRev ||
(rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) {
if ok && rev.PolicyID == agent.PolicyID &&
(rev.RevisionIdx > currRev ||
(rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) {
found = true
currRev = rev.RevisionIdx
currCoord = rev.CoordinatorIdx
Expand All @@ -349,17 +351,7 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
return nil
}

sz := len(agent.DefaultAPIKeyHistory)
if sz > 0 {
ids := make([]string, sz)
for i := 0; i < sz; i++ {
ids[i] = agent.DefaultAPIKeyHistory[i].ID
}
log.Info().Strs("ids", ids).Msg("Invalidate old API keys")
if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil {
log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
}
}
ack.invalidateAPIKeys(ctx, agent)

body := makeUpdatePolicyBody(
agent.PolicyID,
Expand All @@ -385,8 +377,24 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
return errors.Wrap(err, "handlePolicyChange update")
}

func (ack *AckT) invalidateAPIKeys(ctx context.Context, agent *model.Agent) {
var ids []string
for _, out := range agent.Outputs {
for _, k := range out.ToRetireAPIKeyIds {
ids = append(ids, k.ID)
}
}

if len(ids) > 0 {
log.Info().Strs("fleet.policy.apiKeyIDsToRetire", ids).Msg("Invalidate old API keys")
if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil {
log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
}
}
}

func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent *model.Agent) error {
apiKeys := _getAPIKeyIDs(agent)
apiKeys := agent.APIKeyIDs()
if len(apiKeys) > 0 {
zlog = zlog.With().Strs(LogAPIKeyID, apiKeys).Logger()

Expand Down Expand Up @@ -441,17 +449,6 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent *
return nil
}

func _getAPIKeyIDs(agent *model.Agent) []string {
keys := make([]string, 0, 1)
if agent.AccessAPIKeyID != "" {
keys = append(keys, agent.AccessAPIKeyID)
}
if agent.DefaultAPIKeyID != "" {
keys = append(keys, agent.DefaultAPIKeyID)
}
return keys
}

// Generate an update script that validates that the policy_id
// has not changed underneath us by an upstream process (Kibana or otherwise).
// We have a race condition where a user could have assigned a new policy to
Expand Down
39 changes: 38 additions & 1 deletion internal/pkg/api/handleAck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ import (
"net/http"
"testing"

"github.com/google/go-cmp/cmp"

"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing"
testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log"
"github.com/google/go-cmp/cmp"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -439,3 +440,39 @@ func TestHandleAckEvents(t *testing.T) {
})
}
}

func TestInvalidateAPIKeys(t *testing.T) {
toRetire1 := []model.ToRetireAPIKeyIdsItems{{
ID: "toRetire1",
}}
toRetire2 := []model.ToRetireAPIKeyIdsItems{{
ID: "toRetire2_0",
}, {
ID: "toRetire2_1",
}}
var toRetire3 []model.ToRetireAPIKeyIdsItems

want := []string{"toRetire1", "toRetire2_0", "toRetire2_1"}

agent := model.Agent{
Outputs: map[string]*model.PolicyOutput{
"1": {ToRetireAPIKeyIds: toRetire1},
"2": {ToRetireAPIKeyIds: toRetire2},
"3": {ToRetireAPIKeyIds: toRetire3},
},
}

bulker := ftesting.NewMockBulk()
bulker.On("APIKeyInvalidate",
context.Background(), mock.MatchedBy(func(ids []string) bool {
// if A contains B and B contains A => A = B
return assert.Subset(t, ids, want) &&
assert.Subset(t, want, ids)
})).
Return(nil)

ack := &AckT{bulk: bulker}
ack.invalidateAPIKeys(context.Background(), &agent)

bulker.AssertExpectations(t)
}
15 changes: 7 additions & 8 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"reflect"
Expand Down Expand Up @@ -60,7 +61,6 @@ func (rt Router) handleCheckin(w http.ResponseWriter, r *http.Request, ps httpro
Logger()

err := rt.ct.handleCheckin(&zlog, w, r, id)

if err != nil {
cntCheckin.IncError(err)
resp := NewHTTPErrResp(err)
Expand Down Expand Up @@ -430,13 +430,13 @@ func convertActions(agentID string, actions []model.Action) ([]ActionResp, strin
//
func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, agentID string, pp *policy.ParsedPolicy) (*ActionResp, error) {
zlog = zlog.With().
Str("ctx", "processPolicy").
Int64("policyRevision", pp.Policy.RevisionIdx).
Int64("policyCoordinator", pp.Policy.CoordinatorIdx).
Str("fleet.ctx", "processPolicy").
Int64("fleet.policyRevision", pp.Policy.RevisionIdx).
Int64("fleet.policyCoordinator", pp.Policy.CoordinatorIdx).
Str(LogPolicyID, pp.Policy.PolicyID).
Logger()

// Repull and decode the agent object. Do not trust the cache.
// Repull and decode the agent object. Do not trust the cache.
agent, err := dl.FindAgent(ctx, bulker, dl.QueryAgentByID, dl.FieldID, agentID)
if err != nil {
zlog.Error().Err(err).Msg("fail find agent record")
Expand All @@ -446,7 +446,6 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
// Parse the outputs maps in order to prepare the outputs
const outputsProperty = "outputs"
outputs, err := smap.Parse(pp.Fields[outputsProperty])

if err != nil {
return nil, err
}
Expand All @@ -458,9 +457,9 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
// Iterate through the policy outputs and prepare them
for _, policyOutput := range pp.Outputs {
err = policyOutput.Prepare(ctx, zlog, bulker, &agent, outputs)

if err != nil {
return nil, err
return nil, fmt.Errorf("failed to prepare output %q: %w",
policyOutput.Name, err)
}
}

Expand Down
11 changes: 8 additions & 3 deletions internal/pkg/api/handleEnroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ type EnrollerT struct {
}

func NewEnrollerT(verCon version.Constraints, cfg *config.Server, bulker bulk.Bulk, c cache.Cache) (*EnrollerT, error) {

log.Info().
Interface("limits", cfg.Limits.EnrollLimit).
Msg("Setting config enroll_limit")
Expand Down Expand Up @@ -187,7 +186,13 @@ func (et *EnrollerT) processRequest(rb *rollback.Rollback, zlog zerolog.Logger,
return et._enroll(r.Context(), rb, zlog, req, erec.PolicyID, ver)
}

func (et *EnrollerT) _enroll(ctx context.Context, rb *rollback.Rollback, zlog zerolog.Logger, req *EnrollRequest, policyID, ver string) (*EnrollResponse, error) {
func (et *EnrollerT) _enroll(
ctx context.Context,
rb *rollback.Rollback,
zlog zerolog.Logger,
req *EnrollRequest,
policyID,
ver string) (*EnrollResponse, error) {

if req.SharedID != "" {
// TODO: Support pre-existing install
Expand Down Expand Up @@ -427,7 +432,7 @@ func generateAccessAPIKey(ctx context.Context, bulk bulk.Bulk, agentID string) (
agentID,
"",
[]byte(kFleetAccessRolesJSON),
apikey.NewMetadata(agentID, apikey.TypeAccess),
apikey.NewMetadata(agentID, "", apikey.TypeAccess),
)
}

Expand Down
61 changes: 61 additions & 0 deletions internal/pkg/apikey/apikey.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@
package apikey

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"unicode/utf8"

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
)

const (
Expand All @@ -28,6 +34,61 @@ var (

var AuthKey = http.CanonicalHeaderKey("Authorization")

// APIKeyMetadata tracks Metadata associated with an APIKey.
type APIKeyMetadata struct {
ID string
Metadata Metadata
}

// Read gathers APIKeyMetadata from Elasticsearch using the given client.
func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKeyMetadata, error) {
opts := []func(*esapi.SecurityGetAPIKeyRequest){
client.Security.GetAPIKey.WithContext(ctx),
client.Security.GetAPIKey.WithID(id),
}

res, err := client.Security.GetAPIKey(
opts...,
)
if err != nil {
return nil, fmt.Errorf("request to elasticsearch failed: %w", err)
}
defer res.Body.Close()

if res.IsError() {
return nil, fmt.Errorf("%s: %w", res.String(), ErrAPIKeyNotFound)
}

type APIKeyResponse struct {
ID string `json:"id"`
Metadata Metadata `json:"metadata"`
}
type GetAPIKeyResponse struct {
APIKeys []APIKeyResponse `json:"api_keys"`
}

var buff bytes.Buffer
if _, err := buff.ReadFrom(res.Body); err != nil {
return nil, fmt.Errorf("could not read from response body: %w", err)
}

var resp GetAPIKeyResponse
if err = json.Unmarshal(buff.Bytes(), &resp); err != nil {
return nil, fmt.Errorf(
"could not Unmarshal elasticsearch GetAPIKeyResponse: %w", err)
}

if len(resp.APIKeys) == 0 {
return nil, ErrAPIKeyNotFound
}

first := resp.APIKeys[0]
return &APIKeyMetadata{
ID: first.ID,
Metadata: first.Metadata,
}, nil
}

// APIKey is used to represent an Elasticsearch API Key.
type APIKey struct {
ID string
Expand Down
Loading