Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/entityanalytics/provider/azuread: avoid work on unwanted datasets #36753

Merged
merged 2 commits into from
Oct 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ is collected by it.
- Re-use buffers to optimise memory allocation in fingerprint mode of filestream {pull}36736[36736]
- Allow http_endpoint input to receive PUT and PATCH requests. {pull}36734[36734]
- Add cache processor. {pull}36786[36786]
- Avoid unwanted publication of Azure entity records. {pull}36753[36753]

*Auditbeat*

Expand Down
97 changes: 54 additions & 43 deletions x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -151,21 +150,22 @@ func (p *azure) runFullSync(inputCtx v2.Context, store *kvstore.Store, client be
return err
}

if len(state.users) != 0 || len(state.devices) != 0 {
wantUsers := p.conf.wantUsers()
wantDevices := p.conf.wantDevices()
if (len(state.users) != 0 && wantUsers) || (len(state.devices) != 0 && wantDevices) {
tracker := kvstore.NewTxTracker(ctx)

start := time.Now()
p.publishMarker(start, start, inputCtx.ID, true, client, tracker)

if len(state.users) != 0 {
if len(state.users) != 0 && wantUsers {
p.logger.Debugw("publishing users", "count", len(state.devices))
for _, u := range state.users {
p.publishUser(u, state, inputCtx.ID, client, tracker)
}

}

if len(state.devices) != 0 {
if len(state.devices) != 0 && wantDevices {
p.logger.Debugw("publishing devices", "count", len(state.devices))
for _, d := range state.devices {
p.publishDevice(d, state, inputCtx.ID, client, tracker)
Expand Down Expand Up @@ -224,7 +224,6 @@ func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store,
}
p.publishUser(u, state, inputCtx.ID, client, tracker)
})

}

if updatedDevices.Len() != 0 {
Expand All @@ -236,7 +235,6 @@ func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store,
}
p.publishDevice(d, state, inputCtx.ID, client, tracker)
})

}

tracker.Wait()
Expand Down Expand Up @@ -269,32 +267,32 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
}

var (
wantUsers = p.conf.wantUsers()
changedUsers []*fetcher.User
userLink string
)
switch strings.ToLower(p.conf.Dataset) {
case "", "all", "users":
if wantUsers {
changedUsers, userLink, err = p.fetcher.Users(ctx, usersDeltaLink)
if err != nil {
return updatedUsers, updatedDevices, err
}
p.logger.Debugf("Received %d users from API", len(changedUsers))
default:
} else {
p.logger.Debugf("Skipping user collection from API: dataset=%s", p.conf.Dataset)
}

var (
wantDevices = p.conf.wantDevices()
changedDevices []*fetcher.Device
deviceLink string
)
switch strings.ToLower(p.conf.Dataset) {
case "", "all", "devices":
if wantDevices {
changedDevices, deviceLink, err = p.fetcher.Devices(ctx, devicesDeltaLink)
if err != nil {
return updatedUsers, updatedDevices, err
}
p.logger.Debugf("Received %d devices from API", len(changedDevices))
default:
} else {
p.logger.Debugf("Skipping device collection from API: dataset=%s", p.conf.Dataset)
}

Expand Down Expand Up @@ -337,6 +335,9 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
for _, member := range g.Members {
switch member.Type {
case fetcher.MemberGroup:
if !wantUsers {
break
}
for _, u := range state.users {
if u.TransitiveMemberOf.Contains(member.ID) {
updatedUsers.Add(u.ID)
Expand All @@ -349,6 +350,9 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
}

case fetcher.MemberUser:
if !wantUsers {
break
}
if u, ok := state.users[member.ID]; ok {
updatedUsers.Add(u.ID)
if member.Deleted {
Expand All @@ -359,6 +363,9 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
}

case fetcher.MemberDevice:
if !wantDevices {
break
}
if d, ok := state.devices[member.ID]; ok {
updatedDevices.Add(d.ID)
if member.Deleted {
Expand All @@ -372,42 +379,46 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
}

// Expand user group memberships.
updatedUsers.ForEach(func(userID uuid.UUID) {
u, ok := state.users[userID]
if !ok {
p.logger.Errorf("Unable to find user %q in state", userID)
return
}
u.Modified = true
if u.Deleted {
p.logger.Debugw("not expanding membership for deleted user", "user", userID)
return
}
if wantUsers {
updatedUsers.ForEach(func(userID uuid.UUID) {
u, ok := state.users[userID]
if !ok {
p.logger.Errorf("Unable to find user %q in state", userID)
return
}
u.Modified = true
if u.Deleted {
p.logger.Debugw("not expanding membership for deleted user", "user", userID)
return
}

u.TransitiveMemberOf = u.MemberOf
state.relationships.ExpandFromSet(u.MemberOf).ForEach(func(elem uuid.UUID) {
u.TransitiveMemberOf.Add(elem)
u.TransitiveMemberOf = u.MemberOf
state.relationships.ExpandFromSet(u.MemberOf).ForEach(func(elem uuid.UUID) {
u.TransitiveMemberOf.Add(elem)
})
})
})
}

// Expand device group memberships.
updatedDevices.ForEach(func(devID uuid.UUID) {
d, ok := state.devices[devID]
if !ok {
p.logger.Errorf("Unable to find device %q in state", devID)
return
}
d.Modified = true
if d.Deleted {
p.logger.Debugw("not expanding membership for deleted device", "device", devID)
return
}
if wantDevices {
updatedDevices.ForEach(func(devID uuid.UUID) {
d, ok := state.devices[devID]
if !ok {
p.logger.Errorf("Unable to find device %q in state", devID)
return
}
d.Modified = true
if d.Deleted {
p.logger.Debugw("not expanding membership for deleted device", "device", devID)
return
}

d.TransitiveMemberOf = d.MemberOf
state.relationships.ExpandFromSet(d.MemberOf).ForEach(func(elem uuid.UUID) {
d.TransitiveMemberOf.Add(elem)
d.TransitiveMemberOf = d.MemberOf
state.relationships.ExpandFromSet(d.MemberOf).ForEach(func(elem uuid.UUID) {
d.TransitiveMemberOf.Add(elem)
})
})
})
}

return updatedUsers, updatedDevices, nil
}
Expand Down
18 changes: 18 additions & 0 deletions x-pack/filebeat/input/entityanalytics/provider/azuread/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,21 @@ func defaultConf() conf {
UpdateInterval: defaultUpdateInterval,
}
}

func (c *conf) wantUsers() bool {
switch strings.ToLower(c.Dataset) {
case "", "all", "users":
return true
default:
return false
}
}

func (c *conf) wantDevices() bool {
switch strings.ToLower(c.Dataset) {
case "", "all", "devices":
return true
default:
return false
}
}