From 9b8942a97faa44fc326433776d5912983ffb8bc9 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Thu, 5 Oct 2023 08:24:37 +1030 Subject: [PATCH] x-pack/filebeat/input/entityanalytics/provider/azuread: avoid work on unwanted datasets --- .../entityanalytics/provider/azuread/azure.go | 105 ++++++++++-------- .../entityanalytics/provider/azuread/conf.go | 18 +++ 2 files changed, 77 insertions(+), 46 deletions(-) diff --git a/x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go b/x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go index 7db004237c92..28458315dfdb 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go +++ b/x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go @@ -9,7 +9,6 @@ import ( "context" "errors" "fmt" - "strings" "time" "github.com/google/uuid" @@ -151,21 +150,22 @@ func (p *azure) runFullSync(inputCtx v2.Context, store *kvstore.Store, client be return err } - if len(state.users) != 0 || len(state.devices) != 0 { + wantUsers := p.conf.wantUsers() + wantDevices := p.conf.wantDevices() + if (len(state.users) != 0 && wantUsers) || (len(state.devices) != 0 && wantDevices) { tracker := kvstore.NewTxTracker(ctx) start := time.Now() p.publishMarker(start, start, inputCtx.ID, true, client, tracker) - if len(state.users) != 0 { + if len(state.users) != 0 && wantUsers { p.logger.Debugw("publishing users", "count", len(state.devices)) for _, u := range state.users { p.publishUser(u, state, inputCtx.ID, client, tracker) } - } - if len(state.devices) != 0 { + if len(state.devices) != 0 && wantDevices { p.logger.Debugw("publishing devices", "count", len(state.devices)) for _, d := range state.devices { p.publishDevice(d, state, inputCtx.ID, client, tracker) @@ -212,10 +212,12 @@ func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, return err } - if updatedUsers.Len() != 0 || updatedDevices.Len() != 0 { + wantUsers := p.conf.wantUsers() + wantDevices := p.conf.wantDevices() + if (updatedUsers.Len() != 0 && wantUsers) || (updatedDevices.Len() != 0 && wantDevices) { tracker := kvstore.NewTxTracker(ctx) - if updatedUsers.Len() != 0 { + if updatedUsers.Len() != 0 && wantUsers { updatedUsers.ForEach(func(id uuid.UUID) { u, ok := state.users[id] if !ok { @@ -224,10 +226,9 @@ func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, } p.publishUser(u, state, inputCtx.ID, client, tracker) }) - } - if updatedDevices.Len() != 0 { + if updatedDevices.Len() != 0 && wantDevices { updatedDevices.ForEach(func(id uuid.UUID) { d, ok := state.devices[id] if !ok { @@ -236,7 +237,6 @@ func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, } p.publishDevice(d, state, inputCtx.ID, client, tracker) }) - } tracker.Wait() @@ -269,32 +269,32 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) ( } var ( + wantUsers = p.conf.wantUsers() changedUsers []*fetcher.User userLink string ) - switch strings.ToLower(p.conf.Dataset) { - case "", "all", "users": + if wantUsers { changedUsers, userLink, err = p.fetcher.Users(ctx, usersDeltaLink) if err != nil { return updatedUsers, updatedDevices, err } p.logger.Debugf("Received %d users from API", len(changedUsers)) - default: + } else { p.logger.Debugf("Skipping user collection from API: dataset=%s", p.conf.Dataset) } var ( + wantDevices = p.conf.wantDevices() changedDevices []*fetcher.Device deviceLink string ) - switch strings.ToLower(p.conf.Dataset) { - case "", "all", "devices": + if wantDevices { changedDevices, deviceLink, err = p.fetcher.Devices(ctx, devicesDeltaLink) if err != nil { return updatedUsers, updatedDevices, err } p.logger.Debugf("Received %d devices from API", len(changedDevices)) - default: + } else { p.logger.Debugf("Skipping device collection from API: dataset=%s", p.conf.Dataset) } @@ -337,6 +337,9 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) ( for _, member := range g.Members { switch member.Type { case fetcher.MemberGroup: + if !wantUsers { + break + } for _, u := range state.users { if u.TransitiveMemberOf.Contains(member.ID) { updatedUsers.Add(u.ID) @@ -349,6 +352,9 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) ( } case fetcher.MemberUser: + if !wantUsers { + break + } if u, ok := state.users[member.ID]; ok { updatedUsers.Add(u.ID) if member.Deleted { @@ -359,6 +365,9 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) ( } case fetcher.MemberDevice: + if !wantDevices { + break + } if d, ok := state.devices[member.ID]; ok { updatedDevices.Add(d.ID) if member.Deleted { @@ -372,42 +381,46 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) ( } // Expand user group memberships. - updatedUsers.ForEach(func(userID uuid.UUID) { - u, ok := state.users[userID] - if !ok { - p.logger.Errorf("Unable to find user %q in state", userID) - return - } - u.Modified = true - if u.Deleted { - p.logger.Debugw("not expanding membership for deleted user", "user", userID) - return - } + if wantUsers { + updatedUsers.ForEach(func(userID uuid.UUID) { + u, ok := state.users[userID] + if !ok { + p.logger.Errorf("Unable to find user %q in state", userID) + return + } + u.Modified = true + if u.Deleted { + p.logger.Debugw("not expanding membership for deleted user", "user", userID) + return + } - u.TransitiveMemberOf = u.MemberOf - state.relationships.ExpandFromSet(u.MemberOf).ForEach(func(elem uuid.UUID) { - u.TransitiveMemberOf.Add(elem) + u.TransitiveMemberOf = u.MemberOf + state.relationships.ExpandFromSet(u.MemberOf).ForEach(func(elem uuid.UUID) { + u.TransitiveMemberOf.Add(elem) + }) }) - }) + } // Expand device group memberships. - updatedDevices.ForEach(func(devID uuid.UUID) { - d, ok := state.devices[devID] - if !ok { - p.logger.Errorf("Unable to find device %q in state", devID) - return - } - d.Modified = true - if d.Deleted { - p.logger.Debugw("not expanding membership for deleted device", "device", devID) - return - } + if wantDevices { + updatedDevices.ForEach(func(devID uuid.UUID) { + d, ok := state.devices[devID] + if !ok { + p.logger.Errorf("Unable to find device %q in state", devID) + return + } + d.Modified = true + if d.Deleted { + p.logger.Debugw("not expanding membership for deleted device", "device", devID) + return + } - d.TransitiveMemberOf = d.MemberOf - state.relationships.ExpandFromSet(d.MemberOf).ForEach(func(elem uuid.UUID) { - d.TransitiveMemberOf.Add(elem) + d.TransitiveMemberOf = d.MemberOf + state.relationships.ExpandFromSet(d.MemberOf).ForEach(func(elem uuid.UUID) { + d.TransitiveMemberOf.Add(elem) + }) }) - }) + } return updatedUsers, updatedDevices, nil } diff --git a/x-pack/filebeat/input/entityanalytics/provider/azuread/conf.go b/x-pack/filebeat/input/entityanalytics/provider/azuread/conf.go index 105b05cbbbbd..137951bcc78c 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/azuread/conf.go +++ b/x-pack/filebeat/input/entityanalytics/provider/azuread/conf.go @@ -52,3 +52,21 @@ func defaultConf() conf { UpdateInterval: defaultUpdateInterval, } } + +func (c *conf) wantUsers() bool { + switch strings.ToLower(c.Dataset) { + case "", "all", "users": + return true + default: + return false + } +} + +func (c *conf) wantDevices() bool { + switch strings.ToLower(c.Dataset) { + case "", "all", "devices": + return true + default: + return false + } +}