Skip to content

Commit

Permalink
x-pack/filebeat/input/entityanalytics/provider/azuread: avoid work on…
Browse files Browse the repository at this point in the history
… unwanted datasets
  • Loading branch information
efd6 committed Oct 4, 2023
1 parent febe538 commit 9b8942a
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 46 deletions.
105 changes: 59 additions & 46 deletions x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -151,21 +150,22 @@ func (p *azure) runFullSync(inputCtx v2.Context, store *kvstore.Store, client be
return err
}

if len(state.users) != 0 || len(state.devices) != 0 {
wantUsers := p.conf.wantUsers()
wantDevices := p.conf.wantDevices()
if (len(state.users) != 0 && wantUsers) || (len(state.devices) != 0 && wantDevices) {
tracker := kvstore.NewTxTracker(ctx)

start := time.Now()
p.publishMarker(start, start, inputCtx.ID, true, client, tracker)

if len(state.users) != 0 {
if len(state.users) != 0 && wantUsers {
p.logger.Debugw("publishing users", "count", len(state.devices))
for _, u := range state.users {
p.publishUser(u, state, inputCtx.ID, client, tracker)
}

}

if len(state.devices) != 0 {
if len(state.devices) != 0 && wantDevices {
p.logger.Debugw("publishing devices", "count", len(state.devices))
for _, d := range state.devices {
p.publishDevice(d, state, inputCtx.ID, client, tracker)
Expand Down Expand Up @@ -212,10 +212,12 @@ func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store,
return err
}

if updatedUsers.Len() != 0 || updatedDevices.Len() != 0 {
wantUsers := p.conf.wantUsers()
wantDevices := p.conf.wantDevices()
if (updatedUsers.Len() != 0 && wantUsers) || (updatedDevices.Len() != 0 && wantDevices) {
tracker := kvstore.NewTxTracker(ctx)

if updatedUsers.Len() != 0 {
if updatedUsers.Len() != 0 && wantUsers {
updatedUsers.ForEach(func(id uuid.UUID) {
u, ok := state.users[id]
if !ok {
Expand All @@ -224,10 +226,9 @@ func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store,
}
p.publishUser(u, state, inputCtx.ID, client, tracker)
})

}

if updatedDevices.Len() != 0 {
if updatedDevices.Len() != 0 && wantDevices {
updatedDevices.ForEach(func(id uuid.UUID) {
d, ok := state.devices[id]
if !ok {
Expand All @@ -236,7 +237,6 @@ func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store,
}
p.publishDevice(d, state, inputCtx.ID, client, tracker)
})

}

tracker.Wait()
Expand Down Expand Up @@ -269,32 +269,32 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
}

var (
wantUsers = p.conf.wantUsers()
changedUsers []*fetcher.User
userLink string
)
switch strings.ToLower(p.conf.Dataset) {
case "", "all", "users":
if wantUsers {
changedUsers, userLink, err = p.fetcher.Users(ctx, usersDeltaLink)
if err != nil {
return updatedUsers, updatedDevices, err
}
p.logger.Debugf("Received %d users from API", len(changedUsers))
default:
} else {
p.logger.Debugf("Skipping user collection from API: dataset=%s", p.conf.Dataset)
}

var (
wantDevices = p.conf.wantDevices()
changedDevices []*fetcher.Device
deviceLink string
)
switch strings.ToLower(p.conf.Dataset) {
case "", "all", "devices":
if wantDevices {
changedDevices, deviceLink, err = p.fetcher.Devices(ctx, devicesDeltaLink)
if err != nil {
return updatedUsers, updatedDevices, err
}
p.logger.Debugf("Received %d devices from API", len(changedDevices))
default:
} else {
p.logger.Debugf("Skipping device collection from API: dataset=%s", p.conf.Dataset)
}

Expand Down Expand Up @@ -337,6 +337,9 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
for _, member := range g.Members {
switch member.Type {
case fetcher.MemberGroup:
if !wantUsers {
break
}
for _, u := range state.users {
if u.TransitiveMemberOf.Contains(member.ID) {
updatedUsers.Add(u.ID)
Expand All @@ -349,6 +352,9 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
}

case fetcher.MemberUser:
if !wantUsers {
break
}
if u, ok := state.users[member.ID]; ok {
updatedUsers.Add(u.ID)
if member.Deleted {
Expand All @@ -359,6 +365,9 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
}

case fetcher.MemberDevice:
if !wantDevices {
break
}
if d, ok := state.devices[member.ID]; ok {
updatedDevices.Add(d.ID)
if member.Deleted {
Expand All @@ -372,42 +381,46 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (
}

// Expand user group memberships.
updatedUsers.ForEach(func(userID uuid.UUID) {
u, ok := state.users[userID]
if !ok {
p.logger.Errorf("Unable to find user %q in state", userID)
return
}
u.Modified = true
if u.Deleted {
p.logger.Debugw("not expanding membership for deleted user", "user", userID)
return
}
if wantUsers {
updatedUsers.ForEach(func(userID uuid.UUID) {
u, ok := state.users[userID]
if !ok {
p.logger.Errorf("Unable to find user %q in state", userID)
return
}
u.Modified = true
if u.Deleted {
p.logger.Debugw("not expanding membership for deleted user", "user", userID)
return
}

u.TransitiveMemberOf = u.MemberOf
state.relationships.ExpandFromSet(u.MemberOf).ForEach(func(elem uuid.UUID) {
u.TransitiveMemberOf.Add(elem)
u.TransitiveMemberOf = u.MemberOf
state.relationships.ExpandFromSet(u.MemberOf).ForEach(func(elem uuid.UUID) {
u.TransitiveMemberOf.Add(elem)
})
})
})
}

// Expand device group memberships.
updatedDevices.ForEach(func(devID uuid.UUID) {
d, ok := state.devices[devID]
if !ok {
p.logger.Errorf("Unable to find device %q in state", devID)
return
}
d.Modified = true
if d.Deleted {
p.logger.Debugw("not expanding membership for deleted device", "device", devID)
return
}
if wantDevices {
updatedDevices.ForEach(func(devID uuid.UUID) {
d, ok := state.devices[devID]
if !ok {
p.logger.Errorf("Unable to find device %q in state", devID)
return
}
d.Modified = true
if d.Deleted {
p.logger.Debugw("not expanding membership for deleted device", "device", devID)
return
}

d.TransitiveMemberOf = d.MemberOf
state.relationships.ExpandFromSet(d.MemberOf).ForEach(func(elem uuid.UUID) {
d.TransitiveMemberOf.Add(elem)
d.TransitiveMemberOf = d.MemberOf
state.relationships.ExpandFromSet(d.MemberOf).ForEach(func(elem uuid.UUID) {
d.TransitiveMemberOf.Add(elem)
})
})
})
}

return updatedUsers, updatedDevices, nil
}
Expand Down
18 changes: 18 additions & 0 deletions x-pack/filebeat/input/entityanalytics/provider/azuread/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,21 @@ func defaultConf() conf {
UpdateInterval: defaultUpdateInterval,
}
}

func (c *conf) wantUsers() bool {
switch strings.ToLower(c.Dataset) {
case "", "all", "users":
return true
default:
return false
}
}

func (c *conf) wantDevices() bool {
switch strings.ToLower(c.Dataset) {
case "", "all", "devices":
return true
default:
return false
}
}

0 comments on commit 9b8942a

Please sign in to comment.