Skip to content

Commit

Permalink
x-pack/filebeat/input/entityanalytics/azuread: add registered owner/u…
Browse files Browse the repository at this point in the history
…ser handling

WIP: needs testing on azuread.azure.publishDevice for registered owners and users.
  • Loading branch information
efd6 committed Jul 18, 2023
1 parent b481143 commit 4d7445b
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add `clean_session` configuration setting for MQTT input. {pull}35806[16204]
- Add fingerprint mode for the filestream scanner and new file identity based on it {issue}34419[34419] {pull}35734[35734]
- Add file system metadata to events ingested via filestream {issue}35801[35801] {pull}36065[36065]
- Add support for register owners and users to AzureAD entity analytics provider. {pull}[]

*Auditbeat*
- Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817]
Expand Down
26 changes: 26 additions & 0 deletions x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,32 @@ func (p *azure) publishDevice(d *fetcher.Device, state *stateStore, inputID stri
_, _ = deviceDoc.Put("device.group", groups)
}

var owners []*fetcher.User
d.RegisteredOwners.ForEach(func(userID uuid.UUID) {
u, ok := state.users[userID]
if !ok {
p.logger.Warnf("Unable to lookup registered owner %q for device %q", userID, d.ID)
return
}
owners = append(owners, u)
})
if len(owners) != 0 {
_, _ = deviceDoc.Put("device.registered_owners", owners)
}

var users []*fetcher.User
d.RegisteredUsers.ForEach(func(userID uuid.UUID) {
u, ok := state.users[userID]
if !ok {
p.logger.Warnf("Unable to lookup registered user %q for device %q", userID, d.ID)
return
}
users = append(users, u)
})
if len(users) != 0 {
_, _ = deviceDoc.Put("device.registered_users", users)
}

event := beat.Event{
Timestamp: time.Now(),
Fields: deviceDoc,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,6 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
)

// TODO: Implement fetchers for the registered owners and users
// for devices. These will then be retained in the following fields
// of Device.
//
// // A set of UUIDs for registered owners of this device.
// RegisteredOwners collections.UUIDSet `json:"registeredOwners"`
// // A set of UUIDs for registered users of this device.
// RegisteredUsers collections.UUIDSet `json:"registeredUsers"`
//
// and the addition of the following lines to Merge
//
// other.RegisteredOwners.ForEach(func(elem uuid.UUID) {
// d.RegisteredOwners.Add(elem)
// })
// other.RegisteredUsers.ForEach(func(elem uuid.UUID) {
// d.RegisteredUsers.Add(elem)
// })
//
// with associated test extensions.

// Device represents a device identity asset.
type Device struct {
// The ID (UUIDv4) of the device.
Expand All @@ -41,6 +21,10 @@ type Device struct {
MemberOf collections.UUIDSet `json:"memberOf"`
// A set of UUIDs which are groups this device is a transitive member of.
TransitiveMemberOf collections.UUIDSet `json:"transitiveMemberOf"`
// A set of UUIDs for registered owners of this device.
RegisteredOwners collections.UUIDSet `json:"registeredOwners"`
// A set of UUIDs for registered users of this device.
RegisteredUsers collections.UUIDSet `json:"registeredUsers"`
// Discovered indicates that this device was newly discovered. This does not
// necessarily imply the device was recently added in Azure Active Directory,
// but it does indicate that it's the first time the device has been seen by
Expand Down Expand Up @@ -68,5 +52,11 @@ func (d *Device) Merge(other *Device) {
other.TransitiveMemberOf.ForEach(func(elem uuid.UUID) {
d.TransitiveMemberOf.Add(elem)
})
other.RegisteredOwners.ForEach(func(elem uuid.UUID) {
d.RegisteredOwners.Add(elem)
})
other.RegisteredUsers.ForEach(func(elem uuid.UUID) {
d.RegisteredUsers.Add(elem)
})
d.Deleted = other.Deleted
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ func TestDevice_Merge(t *testing.T) {
},
MemberOf: collections.NewUUIDSet(uuid.MustParse("fcda226a-c920-4d99-81bc-d2d691a6c212")),
TransitiveMemberOf: collections.NewUUIDSet(uuid.MustParse("ca777ad5-9abf-4c9b-be1f-c38c6ec28f28")),
RegisteredOwners: collections.NewUUIDSet(uuid.MustParse("c59fbdb8-e442-46b1-8d72-c8ac0b78ec0a")),
RegisteredUsers: collections.NewUUIDSet(
uuid.MustParse("27cea005-7377-4175-b2ef-e9d64c977f4d"),
uuid.MustParse("c59fbdb8-e442-46b1-8d72-c8ac0b78ec0a"),
),
},
InOther: &Device{
ID: uuid.MustParse("187f924c-e867-477e-8d74-dd762d6379dd"),
Expand All @@ -40,6 +45,11 @@ func TestDevice_Merge(t *testing.T) {
},
MemberOf: collections.NewUUIDSet(uuid.MustParse("a77e8cbb-27a5-49d3-9d5e-801997621f87")),
TransitiveMemberOf: collections.NewUUIDSet(uuid.MustParse("c550d32c-09b2-4851-b0f2-1bc431e26d01")),
RegisteredOwners: collections.NewUUIDSet(uuid.MustParse("81d1b5cd-7cd6-469d-9fe8-0a5c6cf2a7b6")),
RegisteredUsers: collections.NewUUIDSet(
uuid.MustParse("5e6d279a-ce2b-43b8-a38f-3110907e1974"),
uuid.MustParse("c59fbdb8-e442-46b1-8d72-c8ac0b78ec0a"),
),
},
Want: &Device{
ID: uuid.MustParse("187f924c-e867-477e-8d74-dd762d6379dd"),
Expand All @@ -55,6 +65,15 @@ func TestDevice_Merge(t *testing.T) {
uuid.MustParse("ca777ad5-9abf-4c9b-be1f-c38c6ec28f28"),
uuid.MustParse("c550d32c-09b2-4851-b0f2-1bc431e26d01"),
),
RegisteredOwners: collections.NewUUIDSet(
uuid.MustParse("81d1b5cd-7cd6-469d-9fe8-0a5c6cf2a7b6"),
uuid.MustParse("c59fbdb8-e442-46b1-8d72-c8ac0b78ec0a"),
),
RegisteredUsers: collections.NewUUIDSet(
uuid.MustParse("27cea005-7377-4175-b2ef-e9d64c977f4d"),
uuid.MustParse("5e6d279a-ce2b-43b8-a38f-3110907e1974"),
uuid.MustParse("c59fbdb8-e442-46b1-8d72-c8ac0b78ec0a"),
),
},
},
}
Expand All @@ -70,6 +89,8 @@ func TestDevice_Merge(t *testing.T) {
require.Equal(t, tc.Want.Fields, tc.In.Fields)
require.ElementsMatch(t, tc.Want.MemberOf.Values(), tc.In.MemberOf.Values(), "list A: Expected, listB: Actual")
require.ElementsMatch(t, tc.Want.TransitiveMemberOf.Values(), tc.In.TransitiveMemberOf.Values(), "list A: Expected, listB: Actual")
require.ElementsMatch(t, tc.Want.RegisteredOwners.Values(), tc.In.RegisteredOwners.Values(), "list A: Expected, listB: Actual")
require.ElementsMatch(t, tc.Want.RegisteredUsers.Values(), tc.In.RegisteredUsers.Values(), "list A: Expected, listB: Actual")
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/google/uuid"

"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/internal/collections"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/azuread/authenticator"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/azuread/fetcher"
"github.com/elastic/elastic-agent-libs/config"
Expand Down Expand Up @@ -109,9 +110,10 @@ type graph struct {
logger *logp.Logger
auth authenticator.Authenticator

usersURL string
groupsURL string
devicesURL string
usersURL string
groupsURL string
devicesURL string
deviceOwnerUserURL string
}

// SetLogger sets the logger on this fetcher.
Expand Down Expand Up @@ -155,12 +157,12 @@ func (f *graph) Groups(ctx context.Context, deltaLink string) ([]*fetcher.Group,
return groups, response.DeltaLink, nil
}
if response.NextLink == fetchURL {
return nil, "", fmt.Errorf("error during fetch groups, encountered nextLink fetch infinite loop")
return groups, "", nextLinkLoopError{"groups"}
}
if response.NextLink != "" {
fetchURL = response.NextLink
} else {
return nil, "", fmt.Errorf("error during fetch groups, encountered response without nextLink or deltaLink")
return groups, "", missingLinkError{"groups"}
}
}
}
Expand Down Expand Up @@ -207,12 +209,12 @@ func (f *graph) Users(ctx context.Context, deltaLink string) ([]*fetcher.User, s
return users, response.DeltaLink, nil
}
if response.NextLink == fetchURL {
return nil, "", fmt.Errorf("error during fetch users, encountered nextLink fetch infinite loop")
return users, "", nextLinkLoopError{"users"}
}
if response.NextLink != "" {
fetchURL = response.NextLink
} else {
return nil, "", fmt.Errorf("error during fetch users, encountered response without nextLink or deltaLink")
return users, "", missingLinkError{"users"}
}
}
}
Expand Down Expand Up @@ -252,23 +254,41 @@ func (f *graph) Devices(ctx context.Context, deltaLink string) ([]*fetcher.Devic
continue
}
f.logger.Debugf("Got device %q from API", device.ID)

f.addRegistered(ctx, device, "registeredOwners", &device.RegisteredOwners)
f.addRegistered(ctx, device, "registeredUsers", &device.RegisteredUsers)

devices = append(devices, device)
}

if response.DeltaLink != "" {
return devices, response.DeltaLink, nil
}
if response.NextLink == fetchURL {
return nil, "", fmt.Errorf("error during fetch devices, encountered nextLink fetch infinite loop")
return devices, "", nextLinkLoopError{"devices"}
}
if response.NextLink != "" {
fetchURL = response.NextLink
} else {
return nil, "", fmt.Errorf("error during fetch devices, encountered response without nextLink or deltaLink")
return devices, "", missingLinkError{"devices"}
}
}
}

// addRegistered adds registered owner or user UUIDs to the provided device.
func (f *graph) addRegistered(ctx context.Context, device *fetcher.Device, typ string, set *collections.UUIDSet) {
usersLink := fmt.Sprintf("%s/%s/%s", f.deviceOwnerUserURL, device.ID, typ)
users, _, err := f.Users(ctx, usersLink)
switch {
case err == nil, errors.Is(err, nextLinkLoopError{"users"}), errors.Is(err, missingLinkError{"users"}):
default:
f.logger.Errorf("Failed to obtain some registered user data: %w", err)
}
for _, u := range users {
set.Add(u.ID)
}
}

// doRequest is a convenience function for making HTTP requests to the Graph API.
// It will automatically handle requesting a token using the authenticator attached
// to this fetcher.
Expand Down Expand Up @@ -342,6 +362,15 @@ func New(cfg *config.C, logger *logp.Logger, auth authenticator.Authenticator) (
devicesURL.RawQuery = url.QueryEscape(defaultDevicesQuery)
f.devicesURL = devicesURL.String()

// The API takes a departure from the query approach here, so we
// need to construct a partial URL for use later when fetching
// registered owners and users.
ownerUserURL, err := url.Parse(f.conf.APIEndpoint + "/devices/")
if err != nil {
return nil, fmt.Errorf("invalid device owner/user URL endpoint: %w", err)
}
f.deviceOwnerUserURL = ownerUserURL.String()

return &f, nil
}

Expand Down Expand Up @@ -423,3 +452,19 @@ func newDeviceFromAPI(d deviceAPI) (*fetcher.Device, error) {

return &newDevice, nil
}

type nextLinkLoopError struct {
endpoint string
}

func (e nextLinkLoopError) Error() string {
return fmt.Sprintf("error during fetch %s, encountered nextLink fetch infinite loop", e.endpoint)
}

type missingLinkError struct {
endpoint string
}

func (e missingLinkError) Error() string {
return fmt.Sprintf("error during fetch %s, encountered response without nextLink or deltaLink", e.endpoint)
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"path"
"reflect"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/internal/collections"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/azuread/authenticator/mock"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/azuread/fetcher"
"github.com/elastic/elastic-agent-libs/config"
Expand Down Expand Up @@ -113,6 +117,24 @@ var devicesResponse2 = apiDeviceResponse{
},
}

var deviceOwnerResponses = map[string]apiUserResponse{
"6a59ea83-02bd-468f-a40b-f2c3d1821983": {
Users: []userAPI{{"id": "5ebc6a0f-05b7-4f42-9c8a-682bbc75d0fc"}},
},
"adbbe40a-0627-4328-89f1-88cac84dbc7f": {
Users: []userAPI{{"id": "5ebc6a0f-05b7-4f42-9c8a-682bbc75d0fc"}},
},
}

var deviceUserResponses = map[string]apiUserResponse{
"6a59ea83-02bd-468f-a40b-f2c3d1821983": {
Users: []userAPI{{"id": "d897d560-3d17-4dae-81b3-c898fe82bf84"}, {"id": "5ebc6a0f-05b7-4f42-9c8a-682bbc75d0fc"}},
},
"adbbe40a-0627-4328-89f1-88cac84dbc7f": {
Users: []userAPI{{"id": "5ebc6a0f-05b7-4f42-9c8a-682bbc75d0fc"}},
},
}

var groupsResponse1 = apiGroupResponse{
Groups: []groupAPI{
{
Expand Down Expand Up @@ -202,6 +224,26 @@ func (s *testServer) setup(t *testing.T) {
require.NoError(t, err)
})

mux.HandleFunc("/devices/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/json")

var data []byte
var err error

switch path.Base(r.URL.Path) {
case "registeredOwners":
data, err = json.Marshal(deviceOwnerResponses[path.Base(path.Dir(r.URL.Path))])
case "registeredUsers":
data, err = json.Marshal(deviceUserResponses[path.Base(path.Dir(r.URL.Path))])
default:
err = fmt.Errorf("unknown endpoint: %s", r.URL)
}
require.NoError(t, err)

_, err = w.Write(data)
require.NoError(t, err)
})

mux.HandleFunc("/groups/delta", func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/json")

Expand Down Expand Up @@ -375,6 +417,13 @@ func TestGraph_Devices(t *testing.T) {
},
},
},
RegisteredOwners: collections.NewUUIDSet(
uuid.MustParse("5ebc6a0f-05b7-4f42-9c8a-682bbc75d0fc"),
),
RegisteredUsers: collections.NewUUIDSet(
uuid.MustParse("5ebc6a0f-05b7-4f42-9c8a-682bbc75d0fc"),
uuid.MustParse("d897d560-3d17-4dae-81b3-c898fe82bf84"),
),
},
{
ID: uuid.MustParse("adbbe40a-0627-4328-89f1-88cac84dbc7f"),
Expand All @@ -399,6 +448,12 @@ func TestGraph_Devices(t *testing.T) {
},
},
},
RegisteredOwners: collections.NewUUIDSet(
uuid.MustParse("5ebc6a0f-05b7-4f42-9c8a-682bbc75d0fc"),
),
RegisteredUsers: collections.NewUUIDSet(
uuid.MustParse("5ebc6a0f-05b7-4f42-9c8a-682bbc75d0fc"),
),
},
}

Expand All @@ -417,6 +472,13 @@ func TestGraph_Devices(t *testing.T) {
gotDevices, gotDeltaLink, gotErr := f.Devices(ctx, "")

require.NoError(t, gotErr)
require.EqualValues(t, wantDevices, gotDevices)
// Using go-cmp because testify is too weak for this comparison.
// reflect.DeepEqual works, but won't show a reasonable diff.
exporter := cmp.Exporter(func(t reflect.Type) bool {
return t == reflect.TypeOf(collections.UUIDSet{})
})
if !cmp.Equal(wantDevices, gotDevices, exporter) {
t.Errorf("unexpected result:\n--- got\n--- want\n%s", cmp.Diff(wantDevices, gotDevices, exporter))
}
require.Equal(t, wantDeltaLink, gotDeltaLink)
}

0 comments on commit 4d7445b

Please sign in to comment.