Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk API Keys update #1779

Merged
merged 8 commits into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 62 additions & 2 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
"github.com/elastic/fleet-server/v7/internal/pkg/smap"
"github.com/pkg/errors"

"github.com/julienschmidt/httprouter"
Expand Down Expand Up @@ -349,15 +350,49 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
return nil
}

if agent.DefaultAPIKeyID != "" {
res, err := ack.bulk.APIKeyRead(ctx, agent.DefaultAPIKeyID, true)
if err != nil {
zlog.Error().
Err(err).
Str(LogAPIKeyID, agent.DefaultAPIKeyID).
Msg("Failed to read API Key roles")
} else {
clean, removedRolesCount, err := cleanRoles(res.RoleDescriptors)
if err != nil {
zlog.Error().
Err(err).
RawJSON("roles", res.RoleDescriptors).
Str(LogAPIKeyID, agent.DefaultAPIKeyID).
Msg("Failed to cleanup roles")
} else if removedRolesCount > 0 {
if err := ack.bulk.APIKeyUpdate(ctx, agent.DefaultAPIKeyID, agent.PolicyOutputPermissionsHash, clean); err != nil {
zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, agent.DefaultAPIKeyID).Msg("Failed to update API Key")
} else {
zlog.Debug().
Str("hash.sha256", agent.PolicyOutputPermissionsHash).
Str(LogAPIKeyID, agent.DefaultAPIKeyID).
RawJSON("roles", clean).
Int("removedRoles", removedRolesCount).
Msg("Updating agent record to pick up reduced roles.")
}
}
}
}

sz := len(agent.DefaultAPIKeyHistory)
if sz > 0 {
ids := make([]string, sz)
for i := 0; i < sz; i++ {
if agent.DefaultAPIKeyHistory[i].ID == agent.DefaultAPIKeyID {
// already updated
continue
}
ids[i] = agent.DefaultAPIKeyHistory[i].ID
}
log.Info().Strs("ids", ids).Msg("Invalidate old API keys")
if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil {
log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
log.Warn().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
}
}

Expand All @@ -376,7 +411,7 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
bulk.WithRetryOnConflict(3),
)

zlog.Info().Err(err).
zlog.Err(err).
Str(LogPolicyID, agent.PolicyID).
Int64("policyRevision", currRev).
Int64("policyCoordinator", currCoord).
Expand All @@ -385,6 +420,31 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
return errors.Wrap(err, "handlePolicyChange update")
}

func cleanRoles(roles json.RawMessage) (json.RawMessage, int, error) {
rr := smap.Map{}
if err := json.Unmarshal(roles, &rr); err != nil {
return nil, 0, errors.Wrap(err, "failed to unmarshal provided roles")
}

keys := make([]string, 0, len(rr))
for k := range rr {
if strings.HasSuffix(k, "-rdstale") {
keys = append(keys, k)
}
}

if len(keys) == 0 {
return roles, 0, nil
}

for _, k := range keys {
delete(rr, k)
}

r, err := json.Marshal(rr)
return r, len(keys), errors.Wrap(err, "failed to marshal resulting role definition")
}

func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent *model.Agent) error {
apiKeys := _getAPIKeyIDs(agent)
if len(apiKeys) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/api/handleEnroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func invalidateAPIKey(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk
LOOP:
for {

_, err := bulker.APIKeyRead(ctx, apikeyID)
_, err := bulker.APIKeyRead(ctx, apikeyID, false)

switch {
case err == nil:
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/apikey/apikey_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestCreateAPIKeyWithMetadata(t *testing.T) {
}

// Get the key and verify that metadata was saved correctly
aKeyMeta, err := Read(ctx, es, akey.ID)
aKeyMeta, err := Read(ctx, es, akey.ID, false)
if err != nil {
t.Fatal(err)
}
Expand All @@ -80,7 +80,7 @@ func TestCreateAPIKeyWithMetadata(t *testing.T) {
}

// Try to get the key that doesn't exists, expect ErrApiKeyNotFound
_, err = Read(ctx, es, "0000000000000")
_, err = Read(ctx, es, "0000000000000", false)
if !errors.Is(err, ErrAPIKeyNotFound) {
t.Errorf("Unexpected error type: %v", err)
}
Expand Down
20 changes: 13 additions & 7 deletions internal/pkg/apikey/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,21 @@ import (

// APIKetMetadata tracks Metadata associated with an APIKey.
type APIKeyMetadata struct {
ID string
Metadata Metadata
ID string
Metadata Metadata
RoleDescriptors json.RawMessage
}

// Read gathers APIKeyMetadata from Elasticsearch using the given client.
func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKeyMetadata, error) {
func Read(ctx context.Context, client *elasticsearch.Client, id string, withOwner bool) (*APIKeyMetadata, error) {

opts := []func(*esapi.SecurityGetAPIKeyRequest){
client.Security.GetAPIKey.WithContext(ctx),
client.Security.GetAPIKey.WithID(id),
}
if withOwner {
opts = append(opts, client.Security.GetAPIKey.WithOwner(true))
}

res, err := client.Security.GetAPIKey(
opts...,
Expand All @@ -42,8 +46,9 @@ func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKey
}

type APIKeyResponse struct {
ID string `json:"id"`
Metadata Metadata `json:"metadata"`
ID string `json:"id"`
Metadata Metadata `json:"metadata"`
RoleDescriptors json.RawMessage `json:"role_descriptors"`
}
type GetAPIKeyResponse struct {
APIKeys []APIKeyResponse `json:"api_keys"`
Expand All @@ -62,7 +67,8 @@ func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKey
first := resp.APIKeys[0]

return &APIKeyMetadata{
ID: first.ID,
Metadata: first.Metadata,
ID: first.ID,
Metadata: first.Metadata,
RoleDescriptors: first.RoleDescriptors,
}, nil
}
2 changes: 2 additions & 0 deletions internal/pkg/bulk/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
ActionDelete
ActionIndex
ActionUpdate
ActionUpdateAPIKey
ActionRead
ActionSearch
ActionFleetSearch
Expand All @@ -53,6 +54,7 @@ var actionStrings = []string{
"delete",
"index",
"update",
"update_api_key",
"read",
"search",
"fleet_search",
Expand Down
8 changes: 7 additions & 1 deletion internal/pkg/bulk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ type Bulk interface {

// APIKey operations
APIKeyCreate(ctx context.Context, name, ttl string, roles []byte, meta interface{}) (*APIKey, error)
APIKeyRead(ctx context.Context, id string) (*APIKeyMetadata, error)
APIKeyRead(ctx context.Context, id string, withOwner bool) (*APIKeyMetadata, error)
APIKeyAuth(ctx context.Context, key APIKey) (*SecurityInfo, error)
APIKeyInvalidate(ctx context.Context, ids ...string) error
APIKeyUpdate(ctx context.Context, id, outputPolicyHash string, roles []byte) error

// Accessor used to talk to elastic search direcly bypassing bulk engine
Client() *elasticsearch.Client
Expand All @@ -81,6 +82,7 @@ const (
defaultMaxPending = 32
defaultBlockQueueSz = 32 // Small capacity to allow multiOp to spin fast
defaultAPIKeyMaxParallel = 32
defaultApikeyMaxReqSize = 100 * 1024 * 1024
)

func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker {
Expand Down Expand Up @@ -136,6 +138,8 @@ func blkToQueueType(blk *bulkT) queueType {
} else {
queueIdx = kQueueRead
}
case ActionUpdateAPIKey:
queueIdx = kQueueAPIKeyUpdate
default:
if forceRefresh {
queueIdx = kQueueRefreshBulk
Expand Down Expand Up @@ -288,6 +292,8 @@ func (b *Bulker) flushQueue(ctx context.Context, w *semaphore.Weighted, queue qu
err = b.flushRead(ctx, queue)
case kQueueSearch, kQueueFleetSearch:
err = b.flushSearch(ctx, queue)
case kQueueAPIKeyUpdate:
err = b.flushUpdateAPIKey(ctx, queue)
default:
err = b.flushBulk(ctx, queue)
}
Expand Down
Loading