Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass namespaces to Fleet Server managed documents #3535

Merged
merged 7 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions changelog/fragments/1715862782-support-namespaces.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: support-namespaces

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: Add suport for namespaces, Fleet server will now add the Namespaces property to created .fleet-* documennts.

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: "fleet-server"
# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/fleet-server/pull/3535
25 changes: 14 additions & 11 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,23 +163,25 @@ func (ack *AckT) validateRequest(zlog zerolog.Logger, w http.ResponseWriter, r *
return &req, nil
}

func eventToActionResult(agentID, aType string, ev AckRequest_Events_Item) (acr model.ActionResult) {
func eventToActionResult(agentID, aType string, namespaces []string, ev AckRequest_Events_Item) (acr model.ActionResult) {
switch aType {
case string(REQUESTDIAGNOSTICS):
event, _ := ev.AsDiagnosticsEvent()
p, _ := json.Marshal(event.Data)
return model.ActionResult{
ActionID: event.ActionId,
AgentID: agentID,
Data: p,
Error: fromPtr(event.Error),
Timestamp: event.Timestamp.Format(time.RFC3339Nano),
ActionID: event.ActionId,
AgentID: agentID,
Namespaces: namespaces,
Data: p,
Error: fromPtr(event.Error),
Timestamp: event.Timestamp.Format(time.RFC3339Nano),
}
case string(INPUTACTION):
event, _ := ev.AsInputEvent()
return model.ActionResult{
ActionID: event.ActionId,
AgentID: agentID,
Namespaces: namespaces,
ActionInputType: event.ActionInputType,
StartedAt: event.StartedAt.Format(time.RFC3339Nano),
CompletedAt: event.CompletedAt.Format(time.RFC3339Nano),
Expand All @@ -191,10 +193,11 @@ func eventToActionResult(agentID, aType string, ev AckRequest_Events_Item) (acr
default: // UPGRADE action acks are also handled by handelUpgrade (deprecated func)
event, _ := ev.AsGenericEvent()
return model.ActionResult{
ActionID: event.ActionId,
AgentID: agentID,
Error: fromPtr(event.Error),
Timestamp: event.Timestamp.Format(time.RFC3339Nano),
ActionID: event.ActionId,
Namespaces: namespaces,
AgentID: agentID,
Error: fromPtr(event.Error),
Timestamp: event.Timestamp.Format(time.RFC3339Nano),
}
}
}
Expand Down Expand Up @@ -358,7 +361,7 @@ func (ack *AckT) handleActionResult(ctx context.Context, zlog zerolog.Logger, ag
defer span.End()

// Convert ack event to action result document
acr := eventToActionResult(agent.Id, action.Type, ev)
acr := eventToActionResult(agent.Id, action.Type, action.Namespaces, ev)

// Save action result document
if err := dl.CreateActionResult(ctx, ack.bulk, acr); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions internal/pkg/api/handleAck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestMakeUpdatePolicyBody(t *testing.T) {
func TestEventToActionResult(t *testing.T) {
agentID := "6e9b6655-8cfe-4eb6-9b2f-c10aefae7517"
t.Run("generic", func(t *testing.T) {
r := eventToActionResult(agentID, "UPGRADE", AckRequest_Events_Item{json.RawMessage(`{
r := eventToActionResult(agentID, "UPGRADE", []string{}, AckRequest_Events_Item{json.RawMessage(`{
"action_id": "test-action-id",
"message": "action message",
"timestamp": "2022-02-23T18:26:08.506128Z"
Expand All @@ -72,7 +72,7 @@ func TestEventToActionResult(t *testing.T) {
assert.Empty(t, r.Error)
})
t.Run("with error", func(t *testing.T) {
r := eventToActionResult(agentID, "UPGRADE", AckRequest_Events_Item{json.RawMessage(`{
r := eventToActionResult(agentID, "UPGRADE", []string{}, AckRequest_Events_Item{json.RawMessage(`{
"action_id": "test-action-id",
"message": "action message",
"timestamp": "2022-02-23T18:26:08.506128Z",
Expand All @@ -84,7 +84,7 @@ func TestEventToActionResult(t *testing.T) {
assert.Equal(t, "error message", r.Error)
})
t.Run("request diagnostics", func(t *testing.T) {
r := eventToActionResult(agentID, "REQUEST_DIAGNOSTICS", AckRequest_Events_Item{json.RawMessage(`{
r := eventToActionResult(agentID, "REQUEST_DIAGNOSTICS", []string{}, AckRequest_Events_Item{json.RawMessage(`{
"action_id": "test-action-id",
"message": "action message",
"timestamp": "2022-02-23T18:26:08.506128Z",
Expand All @@ -98,7 +98,7 @@ func TestEventToActionResult(t *testing.T) {
assert.Equal(t, "error message", r.Error)
})
t.Run("input action", func(t *testing.T) {
r := eventToActionResult(agentID, "INPUT_ACTION", AckRequest_Events_Item{json.RawMessage(`{
r := eventToActionResult(agentID, "INPUT_ACTION", []string{}, AckRequest_Events_Item{json.RawMessage(`{
"action_id": "test-action-id",
"message": "action message",
"timestamp": "2022-02-23T18:26:08.506128Z",
Expand Down
6 changes: 4 additions & 2 deletions internal/pkg/api/handleEnroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (et *EnrollerT) processRequest(zlog zerolog.Logger, w http.ResponseWriter,

cntEnroll.bodyIn.Add(readCounter.Count())

return et._enroll(r.Context(), rb, zlog, req, enrollAPI.PolicyID, ver)
return et._enroll(r.Context(), rb, zlog, req, enrollAPI.PolicyID, enrollAPI.Namespaces, ver)
}

// retrieveStaticTokenEnrollmentToken fetches the enrollment key record from the config static tokens.
Expand Down Expand Up @@ -190,7 +190,8 @@ func (et *EnrollerT) _enroll(
rb *rollback.Rollback,
zlog zerolog.Logger,
req *EnrollRequest,
policyID,
policyID string,
namespaces []string,
ver string,
) (*EnrollResponse, error) {
var agent model.Agent
Expand Down Expand Up @@ -272,6 +273,7 @@ func (et *EnrollerT) _enroll(
agentData := model.Agent{
Active: true,
PolicyID: policyID,
Namespaces: namespaces,
Type: string(req.Type),
EnrolledAt: now.UTC().Format(time.RFC3339),
LocalMetadata: localMeta,
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/api/handleEnroll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestEnroll(t *testing.T) {
}, nil)
bulker.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
"", nil)
resp, _ := et._enroll(ctx, rb, zlog, req, "1234", "8.9.0")
resp, _ := et._enroll(ctx, rb, zlog, req, "1234", []string{}, "8.9.0")

if resp.Action != "created" {
t.Fatal("enroll failed")
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/api/handleUpload.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ func (ut *UploadT) handleUploadBegin(_ zerolog.Logger, w http.ResponseWriter, r
return err
}

_, err = ut.authAgent(r, &agentID, ut.bulker, ut.cache)
agent, err := ut.authAgent(r, &agentID, ut.bulker, ut.cache)
if err != nil {
return err
}

// validate payload, enrich with additional fields, and write metadata doc to ES
info, err := ut.uploader.Begin(r.Context(), payload)
info, err := ut.uploader.Begin(r.Context(), agent.Namespaces, payload)
if err != nil {
return err
}
Expand Down
49 changes: 26 additions & 23 deletions internal/pkg/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ type Hash struct {
}

type MetaDoc struct {
ActionID string `json:"action_id"`
AgentID string `json:"agent_id"`
Source string `json:"src"`
File FileData `json:"file"`
UploadID string `json:"upload_id"`
Start time.Time `json:"upload_start"`
ActionID string `json:"action_id"`
AgentID string `json:"agent_id"`
Source string `json:"src"`
File FileData `json:"file"`
UploadID string `json:"upload_id"`
Start time.Time `json:"upload_start"`
Namespaces []string `json:"namespaces"`
}

// custom unmarshaller to make unix-epoch values work
Expand All @@ -71,26 +72,28 @@ func (m *MetaDoc) UnmarshalJSON(b []byte) error {
}

type ChunkInfo struct {
Pos int // Ordered chunk position in file
Last bool // Is this the final chunk in the file
SHA2 string
Size int
BID string // base id, matches metadata doc's _id
Index string
ID string // chunk _id
Pos int // Ordered chunk position in file
Last bool // Is this the final chunk in the file
SHA2 string
Size int
BID string // base id, matches metadata doc's _id
Index string
ID string // chunk _id
Namespaces []string
}

type Info struct {
ID string // upload operation identifier. Used to identify the upload process
DocID string // document ID of the uploaded file and chunks
Source string // which integration is performing the upload
AgentID string
ActionID string
ChunkSize int64
Total int64
Count int
Start time.Time
Status Status
ID string // upload operation identifier. Used to identify the upload process
DocID string // document ID of the uploaded file and chunks
Source string // which integration is performing the upload
AgentID string
ActionID string
Namespaces []string
ChunkSize int64
Total int64
Count int
Start time.Time
Status Status
}

// convenience functions for computing current "Status" based on the fields
Expand Down
24 changes: 14 additions & 10 deletions internal/pkg/file/uploader/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func New(chunkClient *elasticsearch.Client, bulker bulk.Bulk, cache cache.Cache,
}

// Start an upload operation
func (u *Uploader) Begin(ctx context.Context, data JSDict) (file.Info, error) {
func (u *Uploader) Begin(ctx context.Context, namespaces []string, data JSDict) (file.Info, error) {
vSpan, _ := apm.StartSpan(ctx, "validateFileInfo", "validate")
if data == nil {
vSpan.End()
Expand Down Expand Up @@ -92,15 +92,16 @@ func (u *Uploader) Begin(ctx context.Context, data JSDict) (file.Info, error) {
docID := fmt.Sprintf("%s.%s", actionID, agentID)

info := file.Info{
ID: id,
DocID: docID,
AgentID: agentID,
ActionID: actionID,
ChunkSize: file.MaxChunkSize,
Source: source,
Total: size,
Status: file.StatusAwaiting,
Start: time.Now(),
ID: id,
DocID: docID,
AgentID: agentID,
ActionID: actionID,
Namespaces: namespaces,
ChunkSize: file.MaxChunkSize,
Source: source,
Total: size,
Status: file.StatusAwaiting,
Start: time.Now(),
}
chunkCount := info.Total / info.ChunkSize
if info.Total%info.ChunkSize > 0 {
Expand All @@ -127,6 +128,9 @@ func (u *Uploader) Begin(ctx context.Context, data JSDict) (file.Info, error) {
if err := data.Put(info.Start.UnixMilli(), "@timestamp"); err != nil {
return file.Info{}, err
}
if err := data.Put(info.Namespaces, "namespaces"); err != nil {
return file.Info{}, err
}

/*
Write to storage
Expand Down
12 changes: 6 additions & 6 deletions internal/pkg/file/uploader/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestUploadBeginReturnsCorrectInfo(t *testing.T) {
c, err := cache.New(config.Cache{NumCounters: 100, MaxCost: 100000})
require.NoError(t, err)
u := New(nil, fakeBulk, c, int64(size), time.Hour)
info, err := u.Begin(context.Background(), data)
info, err := u.Begin(context.Background(), []string{}, data)
assert.NoError(t, err)

assert.Equal(t, int64(size), info.Total)
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestUploadBeginWritesDocumentFromInputs(t *testing.T) {
c, err := cache.New(config.Cache{NumCounters: 100, MaxCost: 100000})
require.NoError(t, err)
u := New(nil, fakeBulk, c, int64(size), time.Hour)
_, err = u.Begin(context.Background(), data)
_, err = u.Begin(context.Background(), []string{}, data)
assert.NoError(t, err)

payload, ok := fakeBulk.Calls[0].Arguments[3].([]byte)
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestUploadBeginCalculatesCorrectChunkCount(t *testing.T) {
data := makeUploadRequestDict(map[string]interface{}{
"file.size": tc.FileSize,
})
info, err := u.Begin(context.Background(), data)
info, err := u.Begin(context.Background(), []string{}, data)
assert.NoError(t, err)
assert.Equal(t, tc.ExpectedCount, info.Count)
})
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestUploadBeginMaxFileSize(t *testing.T) {
data := makeUploadRequestDict(map[string]interface{}{
"file.size": tc.FileSize,
})
_, err := u.Begin(context.Background(), data)
_, err := u.Begin(context.Background(), []string{}, data)
if tc.ShouldError {
assert.ErrorIs(t, err, ErrFileSizeTooLarge)
} else {
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestUploadRejectsMissingRequiredFields(t *testing.T) {
}
}

_, err = u.Begin(context.Background(), data)
_, err = u.Begin(context.Background(), []string{}, data)
assert.Errorf(t, err, "%s is a required field and should error if not provided", field)
})

Expand Down Expand Up @@ -343,7 +343,7 @@ func TestChunkMarksFinal(t *testing.T) {
"file.size": tc.FileSize,
})

info, err := u.Begin(context.Background(), data)
info, err := u.Begin(context.Background(), []string{}, data)
assert.NoError(t, err)

// for anything larger than 1-chunk, check for off-by-ones
Expand Down
21 changes: 18 additions & 3 deletions internal/pkg/model/schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading