Skip to content

Commit

Permalink
x-pack/filebeat/input/entityanalytics/provider/azuread: avoid work on…
Browse files Browse the repository at this point in the history
… unwanted datasets (elastic#36753)

During full sync the provider may hold state from a previously configured
dataset. So, if the user has changed the dataset from users to devices or
vice versa, the provider may publish state that already exists in the entity
graph. This change adds conditional checks to ensure that records from
unwanted datasets are not published.
  • Loading branch information
efd6 authored Oct 7, 2023
1 parent 05a88cf commit ba2a641
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ is collected by it.
- Re-use buffers to optimise memory allocation in fingerprint mode of filestream {pull}36736[36736]
- Allow http_endpoint input to receive PUT and PATCH requests. {pull}36734[36734]
- Add cache processor. {pull}36786[36786]
- Avoid unwanted publication of Azure entity records. {pull}36753[36753]

*Auditbeat*

Expand Down
97 changes: 54 additions & 43 deletions x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -151,21 +150,22 @@ func (p *azure) runFullSync(inputCtx v2.Context, store *kvstore.Store, client be
return err
}

if len(state.users) != 0 || len(state.devices) != 0 {
wantUsers := p.conf.wantUsers()
wantDevices := p.conf.wantDevices()
if (len(state.users) != 0 && wantUsers) || (len(state.devices) != 0 && wantDevices) {
tracker := kvstore.NewTxTracker(ctx)

start := time.Now()
p.publishMarker(start, start, inputCtx.ID, true, client, tracker)

if len(state.users) != 0 {
if len(state.users) != 0 && wantUsers {
p.logger.Debugw("publishing users", "count", len(state.devices))
for _, u := range state.users {
p.publishUser(u, state, inputCtx.ID, client, tracker)
}

}

if len(state.devices) != 0 {
if len(state.devices) != 0 && wantDevices {
p.logger.Debugw("publishing devices", "count", len(state.devices))
for _, d := range state.devices {
p.publishDevice(d, state, inputCtx.ID, client, tracker)
Expand Down Expand Up @@ -224,7 +224,6 @@ func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store,
}
p.publishUser(u, state, inputCtx.ID, client, tracker)
})

}

if updatedDevices.Len() != 0 {
Expand All @@ -236,7 +235,6 @@ func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store,
}
p.publishDevice(d, state, inputCtx.ID, client, tracker)
})

}

tracker.Wait()
Expand Down Expand Up @@ -269,32 +267,32 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
}

var (
wantUsers = p.conf.wantUsers()
changedUsers []*fetcher.User
userLink string
)
switch strings.ToLower(p.conf.Dataset) {
case "", "all", "users":
if wantUsers {
changedUsers, userLink, err = p.fetcher.Users(ctx, usersDeltaLink)
if err != nil {
return updatedUsers, updatedDevices, err
}
p.logger.Debugf("Received %d users from API", len(changedUsers))
default:
} else {
p.logger.Debugf("Skipping user collection from API: dataset=%s", p.conf.Dataset)
}

var (
wantDevices = p.conf.wantDevices()
changedDevices []*fetcher.Device
deviceLink string
)
switch strings.ToLower(p.conf.Dataset) {
case "", "all", "devices":
if wantDevices {
changedDevices, deviceLink, err = p.fetcher.Devices(ctx, devicesDeltaLink)
if err != nil {
return updatedUsers, updatedDevices, err
}
p.logger.Debugf("Received %d devices from API", len(changedDevices))
default:
} else {
p.logger.Debugf("Skipping device collection from API: dataset=%s", p.conf.Dataset)
}

Expand Down Expand Up @@ -337,6 +335,9 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
for _, member := range g.Members {
switch member.Type {
case fetcher.MemberGroup:
if !wantUsers {
break
}
for _, u := range state.users {
if u.TransitiveMemberOf.Contains(member.ID) {
updatedUsers.Add(u.ID)
Expand All @@ -349,6 +350,9 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
}

case fetcher.MemberUser:
if !wantUsers {
break
}
if u, ok := state.users[member.ID]; ok {
updatedUsers.Add(u.ID)
if member.Deleted {
Expand All @@ -359,6 +363,9 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
}

case fetcher.MemberDevice:
if !wantDevices {
break
}
if d, ok := state.devices[member.ID]; ok {
updatedDevices.Add(d.ID)
if member.Deleted {
Expand All @@ -372,42 +379,46 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
}

// Expand user group memberships.
updatedUsers.ForEach(func(userID uuid.UUID) {
u, ok := state.users[userID]
if !ok {
p.logger.Errorf("Unable to find user %q in state", userID)
return
}
u.Modified = true
if u.Deleted {
p.logger.Debugw("not expanding membership for deleted user", "user", userID)
return
}
if wantUsers {
updatedUsers.ForEach(func(userID uuid.UUID) {
u, ok := state.users[userID]
if !ok {
p.logger.Errorf("Unable to find user %q in state", userID)
return
}
u.Modified = true
if u.Deleted {
p.logger.Debugw("not expanding membership for deleted user", "user", userID)
return
}

u.TransitiveMemberOf = u.MemberOf
state.relationships.ExpandFromSet(u.MemberOf).ForEach(func(elem uuid.UUID) {
u.TransitiveMemberOf.Add(elem)
u.TransitiveMemberOf = u.MemberOf
state.relationships.ExpandFromSet(u.MemberOf).ForEach(func(elem uuid.UUID) {
u.TransitiveMemberOf.Add(elem)
})
})
})
}

// Expand device group memberships.
updatedDevices.ForEach(func(devID uuid.UUID) {
d, ok := state.devices[devID]
if !ok {
p.logger.Errorf("Unable to find device %q in state", devID)
return
}
d.Modified = true
if d.Deleted {
p.logger.Debugw("not expanding membership for deleted device", "device", devID)
return
}
if wantDevices {
updatedDevices.ForEach(func(devID uuid.UUID) {
d, ok := state.devices[devID]
if !ok {
p.logger.Errorf("Unable to find device %q in state", devID)
return
}
d.Modified = true
if d.Deleted {
p.logger.Debugw("not expanding membership for deleted device", "device", devID)
return
}

d.TransitiveMemberOf = d.MemberOf
state.relationships.ExpandFromSet(d.MemberOf).ForEach(func(elem uuid.UUID) {
d.TransitiveMemberOf.Add(elem)
d.TransitiveMemberOf = d.MemberOf
state.relationships.ExpandFromSet(d.MemberOf).ForEach(func(elem uuid.UUID) {
d.TransitiveMemberOf.Add(elem)
})
})
})
}

return updatedUsers, updatedDevices, nil
}
Expand Down
18 changes: 18 additions & 0 deletions x-pack/filebeat/input/entityanalytics/provider/azuread/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,21 @@ func defaultConf() conf {
UpdateInterval: defaultUpdateInterval,
}
}

// wantUsers reports whether the configured dataset selects user records.
// The Dataset value is lowercased before comparison; an empty value, "all"
// and "users" select users, and any other value does not.
func (c *conf) wantUsers() bool {
	dataset := strings.ToLower(c.Dataset)
	return dataset == "" || dataset == "all" || dataset == "users"
}

// wantDevices reports whether the configured dataset selects device records.
// The Dataset value is lowercased before comparison; an empty value, "all"
// and "devices" select devices, and any other value does not.
func (c *conf) wantDevices() bool {
	dataset := strings.ToLower(c.Dataset)
	return dataset == "" || dataset == "all" || dataset == "devices"
}

0 comments on commit ba2a641

Please sign in to comment.