From c14692388ae35500489237d936a5244d446b9dba Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Tue, 20 Sep 2022 12:29:10 +0200 Subject: [PATCH] Suggested/conflicts 2 (#2) * merge conflicts * conflict * conflicts * lint * skip empty keys * wee changes and fix Co-authored-by: Michal Pristas --- dev-tools/integration/.env | 2 +- internal/pkg/api/handleAck.go | 103 ++++++++- internal/pkg/api/handleAck_test.go | 40 ++-- internal/pkg/api/handleEnroll.go | 2 +- internal/pkg/apikey/apikey.go | 36 ++-- .../pkg/apikey/apikey_integration_test.go | 61 +++++- internal/pkg/bulk/block.go | 2 + internal/pkg/bulk/engine.go | 8 +- internal/pkg/bulk/opApiKey.go | 200 +++++++++++++++++- internal/pkg/bulk/opt.go | 13 ++ internal/pkg/bulk/queue.go | 3 + internal/pkg/config/config_test.go | 13 +- internal/pkg/config/output.go | 28 +-- internal/pkg/es/bulk_update_api_key.go | 109 ++++++++++ internal/pkg/policy/policy_output.go | 170 ++++++++++++++- internal/pkg/policy/policy_output_test.go | 12 +- internal/pkg/testing/bulk.go | 7 +- 17 files changed, 733 insertions(+), 76 deletions(-) create mode 100644 internal/pkg/es/bulk_update_api_key.go diff --git a/dev-tools/integration/.env b/dev-tools/integration/.env index 8d460e721..b4bb4fe9f 100644 --- a/dev-tools/integration/.env +++ b/dev-tools/integration/.env @@ -1,4 +1,4 @@ -ELASTICSEARCH_VERSION=8.5.0-c7913db3-SNAPSHOT +ELASTICSEARCH_VERSION=8.5.0-56d2c52d-SNAPSHOT ELASTICSEARCH_USERNAME=elastic ELASTICSEARCH_PASSWORD=changeme TEST_ELASTICSEARCH_HOSTS=localhost:9200 \ No newline at end of file diff --git a/internal/pkg/api/handleAck.go b/internal/pkg/api/handleAck.go index 1261d8f6c..2a502f70c 100644 --- a/internal/pkg/api/handleAck.go +++ b/internal/pkg/api/handleAck.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/logger" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/policy" + "github.com/elastic/fleet-server/v7/internal/pkg/smap" "github.com/julienschmidt/httprouter" "github.com/rs/zerolog" @@ -351,10 +352,66 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag return nil } - ack.invalidateAPIKeys(ctx, agent) + for _, output := range agent.Outputs { + if output.Type != policy.OutputTypeElasticsearch { + continue + } + + err := ack.updateAPIKey(ctx, + zlog, + agent.Id, + currRev, currCoord, + agent.PolicyID, + output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds) + if err != nil { + return err + } + } + + return nil + +} + +func (ack *AckT) updateAPIKey(ctx context.Context, + zlog zerolog.Logger, + agentID string, + currRev, currCoord int64, + policyID, apiKeyID, permissionHash string, + toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems) error { + + if apiKeyID != "" { + res, err := ack.bulk.APIKeyRead(ctx, apiKeyID, true) + if err != nil { + zlog.Error(). + Err(err). + Str(LogAPIKeyID, apiKeyID). + Msg("Failed to read API Key roles") + } else { + clean, removedRolesCount, err := cleanRoles(res.RoleDescriptors) + if err != nil { + zlog.Error(). + Err(err). + RawJSON("roles", res.RoleDescriptors). + Str(LogAPIKeyID, apiKeyID). + Msg("Failed to cleanup roles") + } else if removedRolesCount > 0 { + if err := ack.bulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil { + zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, apiKeyID).Msg("Failed to update API Key") + } else { + zlog.Debug(). + Str("hash.sha256", permissionHash). + Str(LogAPIKeyID, apiKeyID). + RawJSON("roles", clean). + Int("removedRoles", removedRolesCount). + Msg("Updating agent record to pick up reduced roles.") + } + } + } + ack.invalidateAPIKeys(ctx, toRetireAPIKeyIDs, apiKeyID) + } body := makeUpdatePolicyBody( - agent.PolicyID, + policyID, currRev, currCoord, ) @@ -362,14 +419,14 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag err := ack.bulk.Update( ctx, dl.FleetAgents, - agent.Id, + agentID, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3), ) - zlog.Info().Err(err). - Str(LogPolicyID, agent.PolicyID). + zlog.Err(err). + Str(LogPolicyID, policyID). Int64("policyRevision", currRev). Int64("policyCoordinator", currCoord). Msg("ack policy") @@ -377,12 +434,38 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag return errors.Wrap(err, "handlePolicyChange update") } -func (ack *AckT) invalidateAPIKeys(ctx context.Context, agent *model.Agent) { - var ids []string - for _, out := range agent.Outputs { - for _, k := range out.ToRetireAPIKeyIds { - ids = append(ids, k.ID) +func cleanRoles(roles json.RawMessage) (json.RawMessage, int, error) { + rr := smap.Map{} + if err := json.Unmarshal(roles, &rr); err != nil { + return nil, 0, errors.Wrap(err, "failed to unmarshal provided roles") + } + + keys := make([]string, 0, len(rr)) + for k := range rr { + if strings.HasSuffix(k, "-rdstale") { + keys = append(keys, k) + } + } + + if len(keys) == 0 { + return roles, 0, nil + } + + for _, k := range keys { + delete(rr, k) + } + + r, err := json.Marshal(rr) + return r, len(keys), errors.Wrap(err, "failed to marshal resulting role definition") +} + +func (ack *AckT) invalidateAPIKeys(ctx context.Context, toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, skip string) { + ids := make([]string, 0, len(toRetireAPIKeyIDs)) + for _, k := range toRetireAPIKeyIDs { + if k.ID == skip || k.ID == "" { + continue } + ids = append(ids, k.ID) } if len(ids) > 0 { diff --git a/internal/pkg/api/handleAck_test.go b/internal/pkg/api/handleAck_test.go index 60a265bd4..29c678c24 100644 --- a/internal/pkg/api/handleAck_test.go +++ b/internal/pkg/api/handleAck_test.go @@ -452,7 +452,16 @@ func TestInvalidateAPIKeys(t *testing.T) { }} var toRetire3 []model.ToRetireAPIKeyIdsItems - want := []string{"toRetire1", "toRetire2_0", "toRetire2_1"} + skips := map[string]string{ + "1": "toRetire1", + "2": "toRetire2_0", + "3": "", + } + wants := map[string][]string{ + "1": {}, + "2": {"toRetire2_1"}, + "3": {}, + } agent := model.Agent{ Outputs: map[string]*model.PolicyOutput{ @@ -462,17 +471,24 @@ func TestInvalidateAPIKeys(t *testing.T) { }, } - bulker := ftesting.NewMockBulk() - bulker.On("APIKeyInvalidate", - context.Background(), mock.MatchedBy(func(ids []string) bool { - // if A contains B and B contains A => A = B - return assert.Subset(t, ids, want) && - assert.Subset(t, want, ids) - })). - Return(nil) + for i, out := range agent.Outputs { + skip := skips[i] + want := wants[i] + + bulker := ftesting.NewMockBulk() + if len(want) > 0 { + bulker.On("APIKeyInvalidate", + context.Background(), mock.MatchedBy(func(ids []string) bool { + // if A contains B and B contains A => A = B + return assert.Subset(t, ids, want) && + assert.Subset(t, want, ids) + })). + Return(nil) + } - ack := &AckT{bulk: bulker} - ack.invalidateAPIKeys(context.Background(), &agent) + ack := &AckT{bulk: bulker} + ack.invalidateAPIKeys(context.Background(), out.ToRetireAPIKeyIds, skip) - bulker.AssertExpectations(t) + bulker.AssertExpectations(t) + } } diff --git a/internal/pkg/api/handleEnroll.go b/internal/pkg/api/handleEnroll.go index 7c5b1dd5a..9123723d6 100644 --- a/internal/pkg/api/handleEnroll.go +++ b/internal/pkg/api/handleEnroll.go @@ -298,7 +298,7 @@ func invalidateAPIKey(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk LOOP: for { - _, err := bulker.APIKeyRead(ctx, apikeyID) + _, err := bulker.APIKeyRead(ctx, apikeyID, false) switch { case err == nil: diff --git a/internal/pkg/apikey/apikey.go b/internal/pkg/apikey/apikey.go index 4134f2b0d..05551f272 100644 --- a/internal/pkg/apikey/apikey.go +++ b/internal/pkg/apikey/apikey.go @@ -6,11 +6,9 @@ package apikey import ( - "bytes" "context" "encoding/base64" "encoding/json" - "errors" "fmt" "net/http" "strings" @@ -18,6 +16,7 @@ import ( "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/pkg/errors" ) const ( @@ -36,20 +35,26 @@ var AuthKey = http.CanonicalHeaderKey("Authorization") // APIKeyMetadata tracks Metadata associated with an APIKey. type APIKeyMetadata struct { - ID string - Metadata Metadata + ID string + Metadata Metadata + RoleDescriptors json.RawMessage } // Read gathers APIKeyMetadata from Elasticsearch using the given client. -func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKeyMetadata, error) { +func Read(ctx context.Context, client *elasticsearch.Client, id string, withOwner bool) (*APIKeyMetadata, error) { + opts := []func(*esapi.SecurityGetAPIKeyRequest){ client.Security.GetAPIKey.WithContext(ctx), client.Security.GetAPIKey.WithID(id), } + if withOwner { + opts = append(opts, client.Security.GetAPIKey.WithOwner(true)) + } res, err := client.Security.GetAPIKey( opts..., ) + if err != nil { return nil, fmt.Errorf("request to elasticsearch failed: %w", err) } @@ -60,22 +65,19 @@ func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKey } type APIKeyResponse struct { - ID string `json:"id"` - Metadata Metadata `json:"metadata"` + ID string `json:"id"` + Metadata Metadata `json:"metadata"` + RoleDescriptors json.RawMessage `json:"role_descriptors"` } type GetAPIKeyResponse struct { APIKeys []APIKeyResponse `json:"api_keys"` } - var buff bytes.Buffer - if _, err := buff.ReadFrom(res.Body); err != nil { - return nil, fmt.Errorf("could not read from response body: %w", err) - } - var resp GetAPIKeyResponse - if err = json.Unmarshal(buff.Bytes(), &resp); err != nil { + d := json.NewDecoder(res.Body) + if err = d.Decode(&resp); err != nil { return nil, fmt.Errorf( - "could not Unmarshal elasticsearch GetAPIKeyResponse: %w", err) + "could not decode elasticsearch GetAPIKeyResponse: %w", err) } if len(resp.APIKeys) == 0 { @@ -83,9 +85,11 @@ func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKey } first := resp.APIKeys[0] + return &APIKeyMetadata{ - ID: first.ID, - Metadata: first.Metadata, + ID: first.ID, + Metadata: first.Metadata, + RoleDescriptors: first.RoleDescriptors, }, nil } diff --git a/internal/pkg/apikey/apikey_integration_test.go b/internal/pkg/apikey/apikey_integration_test.go index 72f410d99..ce4529254 100644 --- a/internal/pkg/apikey/apikey_integration_test.go +++ b/internal/pkg/apikey/apikey_integration_test.go @@ -30,7 +30,58 @@ const testFleetRoles = ` } ` -func TestRead(t *testing.T) { +func TestRead_existingKey(t *testing.T) { + ctx, cn := context.WithCancel(context.Background()) + defer cn() + + cfg := elasticsearch.Config{ + Username: "elastic", + Password: "changeme", + } + + es, err := elasticsearch.NewClient(cfg) + if err != nil { + t.Fatal(err) + } + + // Create the key + agentID := uuid.Must(uuid.NewV4()).String() + name := uuid.Must(uuid.NewV4()).String() + akey, err := Create(ctx, es, name, "", "true", []byte(testFleetRoles), + NewMetadata(agentID, "", TypeAccess)) + if err != nil { + t.Fatal(err) + } + + // Get the key and verify that metadata was saved correctly + aKeyMeta, err := Read(ctx, es, akey.ID, false) + if err != nil { + t.Fatal(err) + } + + diff := cmp.Diff(ManagedByFleetServer, aKeyMeta.Metadata.ManagedBy) + if diff != "" { + t.Error(diff) + } + + diff = cmp.Diff(true, aKeyMeta.Metadata.Managed) + if diff != "" { + t.Error(diff) + } + + diff = cmp.Diff(agentID, aKeyMeta.Metadata.AgentID) + if diff != "" { + t.Error(diff) + } + + diff = cmp.Diff(TypeAccess.String(), aKeyMeta.Metadata.Type) + if diff != "" { + t.Error(diff) + } + +} + +func TestRead_noKey(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() @@ -45,12 +96,12 @@ func TestRead(t *testing.T) { } // Try to get the key that doesn't exist, expect ErrApiKeyNotFound - _, err = Read(ctx, es, "0000000000000") + _, err = Read(ctx, es, "0000000000000", false) if !errors.Is(err, ErrAPIKeyNotFound) { - t.Errorf("Unexpected error type: %v", err) + t.Errorf("Unexpected error: %v", err) } - } + func TestCreateAPIKeyWithMetadata(t *testing.T) { tts := []struct { name string @@ -92,7 +143,7 @@ func TestCreateAPIKeyWithMetadata(t *testing.T) { } // Get the API key and verify that the metadata was saved correctly - aKeyMeta, err := Read(ctx, es, apiKey.ID) + aKeyMeta, err := Read(ctx, es, apiKey.ID, false) if err != nil { t.Fatal(err) } diff --git a/internal/pkg/bulk/block.go b/internal/pkg/bulk/block.go index 28c80927e..c2535172e 100644 --- a/internal/pkg/bulk/block.go +++ b/internal/pkg/bulk/block.go @@ -43,6 +43,7 @@ const ( ActionDelete ActionIndex ActionUpdate + ActionUpdateAPIKey ActionRead ActionSearch ActionFleetSearch @@ -53,6 +54,7 @@ var actionStrings = []string{ "delete", "index", "update", + "update_api_key", "read", "search", "fleet_search", diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index 93420c780..68840729c 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -55,9 +55,10 @@ type Bulk interface { // APIKey operations APIKeyCreate(ctx context.Context, name, ttl string, roles []byte, meta interface{}) (*APIKey, error) - APIKeyRead(ctx context.Context, id string) (*APIKeyMetadata, error) + APIKeyRead(ctx context.Context, id string, withOwner bool) (*APIKeyMetadata, error) APIKeyAuth(ctx context.Context, key APIKey) (*SecurityInfo, error) APIKeyInvalidate(ctx context.Context, ids ...string) error + APIKeyUpdate(ctx context.Context, id, outputPolicyHash string, roles []byte) error // Accessor used to talk to elastic search direcly bypassing bulk engine Client() *elasticsearch.Client @@ -81,6 +82,7 @@ const ( defaultMaxPending = 32 defaultBlockQueueSz = 32 // Small capacity to allow multiOp to spin fast defaultAPIKeyMaxParallel = 32 + defaultApikeyMaxReqSize = 100 * 1024 * 1024 ) func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker { @@ -136,6 +138,8 @@ func blkToQueueType(blk *bulkT) queueType { } else { queueIdx = kQueueRead } + case ActionUpdateAPIKey: + queueIdx = kQueueAPIKeyUpdate default: if forceRefresh { queueIdx = kQueueRefreshBulk @@ -288,6 +292,8 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu err = b.flushRead(ctx, queue) case kQueueSearch, kQueueFleetSearch: err = b.flushSearch(ctx, queue) + case kQueueAPIKeyUpdate: + err = b.flushUpdateAPIKey(ctx, queue) default: err = b.flushBulk(ctx, queue) } diff --git a/internal/pkg/bulk/opApiKey.go b/internal/pkg/bulk/opApiKey.go index 049c0ce17..099a7d291 100644 --- a/internal/pkg/bulk/opApiKey.go +++ b/internal/pkg/bulk/opApiKey.go @@ -5,15 +5,36 @@ package bulk import ( + "bytes" "context" + "encoding/json" + "math" "github.com/elastic/fleet-server/v7/internal/pkg/apikey" + "github.com/elastic/fleet-server/v7/internal/pkg/es" + "github.com/rs/zerolog/log" +) + +const ( + envelopeSize = 64 // 64B + safeBuffer = 0.9 ) // The ApiKey API's are not yet bulk enabled. Stub the calls in the bulker // and limit parallel access to prevent many requests from overloading // the connection pool in the elastic search client. +type apiKeyUpdateRequest struct { + ID string `json:"id,omitempty"` + Roles json.RawMessage `json:"role_descriptors,omitempty"` + RolesHash string `json:"role_hash,omitempty"` +} + +type esAPIKeyBulkUpdateRequest struct { + IDs []string `json:"ids,omitempty"` + Roles json.RawMessage `json:"role_descriptors,omitempty"` +} + func (b *Bulker) APIKeyAuth(ctx context.Context, key APIKey) (*SecurityInfo, error) { if err := b.apikeyLimit.Acquire(ctx, 1); err != nil { return nil, err @@ -32,13 +53,13 @@ func (b *Bulker) APIKeyCreate(ctx context.Context, name, ttl string, roles []byt return apikey.Create(ctx, b.Client(), name, ttl, "false", roles, meta) } -func (b *Bulker) APIKeyRead(ctx context.Context, id string) (*APIKeyMetadata, error) { +func (b *Bulker) APIKeyRead(ctx context.Context, id string, withOwner bool) (*APIKeyMetadata, error) { if err := b.apikeyLimit.Acquire(ctx, 1); err != nil { return nil, err } defer b.apikeyLimit.Release(1) - return apikey.Read(ctx, b.Client(), id) + return apikey.Read(ctx, b.Client(), id, withOwner) } func (b *Bulker) APIKeyInvalidate(ctx context.Context, ids ...string) error { @@ -49,3 +70,178 @@ func (b *Bulker) APIKeyInvalidate(ctx context.Context, ids ...string) error { return apikey.Invalidate(ctx, b.Client(), ids...) } + +func (b *Bulker) APIKeyUpdate(ctx context.Context, id, outputPolicyHash string, roles []byte) error { + req := &apiKeyUpdateRequest{ + ID: id, + Roles: roles, + RolesHash: outputPolicyHash, + } + + body, err := json.Marshal(req) + if err != nil { + return err + } + + _, err = b.waitBulkAction(ctx, ActionUpdateAPIKey, "", id, body) + return err +} + +// flushUpdateAPIKey takes an update API Key queue and groups request based on roles applied +// It needs to group agent IDs per Role Hash in order to produce more efficient request containing a list of IDs for a change(update) +// One thing to have in mind is that in a single queue there may be change and ack request with roles. in this case +// Later occurrence wins overwriting policy change to reduced set of permissions. +// Even if the order was incorrect we end up with just a bit broader permission set, never too strict, so agent does not +// end up with fewer permissions than it needs +func (b *Bulker) flushUpdateAPIKey(ctx context.Context, queue queueT) error { + idsPerRole := make(map[string][]string) + roles := make(map[string]json.RawMessage) + rolePerID := make(map[string]string) + responses := make(map[int]int) + idxToID := make(map[int32]string) + IDToResponse := make(map[string]int) + maxKeySize := 0 + + // merge ids + for n := queue.head; n != nil; n = n.next { + content := n.buf.Bytes() + metaMap := make(map[string]interface{}) + dec := json.NewDecoder(bytes.NewReader(content)) + if err := dec.Decode(&metaMap); err != nil { + log.Error(). + Err(err). + Str("mod", kModBulk). + Msg("Failed to unmarshal api key update meta map") + return err + } + + var req *apiKeyUpdateRequest + if err := dec.Decode(&req); err != nil { + log.Error(). + Err(err). + Str("mod", kModBulk). + Str("request", string(content)). + Msg("Failed to unmarshal api key update request") + return err + } + + if _, tracked := roles[req.RolesHash]; !tracked { + roles[req.RolesHash] = req.Roles + } + + // last one wins, it may be policy change and ack are in the same queue + rolePerID[req.ID] = req.RolesHash + idxToID[n.idx] = req.ID + if maxKeySize < len(req.ID) { + maxKeySize = len(req.ID) + } + } + + for id, roleHash := range rolePerID { + delete(rolePerID, id) + idsPerRole[roleHash] = append(idsPerRole[roleHash], id) + + } + + responseIdx := 0 + for hash, role := range roles { + idsPerBatch := b.getIDsCountPerBatch(len(role), maxKeySize) + ids := idsPerRole[hash] + if idsPerBatch <= 0 { + log.Error().Str("err", "request too large").Msg("No API Key ID could fit request size for bulk update") + log.Debug(). + RawJSON("role", role). + Strs("ids", ids). + Msg("IDs could not fit into a message") + + // idsPerRole for specific role no longer needed + delete(idsPerRole, hash) + continue + } + + batches := int(math.Ceil(float64(len(ids)) / float64(idsPerBatch))) + + // batch ids into batches of meaningful size + for batch := 0; batch < batches; batch++ { + // guard against indexing out of range + to := (batch + 1) * idsPerBatch + if to > len(ids) { + to = len(ids) + } + + // handle ids in batch, we put them into single request + // and assign response index to the id so we can notify caller + idsInBatch := ids[batch*idsPerBatch : to] + bulkReq := &esAPIKeyBulkUpdateRequest{ + IDs: idsInBatch, + Roles: role, + } + delete(roles, hash) + + payload, err := json.Marshal(bulkReq) + if err != nil { + return err + } + + req := &es.UpdateApiKeyBulkRequest{ + Body: bytes.NewReader(payload), + } + + res, err := req.Do(ctx, b.es) + if err != nil { + log.Error().Err(err).Msg("Error sending bulk API Key update request to Elasticsearch") + return err + } + if res.Body != nil { + defer res.Body.Close() + } + if res.IsError() { + log.Error().Str("err", res.String()).Msg("Error in bulk API Key update result to Elasticsearch") + return parseError(res) + } + + log.Debug().Strs("IDs", bulkReq.IDs).RawJSON("role", role).Msg("API Keys updated.") + + responses[responseIdx] = res.StatusCode + for _, id := range idsInBatch { + IDToResponse[id] = responseIdx + } + responseIdx++ + } + + // idsPerRole for specific role no longer needed + delete(idsPerRole, hash) + } + + // WARNING: Once we start pushing items to + // the queue, the node pointers are invalid. + // Do NOT return a non-nil value or failQueue + // up the stack will fail. + + for n := queue.head; n != nil; n = n.next { + // 'n' is invalid immediately on channel send + responseIdx := IDToResponse[idxToID[n.idx]] + res := responses[responseIdx] + select { + case n.ch <- respT{ + err: nil, + idx: n.idx, + data: &BulkIndexerResponseItem{ + DocumentID: "", + Status: res, + }, + }: + default: + panic("Unexpected blocked response channel on flushRead") + } + } + return nil +} + +func (b *Bulker) getIDsCountPerBatch(roleSize, maxKeySize int) int { + spareSpace := b.opts.apikeyMaxReqSize - roleSize - envelopeSize + if spareSpace > maxKeySize { + return int(float64(spareSpace) * safeBuffer / float64(maxKeySize)) + } + return 0 +} diff --git a/internal/pkg/bulk/opt.go b/internal/pkg/bulk/opt.go index e0701823e..6eeb2fe21 100644 --- a/internal/pkg/bulk/opt.go +++ b/internal/pkg/bulk/opt.go @@ -62,6 +62,7 @@ type bulkOptT struct { maxPending int blockQueueSz int apikeyMaxParallel int + apikeyMaxReqSize int } type BulkOpt func(*bulkOptT) @@ -108,6 +109,15 @@ func WithAPIKeyMaxParallel(max int) BulkOpt { } } +// WithAPIKeyMaxRequestSize sets the maximum size of the request body. Default 100MB +func WithAPIKeyMaxRequestSize(maxBytes int) BulkOpt { + return func(opt *bulkOptT) { + if opt.apikeyMaxReqSize > 0 { + opt.apikeyMaxReqSize = maxBytes + } + } +} + func parseBulkOpts(opts ...BulkOpt) bulkOptT { bopt := bulkOptT{ flushInterval: defaultFlushInterval, @@ -116,6 +126,7 @@ func parseBulkOpts(opts ...BulkOpt) bulkOptT { maxPending: defaultMaxPending, apikeyMaxParallel: defaultAPIKeyMaxParallel, blockQueueSz: defaultBlockQueueSz, + apikeyMaxReqSize: defaultApikeyMaxReqSize, } for _, f := range opts { @@ -132,6 +143,7 @@ func (o *bulkOptT) MarshalZerologObject(e *zerolog.Event) { e.Int("maxPending", o.maxPending) e.Int("blockQueueSz", o.blockQueueSz) e.Int("apikeyMaxParallel", o.apikeyMaxParallel) + e.Int("apikeyMaxReqSize", o.apikeyMaxReqSize) } // BulkOptsFromCfg transforms config to a slize of BulkOpt @@ -152,5 +164,6 @@ func BulkOptsFromCfg(cfg *config.Config) []BulkOpt { WithFlushThresholdSize(bulkCfg.FlushThresholdSize), WithMaxPending(bulkCfg.FlushMaxPending), WithAPIKeyMaxParallel(maxKeyParallel), + WithAPIKeyMaxRequestSize(cfg.Output.Elasticsearch.MaxContentLength), } } diff --git a/internal/pkg/bulk/queue.go b/internal/pkg/bulk/queue.go index dc7bde5d1..f2060212a 100644 --- a/internal/pkg/bulk/queue.go +++ b/internal/pkg/bulk/queue.go @@ -20,6 +20,7 @@ const ( kQueueFleetSearch kQueueRefreshBulk kQueueRefreshRead + kQueueAPIKeyUpdate kNumQueues ) @@ -37,6 +38,8 @@ func (q queueT) Type() string { return "refreshBulk" case kQueueRefreshRead: return "refreshRead" + case kQueueAPIKeyUpdate: + return "apiKeyUpdate" } panic("unknown") } diff --git a/internal/pkg/config/config_test.go b/internal/pkg/config/config_test.go index 4e3d7ea61..110f93783 100644 --- a/internal/pkg/config/config_test.go +++ b/internal/pkg/config/config_test.go @@ -237,12 +237,13 @@ func defaultFleet() Fleet { func defaultElastic() Elasticsearch { return Elasticsearch{ - Protocol: "http", - ServiceToken: "test-token", - Hosts: []string{"localhost:9200"}, - MaxRetries: 3, - MaxConnPerHost: 128, - Timeout: 90 * time.Second, + Protocol: "http", + ServiceToken: "test-token", + Hosts: []string{"localhost:9200"}, + MaxRetries: 3, + MaxConnPerHost: 128, + MaxContentLength: 104857600, + Timeout: 90 * time.Second, } } diff --git a/internal/pkg/config/output.go b/internal/pkg/config/output.go index 5804d5858..8e8751d45 100644 --- a/internal/pkg/config/output.go +++ b/internal/pkg/config/output.go @@ -28,19 +28,20 @@ var hasScheme = regexp.MustCompile(`^([a-z][a-z0-9+\-.]*)://`) // Elasticsearch is the configuration for elasticsearch. type Elasticsearch struct { - Protocol string `config:"protocol"` - Hosts []string `config:"hosts"` - Path string `config:"path"` - Headers map[string]string `config:"headers"` - APIKey string `config:"api_key"` - ServiceToken string `config:"service_token"` - ProxyURL string `config:"proxy_url"` - ProxyDisable bool `config:"proxy_disable"` - ProxyHeaders map[string]string `config:"proxy_headers"` - TLS *tlscommon.Config `config:"ssl"` - MaxRetries int `config:"max_retries"` - MaxConnPerHost int `config:"max_conn_per_host"` - Timeout time.Duration `config:"timeout"` + Protocol string `config:"protocol"` + Hosts []string `config:"hosts"` + Path string `config:"path"` + Headers map[string]string `config:"headers"` + APIKey string `config:"api_key"` + ServiceToken string `config:"service_token"` + ProxyURL string `config:"proxy_url"` + ProxyDisable bool `config:"proxy_disable"` + ProxyHeaders map[string]string `config:"proxy_headers"` + TLS *tlscommon.Config `config:"ssl"` + MaxRetries int `config:"max_retries"` + MaxConnPerHost int `config:"max_conn_per_host"` + Timeout time.Duration `config:"timeout"` + MaxContentLength int `config:"max_content_length"` } // InitDefaults initializes the defaults for the configuration. @@ -50,6 +51,7 @@ func (c *Elasticsearch) InitDefaults() { c.Timeout = 90 * time.Second c.MaxRetries = 3 c.MaxConnPerHost = 128 + c.MaxContentLength = 100 * 1024 * 1024 } // Validate ensures that the configuration is valid. diff --git a/internal/pkg/es/bulk_update_api_key.go b/internal/pkg/es/bulk_update_api_key.go new file mode 100644 index 000000000..e75df2996 --- /dev/null +++ b/internal/pkg/es/bulk_update_api_key.go @@ -0,0 +1,109 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// Code generated from specification version 7.x: DO NOT EDIT + +// This is a copy of api.search.go file from go-elasticsearch library +// It was modified for /_fleet/_fleet_search experimental API, +// implemented by the custom fleet plugin https://github.com/elastic/elasticsearch/pull/73134 +// This file can be removed and replaced with the official client library wrapper once it is available + +package es + +import ( + "context" + "io" + "net/http" + "strings" + + "github.com/elastic/go-elasticsearch/v7/esapi" +) + +const updateAPIKeyPath = "/_security/api_key/_bulk_update" + +type UpdateApiKeyBulk func(o ...func(*UpdateApiKeyBulkRequest)) (*Response, error) + +type UpdateApiKeyBulkRequest struct { + Body io.Reader + + Header http.Header + + ctx context.Context +} + +// Do executes the request and returns response or error. +// +func (r UpdateApiKeyBulkRequest) Do(ctx context.Context, transport esapi.Transport) (*esapi.Response, error) { + var path strings.Builder + + path.Grow(len(updateAPIKeyPath)) + path.WriteString(updateAPIKeyPath) + + req, err := newRequest(http.MethodPost, path.String(), r.Body) + if err != nil { + return nil, err + } + + if r.Body != nil { + req.Header[headerContentType] = headerContentTypeJSON + } + + if len(r.Header) > 0 { + if len(req.Header) == 0 { + req.Header = r.Header + } else { + for k, vv := range r.Header { + for _, v := range vv { + req.Header.Add(k, v) + } + } + } + } + + if ctx != nil { + req = req.WithContext(ctx) + } + + res, err := transport.Perform(req) + if err != nil { + return nil, err + } + + response := esapi.Response{ + StatusCode: res.StatusCode, + Body: res.Body, + Header: res.Header, + } + + return &response, nil +} + +// WithContext sets the request context. +// +func (f UpdateApiKeyBulkRequest) WithContext(v context.Context) func(*UpdateApiKeyBulkRequest) { + return func(r *UpdateApiKeyBulkRequest) { + r.ctx = v + } +} + +// WithBody - The search definition using the Query DSL. +// +func (f UpdateApiKeyBulkRequest) WithBody(v io.Reader) func(*UpdateApiKeyBulkRequest) { + return func(r *UpdateApiKeyBulkRequest) { + r.Body = v + } +} + +// WithHeader adds the headers to the HTTP request. +// +func (f UpdateApiKeyBulkRequest) WithHeader(h map[string]string) func(*UpdateApiKeyBulkRequest) { + return func(r *UpdateApiKeyBulkRequest) { + if r.Header == nil { + r.Header = make(http.Header) + } + for k, v := range h { + r.Header.Add(k, v) + } + } +} diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index fefd192d3..55683cf51 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -91,22 +91,78 @@ func (p *Output) prepareElasticsearch( // Note: This will need to be updated when doing multi-cluster elasticsearch support // Currently, we assume all ES outputs are the same ES fleet-server is connected to. - needNewKey := true + needNewKey := false + needUpdateKey := false switch { case output.APIKey == "": zlog.Debug().Msg("must generate api key as default API key is not present") + needNewKey = true case p.Role.Sha2 != output.PermissionsHash: // the is actually the OutputPermissionsHash for the default hash. The Agent // document on ES does not have OutputPermissionsHash for any other output // besides the default one. It seems to me error-prone to rely on the default // output permissions hash to generate new API keys for other outputs. zlog.Debug().Msg("must generate api key as policy output permissions changed") + needUpdateKey = true default: - needNewKey = false zlog.Debug().Msg("policy output permissions are the same") } - if needNewKey { + if needUpdateKey { + zlog.Debug(). + RawJSON("roles", p.Role.Raw). + Str("oldHash", output.PermissionsHash). + Str("newHash", p.Role.Sha2). + Msg("Generating a new API key") + + // query current api key for roles so we don't lose permissions in the meantime + currentRoles, err := fetchAPIKeyRoles(ctx, bulker, output.APIKeyID) + if err != nil { + zlog.Error(). + Str("apiKeyID", output.APIKeyID). + Err(err).Msg("fail fetching roles for key") + return err + } + + // merge roles with p.Role + newRoles, err := mergeRoles(zlog, currentRoles, p.Role) + if err != nil { + zlog.Error(). + Str("apiKeyID", output.APIKeyID). + Err(err).Msg("fail merging roles for key") + return err + } + + // hash provided is only for merging request together and not persisted + err = bulker.APIKeyUpdate(ctx, output.APIKeyID, newRoles.Sha2, newRoles.Raw) + if err != nil { + zlog.Error().Err(err).Msg("fail generate output key") + zlog.Debug().RawJSON("roles", newRoles.Raw).Str("sha", newRoles.Sha2).Err(err).Msg("roles not updated") + return err + } + + output.PermissionsHash = p.Role.Sha2 // for the sake of consistency + zlog.Debug(). + Str("hash.sha256", p.Role.Sha2). + Str("roles", string(p.Role.Raw)). + Msg("Updating agent record to pick up most recent roles.") + + fields := map[string]interface{}{ + dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2, + } + + // Using painless script to update permission hash for updated key + body, err := renderUpdatePainlessScript(p.Name, fields) + if err != nil { + return err + } + + if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body); err != nil { + zlog.Error().Err(err).Msg("fail update agent record") + return err + } + + } else if needNewKey { zlog.Debug(). RawJSON("fleet.policy.roles", p.Role.Raw). Str("fleet.policy.default.oldHash", output.PermissionsHash). @@ -181,6 +237,114 @@ func (p *Output) prepareElasticsearch( return nil } +func fetchAPIKeyRoles(ctx context.Context, b bulk.Bulk, apiKeyID string) (*RoleT, error) { + res, err := b.APIKeyRead(ctx, apiKeyID, true) + if err != nil { + return nil, err + } + + roleMap, err := smap.Parse(res.RoleDescriptors) + if err != nil { + return nil, err + } + r := &RoleT{ + Raw: res.RoleDescriptors, + } + + // Stable hash on permissions payload + if r.Sha2, err = roleMap.Hash(); err != nil { + return nil, err + } + + return r, nil +} + +// mergeRoles takes old and new role sets and merges them following these rules: +// - take all new roles +// - append all old roles +// to avoid name collisions every old entry has a `rdstale` suffix +// if rdstale suffix already exists it uses `{index}-rdstale` to avoid further collisions +// everything ending with `rdstale` is removed on ack. +// in case we have key `123` in both old and new result will be: {"123", "123-0-rdstale"} +// in case old contains {"123", "123-0-rdstale"} and new contains {"123"} result is: {"123", "123-rdstale", "123-0-rdstale"} +func mergeRoles(zlog zerolog.Logger, old, new *RoleT) (*RoleT, error) { + if old == nil { + return new, nil + } + if new == nil { + return old, nil + } + + oldMap, err := smap.Parse(old.Raw) + if err != nil { + return nil, err + } + if oldMap == nil { + return new, nil + } + + newMap, err := smap.Parse(new.Raw) + if err != nil { + return nil, err + } + if newMap == nil { + return old, nil + } + + destMap := smap.Map{} + // copy all from new + for k, v := range newMap { + destMap[k] = v + } + + findNewKey := func(m smap.Map, candidate string) string { + if strings.HasSuffix(candidate, "-rdstale") { + candidate = strings.TrimSuffix(candidate, "-rdstale") + dashIdx := strings.LastIndex(candidate, "-") + if dashIdx >= 0 { + candidate = candidate[:dashIdx] + } + + } + + // 1 should be enough, 100 is just to have some space + for i := 0; i < 100; i++ { + c := fmt.Sprintf("%s-%d-rdstale", candidate, i) + + if _, exists := m[c]; !exists { + return c + } + } + + return "" + } + // copy old + for k, v := range oldMap { + newKey := findNewKey(destMap, k) + if newKey == "" { + zlog.Warn().Msg("Failed to find a key for role assignement.") + + zlog.Debug(). + RawJSON("roles", new.Raw). + Str("candidate", k). + Msg("roles not included.") + + continue + } + destMap[newKey] = v + } + + r := &RoleT{} + if r.Sha2, err = destMap.Hash(); err != nil { + return nil, err + } + if r.Raw, err = json.Marshal(destMap); err != nil { + return nil, err + } + + return r, nil +} + func renderUpdatePainlessScript(outputName string, fields map[string]interface{}) ([]byte, error) { var source strings.Builder diff --git a/internal/pkg/policy/policy_output_test.go b/internal/pkg/policy/policy_output_test.go index d66275d04..f74d57b3e 100644 --- a/internal/pkg/policy/policy_output_test.go +++ b/internal/pkg/policy/policy_output_test.go @@ -145,20 +145,22 @@ func TestPolicyOutputESPrepare(t *testing.T) { bulker.AssertExpectations(t) }) - t.Run("Permission hash != Agent Permission Hash need to regenerate the key", func(t *testing.T) { + t.Run("Permission hash != Agent Permission Hash need to regenerate permissions", func(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk() oldAPIKey := bulk.APIKey{ID: "test_id", Key: "EXISTING-KEY"} - wantAPIKey := bulk.APIKey{ID: "abc", Key: "new-key"} + wantAPIKey := bulk.APIKey{ID: "test_id", Key: "EXISTING-KEY"} hashPerm := "old-HASH" + bulker. + On("APIKeyRead", mock.Anything, mock.Anything, mock.Anything). + Return(&bulk.APIKeyMetadata{ID: "test_id", RoleDescriptors: TestPayload}, nil). + Once() bulker.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(nil).Once() - bulker.On("APIKeyCreate", - mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return(&wantAPIKey, nil).Once() + bulker.On("APIKeyUpdate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() output := Output{ Type: OutputTypeElasticsearch, diff --git a/internal/pkg/testing/bulk.go b/internal/pkg/testing/bulk.go index 1123232b7..724d54086 100644 --- a/internal/pkg/testing/bulk.go +++ b/internal/pkg/testing/bulk.go @@ -83,7 +83,7 @@ func (m *MockBulk) APIKeyCreate(ctx context.Context, name, ttl string, roles []b return args.Get(0).(*bulk.APIKey), args.Error(1) } -func (m *MockBulk) APIKeyRead(ctx context.Context, id string) (*bulk.APIKeyMetadata, error) { +func (m *MockBulk) APIKeyRead(ctx context.Context, id string, _ bool) (*bulk.APIKeyMetadata, error) { args := m.Called(ctx, id) return args.Get(0).(*bulk.APIKeyMetadata), args.Error(1) } @@ -98,4 +98,9 @@ func (m *MockBulk) APIKeyInvalidate(ctx context.Context, ids ...string) error { return args.Error(0) } +func (m *MockBulk) APIKeyUpdate(ctx context.Context, id, outputPolicyHash string, roles []byte) error { + args := m.Called(ctx, id) + return args.Error(0) +} + var _ bulk.Bulk = (*MockBulk)(nil)