From f01adb125a9a064eda8c167af33698933b5fe5f0 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Wed, 12 Jul 2023 10:42:16 +0930 Subject: [PATCH] x-pack/filebeat/input/entityanalytics/provider/okta: add device handling WIP: needs validation against real API. --- CHANGELOG.next.asciidoc | 1 + .../inputs/input-entity-analytics.asciidoc | 145 ++++++++++-- .../entityanalytics/provider/okta/conf.go | 5 + .../entityanalytics/provider/okta/okta.go | 213 ++++++++++++++++-- .../provider/okta/okta_test.go | 108 ++++++--- .../provider/okta/statestore.go | 87 +++++-- .../provider/okta/statestore_test.go | 40 +++- 7 files changed, 520 insertions(+), 79 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index eced679bbd5..873e853424f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -233,6 +233,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Add support for registered owners and users to AzureAD entity analytics provider. {pull}36092[36092] - Added support for Okta OAuth2 provider in the httpjson input. {pull}36273[36273] - Add support of the interval parameter in Salesforce setupaudittrail-rest fileset. {issue}35917[35917] {pull}35938[35938] +- Add device handling to Okta input package for entity analytics. {pull}36049[36049] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc b/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc index c983b607e76..8664dd23fa4 100644 --- a/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-entity-analytics.asciidoc @@ -308,8 +308,8 @@ Override the default authentication scopes. Only change if directed to do so. [id="provider-okta"] ==== Okta User Identities (`okta`) -The `okta` provider allows the input to retrieve users from the Okta user -API. +The `okta` provider allows the input to retrieve users and devices from the +Okta user API. [float] ==== Setup @@ -322,6 +322,8 @@ this to configure the provider. Note that the token will not be presented again, so it must be copied now. This value will use given to the provider via the `okta_token` configuration field. +Devices API access needs to be activated by Okta support. + [float] ==== How It Works @@ -329,23 +331,25 @@ via the `okta_token` configuration field. ===== Overview The Okta provider periodically contacts the Okta API, retrieving updates for -users, updates its internal cache of user metadata, and ships updated user -metadata to Elasticsearch. +users and devices, updates its internal cache of user metadata, and ships +updated user/device metadata to Elasticsearch. Fetching and shipping updates occurs in one of two processes: **full -synchronizations** and *incremental updates*. Full synchronizations will send the -entire list of users in state, along with write markers to indicate the start -and end of the synchronization event. Incremental updates will only send data -for changed users during that event. Changes on a user can come in many forms, -whether it be a change to the user's metadata, or a user was added or deleted. +synchronizations** and *incremental updates*. Full synchronizations will send +the entire list of users and devices in state, along with write markers to +indicate the start and end of the synchronization event. Incremental updates +will only send data for changed users and devices during that event. Changes +on a user or device can come in many forms, whether it be a change to the +user's metadata, or a user was added or deleted. [float] ===== API Interactions -The provider periodically retrieves changes to user metadata from the -Okta User API. This is done through calls to: +The provider periodically retrieves changes to user/device metadata from the +Okta User and Device APIs. This is done through calls to: - https://developer.okta.com/docs/reference/api/users/#list-users[/api/v1/users] +- https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Device/#tag/Device/operation/listDevices[/api/v1/devices] Updates are tracked by the provider by retaining a record of the time of the last noted update in the returned user list. During provider updates the Okta provider @@ -355,10 +359,10 @@ since the provider's recorded last update. [float] ===== Sending User Metadata to Elasticsearch -During a full synchronization, all users stored in state will be sent to the -output, while incremental updates will only send users which have been updated. -Full synchronizations will be bounded on either side by write marker documents, -which will look something like this: +During a full synchronization, all users/devices stored in state will be sent +to the output, while incremental updates will only send users and devices +that have been updated. Full synchronizations will be bounded on either side +by write marker documents, which will look something like this: ["source","json",subs="attributes"] ---- @@ -425,6 +429,110 @@ Example user document: } ---- +Device documents will show the current state of the device, including any +associated users. + +Example device document: + +["source","json",subs="attributes"] +---- +{ + "@timestamp": "2023-07-04T09:57:19.786056-05:00", + "event": { + "action": "device-discovered", + }, + "okta": { + "created": "2019-10-02T18:03:07Z", + "id": "deviceid", + "lastUpdated": "2019-10-02T18:03:07Z", + "profile": { + "diskEncryptionType": "ALL_INTERNAL_VOLUMES", + "displayName": "Example Device name 1", + "platform": "WINDOWS", + "registered": true, + "secureHardwarePresent": false, + "serialNumber": "XXDDRFCFRGF3M8MD6D", + "sid": "S-1-11-111" + }, + "resourceAlternateID": "", + "resourceDisplayName": { + "sensitive": false, + "value": "Example Device name 1" + }, + "resourceID": "deviceid", + "resourceType": "UDDevice", + "status": "ACTIVE", + "_links": { + "activate": { + "hints": { + "allow": [ + "POST" + ] + }, + "href": "https://localhost/api/v1/devices/deviceid/lifecycle/activate" + }, + "self": { + "hints": { + "allow": [ + "GET", + "PATCH", + "PUT" + ] + }, + "href": "https://localhost/api/v1/devices/deviceid" + }, + "users": { + "hints": { + "allow": [ + "GET" + ] + }, + "href": "https://localhost/api/v1/devices/deviceid/users" + } + }, + "users": [ + { + "id": "userid", + "status": "RECOVERY", + "created": "2023-05-14T13:37:20Z", + "activated": "0001-01-01T00:00:00Z", + "statusChanged": "2023-05-15T01:50:30Z", + "lastLogin": "2023-05-15T01:59:20Z", + "lastUpdated": "2023-05-15T01:50:32Z", + "passwordChanged": "2023-05-15T01:50:32Z", + "type": { + "id": "typeid" + }, + "profile": { + "login": "name.surname@example.com", + "email": "name.surname@example.com", + "firstName": "name", + "lastName": "surname" + }, + "credentials": { + "password": {}, + "provider": { + "type": "OKTA", + "name": "OKTA" + } + }, + "_links": { + "self": { + "href": "https://localhost/api/v1/users/userid" + } + } + } + ] + }, + "device": { + "id": "deviceid", + }, + "labels": { + "identity_source": "okta-1" + } +} +---- + [float] ==== Configuration @@ -455,6 +563,13 @@ The Okta domain. Field is required. The Okta secret token, used for authentication. Field is required. +[float] +===== `collect_device_details` + +Whether the input should collect device and device-associated user details +from the Okta API. Device details must be activated on the Okta account for +this option. + [float] ===== `sync_interval` diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/conf.go b/x-pack/filebeat/input/entityanalytics/provider/okta/conf.go index 871a48d9cbd..44cac308075 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/conf.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/conf.go @@ -41,6 +41,11 @@ type conf struct { OktaDomain string `config:"okta_domain" validate:"required"` OktaToken string `config:"okta_token" validate:"required"` + // WantDevices indicates that device details + // should be collected. This is optional as + // the devices API is not necessarily activated. + WantDevices bool `config:"collect_device_details"` + // SyncInterval is the time between full // synchronisation operations. SyncInterval time.Duration `config:"sync_interval"` diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/okta.go b/x-pack/filebeat/input/entityanalytics/provider/okta/okta.go index fe6a69890fe..2aeb57e2f6b 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/okta.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/okta.go @@ -243,12 +243,16 @@ func (p *oktaInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, clien ctx := ctxtool.FromCanceller(inputCtx.Cancelation) p.logger.Debugf("Starting fetch...") - _, err = p.doFetch(ctx, state, true) + _, err = p.doFetchUsers(ctx, state, true) + if err != nil { + return err + } + _, err = p.doFetchDevices(ctx, state, true) if err != nil { return err } - if len(state.users) != 0 { + if len(state.users) != 0 || len(state.devices) != 0 { tracker := kvstore.NewTxTracker(ctx) start := time.Now() @@ -256,6 +260,9 @@ func (p *oktaInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, clien for _, u := range state.users { p.publishUser(u, state, inputCtx.ID, client, tracker) } + for _, d := range state.devices { + p.publishDevice(d, state, inputCtx.ID, client, tracker) + } end := time.Now() p.publishMarker(end, end, inputCtx.ID, false, client, tracker) @@ -294,17 +301,24 @@ func (p *oktaInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Sto }() ctx := ctxtool.FromCanceller(inputCtx.Cancelation) - updatedUsers, err := p.doFetch(ctx, state, false) + updatedUsers, err := p.doFetchUsers(ctx, state, false) + if err != nil { + return err + } + updatedDevices, err := p.doFetchDevices(ctx, state, false) if err != nil { return err } var tracker *kvstore.TxTracker - if len(updatedUsers) != 0 { + if len(updatedUsers) != 0 || len(updatedDevices) != 0 { tracker = kvstore.NewTxTracker(ctx) for _, u := range updatedUsers { p.publishUser(u, state, inputCtx.ID, client, tracker) } + for _, d := range updatedDevices { + p.publishDevice(d, state, inputCtx.ID, client, tracker) + } tracker.Wait() } @@ -320,19 +334,18 @@ func (p *oktaInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Sto return nil } -// doFetch handles fetching user and group identities from Azure Active Directory -// and enriching users with group memberships. If fullSync is true, then any -// existing deltaLink will be ignored, forcing a full synchronization from -// Azure Active Directory. Returns a set of modified users by ID. -func (p *oktaInput) doFetch(ctx context.Context, state *stateStore, fullSync bool) ([]*User, error) { +// doFetchUsers handles fetching user identities from Okta. If fullSync is true, then +// any existing deltaLink will be ignored, forcing a full synchronization from Okta. +// Returns a set of modified users by ID. +func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSync bool) ([]*User, error) { var ( query url.Values err error ) // Get user changes. - if !fullSync && state.next != "" { - query, err = url.ParseQuery(state.next) + if !fullSync && state.nextUsers != "" { + query, err = url.ParseQuery(state.nextUsers) if err != nil { p.logger.Warnf("failed to parse next query: %v", err) } @@ -394,18 +407,155 @@ func (p *oktaInput) doFetch(ctx context.Context, state *stateStore, fullSync boo // have a complete set from that timestamp. query = url.Values{} query.Add("search", fmt.Sprintf(`lastUpdated ge "%s" and status pr`, lastUpdated.Format(okta.ISO8601))) - state.next = query.Encode() + state.nextUsers = query.Encode() p.logger.Debugf("received %d users from API", len(users)) return users, nil } -func grow(u []*User, n int) []*User { - if len(u)+n <= cap(u) { - return u +// doFetchDevices handles fetching device and associated user identities from Okta. +// If fullSync is true, then any existing deltaLink will be ignored, forcing a full +// synchronization from Okta. +// Returns a set of modified devices by ID. +func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullSync bool) ([]*Device, error) { + if !p.cfg.WantDevices { + return nil, nil + } + + var ( + deviceQuery url.Values + userQueryInit url.Values + err error + ) + + // Get user changes. + if !fullSync && state.nextDevices != "" { + deviceQuery, err = url.ParseQuery(state.nextDevices) + if err != nil { + p.logger.Warnf("failed to parse next query: %v", err) + } + } + if deviceQuery == nil { + // Use "search" because of recommendation on Okta dev documentation: + // https://developer.okta.com/docs/reference/user-query/. + // Search term of "status pr" is required so that we get DEPROVISIONED + // users; a nil query is more efficient, but excludes these users. + // There is no equivalent documentation for devices, so we assume the + // behaviour is the same. + deviceQuery = url.Values{"search": []string{"status pr"}} + } + // Start user queries from the same time point. This must not + // be mutated since we may perform multiple batched gets over + // multiple devices. + userQueryInit = cloneURLValues(deviceQuery) + + var ( + devices []*Device + lastUpdated time.Time + ) + for { + batch, h, err := okta.GetDeviceDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, "", deviceQuery, p.lim, p.cfg.LimitWindow) + if err != nil { + p.logger.Debugf("received %d devices from API", len(devices)) + return nil, err + } + p.logger.Debugf("received batch of %d devices from API", len(batch)) + + for i, d := range batch { + userQuery := cloneURLValues(userQueryInit) + for { + // TODO: Consider softening the response to errors here. If we fail to get users + // from a device, do we want to fail completely? There are arguments in both + // directions. We _could_ keep a multierror and return that in the end, which + // would guarantee progression, but may result in holes in the data. What we are + // doing at the moment (both here and in doFetchUsers) guarantees no holes, but + // at the cost of potentially not making progress. + + const omit = okta.OmitCredentials | okta.OmitCredentialsLinks | okta.OmitTransitioningToStatus + + users, h, err := okta.GetDeviceUsers(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, d.ID, userQuery, omit, p.lim, p.cfg.LimitWindow) + if err != nil { + p.logger.Debugf("received %d device users from API", len(users)) + return nil, err + } + p.logger.Debugf("received batch of %d device users from API", len(users)) + + // Users are not stored in the state as they are in doFetchUsers. We expect + // them to already have been discovered/stored from that call and are stored + // associated with the device undecorated with discovery state. + batch[i].Users = append(batch[i].Users, users...) + + next, err := okta.Next(h) + if err != nil { + if err == io.EOF { + break + } + p.logger.Debugf("received %d devices from API", len(devices)) + return devices, err + } + userQuery = next + } + } + + if fullSync { + for _, d := range batch { + state.storeDevice(d) + if d.LastUpdated.After(lastUpdated) { + lastUpdated = d.LastUpdated + } + } + } else { + devices = grow(devices, len(batch)) + for _, d := range batch { + devices = append(devices, state.storeDevice(d)) + if d.LastUpdated.After(lastUpdated) { + lastUpdated = d.LastUpdated + } + } + } + + next, err := okta.Next(h) + if err != nil { + if err == io.EOF { + break + } + p.logger.Debugf("received %d devices from API", len(devices)) + return devices, err + } + deviceQuery = next + } + + // Prepare query for next update. This is any record that was updated + // at or after the last updated record we saw this round. Use this rather + // than time.Now() since we may have received stale records. Use ge + // rather than gt since timestamps are second resolution, so we may not + // have a complete set from that timestamp. + deviceQuery = url.Values{} + deviceQuery.Add("search", fmt.Sprintf(`lastUpdated ge "%s" and status pr`, lastUpdated.Format(okta.ISO8601))) + state.nextDevices = deviceQuery.Encode() + + p.logger.Debugf("received %d devices from API", len(devices)) + return devices, nil +} + +func cloneURLValues(a url.Values) url.Values { + b := make(url.Values, len(a)) + for k, v := range a { + b[k] = append(v[:0:0], v...) } - new := append(u, make([]*User, n)...) - return new[:len(u)] + return b +} + +type entity interface { + *User | *Device | okta.User +} + +func grow[T entity](e []T, n int) []T { + if len(e)+n <= cap(e) { + return e + } + new := append(e, make([]T, n)...) + return new[:len(e)] } // publishMarker will publish a write marker document using the given beat.Client. @@ -465,3 +615,32 @@ func (p *oktaInput) publishUser(u *User, state *stateStore, inputID string, clie client.Publish(event) } + +// publishDevice will publish a device document using the given beat.Client. +func (p *oktaInput) publishDevice(d *Device, state *stateStore, inputID string, client beat.Client, tracker *kvstore.TxTracker) { + devDoc := mapstr.M{} + + _, _ = devDoc.Put("okta", d.Device) + _, _ = devDoc.Put("labels.identity_source", inputID) + _, _ = devDoc.Put("device.id", d.ID) + + switch d.State { + case Deleted: + _, _ = devDoc.Put("event.action", "device-deleted") + case Discovered: + _, _ = devDoc.Put("event.action", "device-discovered") + case Modified: + _, _ = devDoc.Put("event.action", "device-modified") + } + + event := beat.Event{ + Timestamp: time.Now(), + Fields: devDoc, + Private: tracker, + } + tracker.Add() + + p.logger.Debugf("Publishing device %q", d.ID) + + client.Publish(event) +} diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/okta_test.go b/x-pack/filebeat/input/entityanalytics/provider/okta/okta_test.go index 20df104c06a..d10b81061c9 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/okta_test.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/okta_test.go @@ -11,6 +11,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "path" "strings" "testing" "time" @@ -28,18 +29,29 @@ func TestOktaDoFetch(t *testing.T) { }) const ( - window = time.Minute - key = "token" - msg = `[{"id":"userid","status":"STATUS","created":"2023-05-14T13:37:20.000Z","activated":null,"statusChanged":"2023-05-15T01:50:30.000Z","lastLogin":"2023-05-15T01:59:20.000Z","lastUpdated":"2023-05-15T01:50:32.000Z","passwordChanged":"2023-05-15T01:50:32.000Z","type":{"id":"typeid"},"profile":{"firstName":"name","lastName":"surname","mobilePhone":null,"secondEmail":null,"login":"name.surname@example.com","email":"name.surname@example.com"},"credentials":{"password":{"value":"secret"},"emails":[{"value":"name.surname@example.com","status":"VERIFIED","type":"PRIMARY"}],"provider":{"type":"OKTA","name":"OKTA"}},"_links":{"self":{"href":"https://localhost/api/v1/users/userid"}}}]` + window = time.Minute + key = "token" + users = `[{"id":"USERID","status":"STATUS","created":"2023-05-14T13:37:20.000Z","activated":null,"statusChanged":"2023-05-15T01:50:30.000Z","lastLogin":"2023-05-15T01:59:20.000Z","lastUpdated":"2023-05-15T01:50:32.000Z","passwordChanged":"2023-05-15T01:50:32.000Z","type":{"id":"typeid"},"profile":{"firstName":"name","lastName":"surname","mobilePhone":null,"secondEmail":null,"login":"name.surname@example.com","email":"name.surname@example.com"},"credentials":{"password":{"value":"secret"},"emails":[{"value":"name.surname@example.com","status":"VERIFIED","type":"PRIMARY"}],"provider":{"type":"OKTA","name":"OKTA"}},"_links":{"self":{"href":"https://localhost/api/v1/users/USERID"}}}]` + devices = `[{"id":"DEVICEID","status":"STATUS","created":"2019-10-02T18:03:07.000Z","lastUpdated":"2019-10-02T18:03:07.000Z","profile":{"displayName":"Example Device name 1","platform":"WINDOWS","serialNumber":"XXDDRFCFRGF3M8MD6D","sid":"S-1-11-111","registered":true,"secureHardwarePresent":false,"diskEncryptionType":"ALL_INTERNAL_VOLUMES"},"resourceType":"UDDevice","resourceDisplayName":{"value":"Example Device name 1","sensitive":false},"resourceAlternateId":null,"resourceId":"DEVICEID","_links":{"activate":{"href":"https://localhost/api/v1/devices/DEVICEID/lifecycle/activate","hints":{"allow":["POST"]}},"self":{"href":"https://localhost/api/v1/devices/DEVICEID","hints":{"allow":["GET","PATCH","PUT"]}},"users":{"href":"https://localhost/api/v1/devices/DEVICEID/users","hints":{"allow":["GET"]}}}}]` ) + data := map[string]string{ + "users": users, + "devices": devices, + } + var wantUsers []User - err := json.Unmarshal([]byte(msg), &wantUsers) + err := json.Unmarshal([]byte(users), &wantUsers) if err != nil { t.Fatalf("failed to unmarshal user data: %v", err) } + var wantDevices []Device + err = json.Unmarshal([]byte(users), &wantDevices) + if err != nil { + t.Fatalf("failed to unmarshal device data: %v", err) + } - wantUserStates := make(map[string]State) + wantStates := make(map[string]State) // Set the number of repeats. const repeats = 3 @@ -50,13 +62,22 @@ func TestOktaDoFetch(t *testing.T) { w.Header().Add("x-rate-limit-remaining", "49") w.Header().Add("x-rate-limit-reset", fmt.Sprint(time.Now().Add(time.Minute).Unix())) + if strings.HasPrefix(r.URL.Path, "/api/v1/device") && strings.HasSuffix(r.URL.Path, "users") { + // Give one user if this is a get device users request. + fmt.Fprintln(w, data["users"]) + return + } + + base := path.Base(r.URL.Path) + // Set next link if we can still repeat. n++ if n < repeats { - w.Header().Add("link", `; rel="next"`) + w.Header().Add("link", fmt.Sprintf(`; rel="next"`, base)) } - userid := fmt.Sprintf("userid%d", n) + prefix := strings.TrimRight(base, "s") // endpoints are plural. + id := fmt.Sprintf("%sid%d", prefix, n) // Store expected states. The State values are all Discovered // unless the user is deleted since they are all first appearance. @@ -70,13 +91,13 @@ func TestOktaDoFetch(t *testing.T) { if status == "DEPROVISIONED" { state = Deleted } - wantUserStates[userid] = state + wantStates[id] = state replacer := strings.NewReplacer( - "userid", userid, + strings.ToUpper(prefix+"id"), id, "STATUS", status, ) - fmt.Fprintln(w, replacer.Replace(msg)) + fmt.Fprintln(w, replacer.Replace(data[base])) })) defer ts.Close() @@ -86,8 +107,9 @@ func TestOktaDoFetch(t *testing.T) { } a := oktaInput{ cfg: conf{ - OktaDomain: u.Host, - OktaToken: key, + OktaDomain: u.Host, + OktaToken: key, + WantDevices: true, }, client: ts.Client(), lim: rate.NewLimiter(1, 1), @@ -100,22 +122,56 @@ func TestOktaDoFetch(t *testing.T) { } defer ss.close(false) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() - got, err := a.doFetch(ctx, ss, false) - if err != nil { - t.Fatalf("unexpected error from doFetch: %v", err) - } - if len(got) != repeats { - t.Errorf("unexpected number of results: got:%d want:%d", len(got), repeats) - } - for i, g := range got { - if wantID := fmt.Sprintf("userid%d", i+1); g.ID != wantID { - t.Errorf("unexpected user ID for user %d: got:%s want:%s", i, g.ID, wantID) + t.Run("users", func(t *testing.T) { + n = 0 + + got, err := a.doFetchUsers(ctx, ss, false) + if err != nil { + t.Fatalf("unexpected error from doFetch: %v", err) + } + + if len(got) != repeats { + t.Errorf("unexpected number of results: got:%d want:%d", len(got), repeats) } - if g.State != wantUserStates[g.ID] { - t.Errorf("unexpected user ID for user %s: got:%s want:%s", g.ID, g.State, wantUserStates[g.ID]) + for i, g := range got { + if wantID := fmt.Sprintf("userid%d", i+1); g.ID != wantID { + t.Errorf("unexpected user ID for user %d: got:%s want:%s", i, g.ID, wantID) + } + if g.State != wantStates[g.ID] { + t.Errorf("unexpected user ID for user %s: got:%s want:%s", g.ID, g.State, wantStates[g.ID]) + } } - } + }) + + t.Run("devices", func(t *testing.T) { + n = 0 + + got, err := a.doFetchDevices(ctx, ss, false) + if err != nil { + t.Fatalf("unexpected error from doFetch: %v", err) + } + + if len(got) != repeats { + t.Errorf("unexpected number of results: got:%d want:%d", len(got), repeats) + } + for i, g := range got { + if wantID := fmt.Sprintf("deviceid%d", i+1); g.ID != wantID { + t.Errorf("unexpected device ID for device %d: got:%s want:%s", i, g.ID, wantID) + } + if g.State != wantStates[g.ID] { + t.Errorf("unexpected device ID for device %s: got:%s want:%s", g.ID, g.State, wantStates[g.ID]) + } + if g.Users == nil { + t.Errorf("expected users for device %s", g.ID) + } + } + + if t.Failed() { + b, _ := json.MarshalIndent(got, "", "\t") + t.Logf("document:\n%s", b) + } + }) } diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/statestore.go b/x-pack/filebeat/input/entityanalytics/provider/okta/statestore.go index 2b616a2a337..8a11376af51 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/statestore.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/statestore.go @@ -17,12 +17,14 @@ import ( ) var ( - usersBucket = []byte("users") - stateBucket = []byte("state") + usersBucket = []byte("users") + devicesBucket = []byte("devices") + stateBucket = []byte("state") - lastSyncKey = []byte("last_sync") - lastUpdateKey = []byte("last_update") - usersLinkKey = []byte("users_link") + lastSyncKey = []byte("last_sync") + lastUpdateKey = []byte("last_update") + usersLinkKey = []byte("users_link") + devicesLinkKey = []byte("devices_link") ) //go:generate stringer -type State @@ -40,19 +42,28 @@ type User struct { State State `json:"state"` } +type Device struct { + okta.Device `json:"properties"` + State State `json:"state"` +} + // stateStore wraps a kvstore.Transaction and provides convenience methods for // accessing and store relevant data within the kvstore database. type stateStore struct { tx *kvstore.Transaction - // next is a url.Values stored as a string to make - // use of the direct serialisation/deserialisation + // nextUsers and nextDevices are url.Values stored as a string + // to make use of the direct serialisation/deserialisation // rather than encoding/json. - next string + nextUsers string + nextDevices string + // lastSync and lastUpdate are the times of the first update + // or sync operation of users/devices. lastSync time.Time lastUpdate time.Time users map[string]*User + devices map[string]*Device } // newStateStore creates a new instance of stateStore. It will open a new write @@ -67,8 +78,9 @@ func newStateStore(store *kvstore.Store) (*stateStore, error) { } s := stateStore{ - users: make(map[string]*User), - tx: tx, + users: make(map[string]*User), + devices: make(map[string]*Device), + tx: tx, } err = s.tx.Get(stateBucket, lastSyncKey, &s.lastSync) @@ -79,10 +91,14 @@ func newStateStore(store *kvstore.Store) (*stateStore, error) { if err != nil && !errIsItemNotFound(err) { return nil, fmt.Errorf("unable to get last update time from state: %w", err) } - err = s.tx.Get(stateBucket, usersLinkKey, &s.next) + err = s.tx.Get(stateBucket, usersLinkKey, &s.nextUsers) if err != nil && !errIsItemNotFound(err) { return nil, fmt.Errorf("unable to get users link from state: %w", err) } + err = s.tx.Get(stateBucket, devicesLinkKey, &s.nextDevices) + if err != nil && !errIsItemNotFound(err) { + return nil, fmt.Errorf("unable to get devices link from state: %w", err) + } err = s.tx.ForEach(usersBucket, func(key, value []byte) error { var u User @@ -98,6 +114,20 @@ func newStateStore(store *kvstore.Store) (*stateStore, error) { return nil, fmt.Errorf("unable to get users from state: %w", err) } + err = s.tx.ForEach(devicesBucket, func(key, value []byte) error { + var d Device + err = json.Unmarshal(value, &d) + if err != nil { + return fmt.Errorf("unable to unmarshal device from state: %w", err) + } + s.devices[d.ID] = &d + + return nil + }) + if err != nil && !errIsItemNotFound(err) { + return nil, fmt.Errorf("unable to get devices from state: %w", err) + } + return &s, nil } @@ -120,6 +150,25 @@ func (s *stateStore) storeUser(u okta.User) *User { return &su } +// storeDevice stores a device. If the device does not exist in the store, then the +// device will be marked as discovered. Otherwise, the user will be marked +// as modified. +func (s *stateStore) storeDevice(d okta.Device) *Device { + du := Device{Device: d} + if d.Status == "DEPROVISIONED" { + du.State = Deleted + return &du + } + if existing, ok := s.devices[d.ID]; ok { + du.State = Modified + *existing = du + } else { + du.State = Discovered + s.devices[d.ID] = &du + } + return &du +} + // close will close out the stateStore. If commit is true, the staged values on the // stateStore will be set in the kvstore database, and the transaction will be // committed. Otherwise, all changes will be discarded and the transaction will @@ -156,12 +205,18 @@ func (s *stateStore) close(commit bool) (err error) { return fmt.Errorf("unable to save last update time to state: %w", err) } } - if s.next != "" { - err = s.tx.Set(stateBucket, usersLinkKey, &s.next) + if s.nextUsers != "" { + err = s.tx.Set(stateBucket, usersLinkKey, &s.nextUsers) if err != nil { return fmt.Errorf("unable to save users link to state: %w", err) } } + if s.nextDevices != "" { + err = s.tx.Set(stateBucket, devicesLinkKey, &s.nextDevices) + if err != nil { + return fmt.Errorf("unable to save devices link to state: %w", err) + } + } for key, value := range s.users { err = s.tx.Set(usersBucket, []byte(key), value) @@ -169,6 +224,12 @@ func (s *stateStore) close(commit bool) (err error) { return fmt.Errorf("unable to save user %q to state: %w", key, err) } } + for key, value := range s.devices { + err = s.tx.Set(devicesBucket, []byte(key), value) + if err != nil { + return fmt.Errorf("unable to save device %q to state: %w", key, err) + } + } return s.tx.Commit() } diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/statestore_test.go b/x-pack/filebeat/input/entityanalytics/provider/okta/statestore_test.go index caac6bd383d..e43128a9346 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/okta/statestore_test.go +++ b/x-pack/filebeat/input/entityanalytics/provider/okta/statestore_test.go @@ -28,7 +28,10 @@ func TestStateStore(t *testing.T) { if err != nil { t.Fatalf("failed to parse lastUpdate") } - const usersLink = "users-link" + const ( + usersLink = "users-link" + devicesLink = "devices-link" + ) t.Run("new", func(t *testing.T) { dbFilename := "TestStateStore_New.db" @@ -45,6 +48,7 @@ func TestStateStore(t *testing.T) { {key: lastSyncKey, val: lastSync}, {key: lastUpdateKey, val: lastUpdate}, {key: usersLinkKey, val: usersLink}, + {key: devicesLinkKey, val: devicesLink}, } for _, kv := range data { err := store.RunTransaction(true, func(tx *kvstore.Transaction) error { @@ -67,7 +71,8 @@ func TestStateStore(t *testing.T) { }{ {name: "lastSync", got: ss.lastSync, want: lastSync}, {name: "lastUpdate", got: ss.lastUpdate, want: lastUpdate}, - {name: "usersLink", got: ss.next, want: usersLink}, + {name: "usersLink", got: ss.nextUsers, want: usersLink}, + {name: "devicesLink", got: ss.nextDevices, want: devicesLink}, } for _, c := range checks { if !cmp.Equal(c.got, c.want) { @@ -83,7 +88,7 @@ func TestStateStore(t *testing.T) { testCleanupStore(store, dbFilename) }) - want := map[string]*User{ + wantUsers := map[string]*User{ "userid": { State: Discovered, User: okta.User{ @@ -118,6 +123,22 @@ func TestStateStore(t *testing.T) { }, }, } + wantDevices := map[string]*Device{ + "deviceid": { + State: Discovered, + Device: okta.Device{ + ID: "deviceid", + Status: "STATUS", + Created: time.Now(), + LastUpdated: time.Now(), + Links: okta.HAL{ + "self": map[string]interface{}{ + "href": "https://localhost/api/v1/devices/deviceid", + }, + }, + }, + }, + } ss, err := newStateStore(store) if err != nil { @@ -125,8 +146,10 @@ func TestStateStore(t *testing.T) { } ss.lastSync = lastSync ss.lastUpdate = lastUpdate - ss.next = usersLink - ss.users = want + ss.nextUsers = usersLink + ss.nextDevices = devicesLink + ss.users = wantUsers + ss.devices = wantDevices err = ss.close(true) if err != nil { @@ -140,7 +163,8 @@ func TestStateStore(t *testing.T) { }{ {name: "lastSyncKey", key: lastSyncKey, val: &ss.lastSync}, {name: "lastUpdateKey", key: lastUpdateKey, val: &ss.lastUpdate}, - {name: "usersLinkKey", key: usersLinkKey, val: &ss.next}, + {name: "usersLinkKey", key: usersLinkKey, val: &ss.nextUsers}, + {name: "devicesLinkKey", key: devicesLinkKey, val: &ss.nextDevices}, } for _, check := range roundTripChecks { want, err := json.Marshal(check.val) @@ -175,8 +199,8 @@ func TestStateStore(t *testing.T) { if err != nil { t.Errorf("unexpected error from store run transaction: %v", err) } - if !cmp.Equal(want, users) { - t.Errorf("unexpected result:\n- want\n+ got\n%s", cmp.Diff(want, users)) + if !cmp.Equal(wantUsers, users) { + t.Errorf("unexpected result:\n- want\n+ got\n%s", cmp.Diff(wantUsers, users)) } })