Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass namespaces to Fleet Server managed documents #3535

Merged
merged 7 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,23 +163,25 @@ func (ack *AckT) validateRequest(zlog zerolog.Logger, w http.ResponseWriter, r *
return &req, nil
}

func eventToActionResult(agentID, aType string, ev AckRequest_Events_Item) (acr model.ActionResult) {
func eventToActionResult(agentID, aType string, namespaces []string, ev AckRequest_Events_Item) (acr model.ActionResult) {
switch aType {
case string(REQUESTDIAGNOSTICS):
event, _ := ev.AsDiagnosticsEvent()
p, _ := json.Marshal(event.Data)
return model.ActionResult{
ActionID: event.ActionId,
AgentID: agentID,
Data: p,
Error: fromPtr(event.Error),
Timestamp: event.Timestamp.Format(time.RFC3339Nano),
ActionID: event.ActionId,
AgentID: agentID,
Namespaces: namespaces,
Data: p,
Error: fromPtr(event.Error),
Timestamp: event.Timestamp.Format(time.RFC3339Nano),
}
case string(INPUTACTION):
event, _ := ev.AsInputEvent()
return model.ActionResult{
ActionID: event.ActionId,
AgentID: agentID,
Namespaces: namespaces,
ActionInputType: event.ActionInputType,
StartedAt: event.StartedAt.Format(time.RFC3339Nano),
CompletedAt: event.CompletedAt.Format(time.RFC3339Nano),
Expand All @@ -191,10 +193,11 @@ func eventToActionResult(agentID, aType string, ev AckRequest_Events_Item) (acr
default: // UPGRADE action acks are also handled by handelUpgrade (deprecated func)
event, _ := ev.AsGenericEvent()
return model.ActionResult{
ActionID: event.ActionId,
AgentID: agentID,
Error: fromPtr(event.Error),
Timestamp: event.Timestamp.Format(time.RFC3339Nano),
ActionID: event.ActionId,
Namespaces: namespaces,
AgentID: agentID,
Error: fromPtr(event.Error),
Timestamp: event.Timestamp.Format(time.RFC3339Nano),
}
}
}
Expand Down Expand Up @@ -358,7 +361,7 @@ func (ack *AckT) handleActionResult(ctx context.Context, zlog zerolog.Logger, ag
defer span.End()

// Convert ack event to action result document
acr := eventToActionResult(agent.Id, action.Type, ev)
acr := eventToActionResult(agent.Id, action.Type, action.Namespaces, ev)

// Save action result document
if err := dl.CreateActionResult(ctx, ack.bulk, acr); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions internal/pkg/api/handleAck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestMakeUpdatePolicyBody(t *testing.T) {
func TestEventToActionResult(t *testing.T) {
agentID := "6e9b6655-8cfe-4eb6-9b2f-c10aefae7517"
t.Run("generic", func(t *testing.T) {
r := eventToActionResult(agentID, "UPGRADE", AckRequest_Events_Item{json.RawMessage(`{
r := eventToActionResult(agentID, "UPGRADE", []string{}, AckRequest_Events_Item{json.RawMessage(`{
"action_id": "test-action-id",
"message": "action message",
"timestamp": "2022-02-23T18:26:08.506128Z"
Expand All @@ -72,7 +72,7 @@ func TestEventToActionResult(t *testing.T) {
assert.Empty(t, r.Error)
})
t.Run("with error", func(t *testing.T) {
r := eventToActionResult(agentID, "UPGRADE", AckRequest_Events_Item{json.RawMessage(`{
r := eventToActionResult(agentID, "UPGRADE", []string{}, AckRequest_Events_Item{json.RawMessage(`{
"action_id": "test-action-id",
"message": "action message",
"timestamp": "2022-02-23T18:26:08.506128Z",
Expand All @@ -84,7 +84,7 @@ func TestEventToActionResult(t *testing.T) {
assert.Equal(t, "error message", r.Error)
})
t.Run("request diagnostics", func(t *testing.T) {
r := eventToActionResult(agentID, "REQUEST_DIAGNOSTICS", AckRequest_Events_Item{json.RawMessage(`{
r := eventToActionResult(agentID, "REQUEST_DIAGNOSTICS", []string{}, AckRequest_Events_Item{json.RawMessage(`{
"action_id": "test-action-id",
"message": "action message",
"timestamp": "2022-02-23T18:26:08.506128Z",
Expand All @@ -98,7 +98,7 @@ func TestEventToActionResult(t *testing.T) {
assert.Equal(t, "error message", r.Error)
})
t.Run("input action", func(t *testing.T) {
r := eventToActionResult(agentID, "INPUT_ACTION", AckRequest_Events_Item{json.RawMessage(`{
r := eventToActionResult(agentID, "INPUT_ACTION", []string{}, AckRequest_Events_Item{json.RawMessage(`{
"action_id": "test-action-id",
"message": "action message",
"timestamp": "2022-02-23T18:26:08.506128Z",
Expand Down
6 changes: 4 additions & 2 deletions internal/pkg/api/handleEnroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (et *EnrollerT) processRequest(zlog zerolog.Logger, w http.ResponseWriter,

cntEnroll.bodyIn.Add(readCounter.Count())

return et._enroll(r.Context(), rb, zlog, req, enrollAPI.PolicyID, ver)
return et._enroll(r.Context(), rb, zlog, req, enrollAPI.PolicyID, enrollAPI.Namespaces, ver)
}

// retrieveStaticTokenEnrollmentToken fetches the enrollment key record from the config static tokens.
Expand Down Expand Up @@ -190,7 +190,8 @@ func (et *EnrollerT) _enroll(
rb *rollback.Rollback,
zlog zerolog.Logger,
req *EnrollRequest,
policyID,
policyID string,
namespaces []string,
ver string,
) (*EnrollResponse, error) {
var agent model.Agent
Expand Down Expand Up @@ -272,6 +273,7 @@ func (et *EnrollerT) _enroll(
agentData := model.Agent{
Active: true,
PolicyID: policyID,
Namespaces: namespaces,
Type: string(req.Type),
EnrolledAt: now.UTC().Format(time.RFC3339),
LocalMetadata: localMeta,
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/api/handleEnroll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestEnroll(t *testing.T) {
}, nil)
bulker.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
"", nil)
resp, _ := et._enroll(ctx, rb, zlog, req, "1234", "8.9.0")
resp, _ := et._enroll(ctx, rb, zlog, req, "1234", []string{}, "8.9.0")

if resp.Action != "created" {
t.Fatal("enroll failed")
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/api/handleUpload.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ func (ut *UploadT) handleUploadBegin(_ zerolog.Logger, w http.ResponseWriter, r
return err
}

_, err = ut.authAgent(r, &agentID, ut.bulker, ut.cache)
agent, err := ut.authAgent(r, &agentID, ut.bulker, ut.cache)
if err != nil {
return err
}

// validate payload, enrich with additional fields, and write metadata doc to ES
info, err := ut.uploader.Begin(r.Context(), payload)
info, err := ut.uploader.Begin(r.Context(), agent.Namespaces, payload)
if err != nil {
return err
}
Expand Down
49 changes: 26 additions & 23 deletions internal/pkg/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ type Hash struct {
}

type MetaDoc struct {
ActionID string `json:"action_id"`
AgentID string `json:"agent_id"`
Source string `json:"src"`
File FileData `json:"file"`
UploadID string `json:"upload_id"`
Start time.Time `json:"upload_start"`
ActionID string `json:"action_id"`
AgentID string `json:"agent_id"`
Source string `json:"src"`
File FileData `json:"file"`
UploadID string `json:"upload_id"`
Start time.Time `json:"upload_start"`
Namespaces []string `json:"namespaces"`
}

// custom unmarshaller to make unix-epoch values work
Expand All @@ -71,26 +72,28 @@ func (m *MetaDoc) UnmarshalJSON(b []byte) error {
}

type ChunkInfo struct {
Pos int // Ordered chunk position in file
Last bool // Is this the final chunk in the file
SHA2 string
Size int
BID string // base id, matches metadata doc's _id
Index string
ID string // chunk _id
Pos int // Ordered chunk position in file
Last bool // Is this the final chunk in the file
SHA2 string
Size int
BID string // base id, matches metadata doc's _id
Index string
ID string // chunk _id
Namespaces []string
}

type Info struct {
ID string // upload operation identifier. Used to identify the upload process
DocID string // document ID of the uploaded file and chunks
Source string // which integration is performing the upload
AgentID string
ActionID string
ChunkSize int64
Total int64
Count int
Start time.Time
Status Status
ID string // upload operation identifier. Used to identify the upload process
DocID string // document ID of the uploaded file and chunks
Source string // which integration is performing the upload
AgentID string
ActionID string
Namespaces []string
ChunkSize int64
Total int64
Count int
Start time.Time
Status Status
}

// convenience functions for computing current "Status" based on the fields
Expand Down
24 changes: 14 additions & 10 deletions internal/pkg/file/uploader/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func New(chunkClient *elasticsearch.Client, bulker bulk.Bulk, cache cache.Cache,
}

// Start an upload operation
func (u *Uploader) Begin(ctx context.Context, data JSDict) (file.Info, error) {
func (u *Uploader) Begin(ctx context.Context, namespaces []string, data JSDict) (file.Info, error) {
vSpan, _ := apm.StartSpan(ctx, "validateFileInfo", "validate")
if data == nil {
vSpan.End()
Expand Down Expand Up @@ -92,15 +92,16 @@ func (u *Uploader) Begin(ctx context.Context, data JSDict) (file.Info, error) {
docID := fmt.Sprintf("%s.%s", actionID, agentID)

info := file.Info{
ID: id,
DocID: docID,
AgentID: agentID,
ActionID: actionID,
ChunkSize: file.MaxChunkSize,
Source: source,
Total: size,
Status: file.StatusAwaiting,
Start: time.Now(),
ID: id,
DocID: docID,
AgentID: agentID,
ActionID: actionID,
Namespaces: namespaces,
ChunkSize: file.MaxChunkSize,
Source: source,
Total: size,
Status: file.StatusAwaiting,
Start: time.Now(),
}
chunkCount := info.Total / info.ChunkSize
if info.Total%info.ChunkSize > 0 {
Expand All @@ -127,6 +128,9 @@ func (u *Uploader) Begin(ctx context.Context, data JSDict) (file.Info, error) {
if err := data.Put(info.Start.UnixMilli(), "@timestamp"); err != nil {
return file.Info{}, err
}
if err := data.Put(info.Namespaces, "namespaces"); err != nil {
return file.Info{}, err
}

/*
Write to storage
Expand Down
12 changes: 6 additions & 6 deletions internal/pkg/file/uploader/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestUploadBeginReturnsCorrectInfo(t *testing.T) {
c, err := cache.New(config.Cache{NumCounters: 100, MaxCost: 100000})
require.NoError(t, err)
u := New(nil, fakeBulk, c, int64(size), time.Hour)
info, err := u.Begin(context.Background(), data)
info, err := u.Begin(context.Background(), []string{}, data)
assert.NoError(t, err)

assert.Equal(t, int64(size), info.Total)
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestUploadBeginWritesDocumentFromInputs(t *testing.T) {
c, err := cache.New(config.Cache{NumCounters: 100, MaxCost: 100000})
require.NoError(t, err)
u := New(nil, fakeBulk, c, int64(size), time.Hour)
_, err = u.Begin(context.Background(), data)
_, err = u.Begin(context.Background(), []string{}, data)
assert.NoError(t, err)

payload, ok := fakeBulk.Calls[0].Arguments[3].([]byte)
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestUploadBeginCalculatesCorrectChunkCount(t *testing.T) {
data := makeUploadRequestDict(map[string]interface{}{
"file.size": tc.FileSize,
})
info, err := u.Begin(context.Background(), data)
info, err := u.Begin(context.Background(), []string{}, data)
assert.NoError(t, err)
assert.Equal(t, tc.ExpectedCount, info.Count)
})
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestUploadBeginMaxFileSize(t *testing.T) {
data := makeUploadRequestDict(map[string]interface{}{
"file.size": tc.FileSize,
})
_, err := u.Begin(context.Background(), data)
_, err := u.Begin(context.Background(), []string{}, data)
if tc.ShouldError {
assert.ErrorIs(t, err, ErrFileSizeTooLarge)
} else {
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestUploadRejectsMissingRequiredFields(t *testing.T) {
}
}

_, err = u.Begin(context.Background(), data)
_, err = u.Begin(context.Background(), []string{}, data)
assert.Errorf(t, err, "%s is a required field and should error if not provided", field)
})

Expand Down Expand Up @@ -343,7 +343,7 @@ func TestChunkMarksFinal(t *testing.T) {
"file.size": tc.FileSize,
})

info, err := u.Begin(context.Background(), data)
info, err := u.Begin(context.Background(), []string{}, data)
assert.NoError(t, err)

// for anything larger than 1-chunk, check for off-by-ones
Expand Down
21 changes: 18 additions & 3 deletions internal/pkg/model/schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading