Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.../input/entityanalytics/provider/okta: Handle 429s, concurrent limits #42094

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932]
- Rate limiting operability improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977]
- Added default values in the streaming input for websocket retries and put a cap on retry wait time to be lesser than equal to the maximum defined wait time. {pull}42012[42012]
- Rate limiting fault tolerance improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}42094[42094]

*Auditbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,55 +374,73 @@
// See GetUserDetails for details of the query and rate limit parameters.
func getDetails[E entity](ctx context.Context, cli *http.Client, u *url.URL, endpoint string, key string, all bool, omit Response, lim *RateLimiter, log *logp.Logger) ([]E, http.Header, error) {
url := u.String()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, nil, err
}
req.Header.Set("Accept", "application/json")
contentType := "application/json"
if omit != OmitNone {
contentType += "; " + omit.String()
}
req.Header.Set("Content-Type", contentType)
req.Header.Set("Authorization", fmt.Sprintf("SSWS %s", key))
retryCount := 0
const maxRetries = 5

err = lim.Wait(ctx, endpoint, u, log)
if err != nil {
return nil, nil, err
}
resp, err := cli.Do(req)
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
err = lim.Update(endpoint, resp.Header, log)
if err != nil {
io.Copy(io.Discard, resp.Body)
return nil, nil, err
}
for {
if retryCount > maxRetries {
return nil, nil, fmt.Errorf("maximum retries (%d) finished without success", maxRetries)
}
if retryCount > 0 {
log.Warnw("retrying...", "retry", retryCount, "max", maxRetries)
}

var body bytes.Buffer
n, err := io.Copy(&body, resp.Body)
if n == 0 || err != nil {
return nil, nil, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, nil, err
}
req.Header.Set("Accept", "application/json")
contentType := "application/json"
if omit != OmitNone {
contentType += "; " + omit.String()
}
req.Header.Set("Content-Type", contentType)
req.Header.Set("Authorization", fmt.Sprintf("SSWS %s", key))

if all {
// List all entities.
var e []E
err = json.Unmarshal(body.Bytes(), &e)
err = lim.Wait(ctx, endpoint, u, log)
if err != nil {
return nil, nil, err
}
resp, err := cli.Do(req)
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
err = lim.Update(endpoint, resp.Header, log)
if err != nil {
io.Copy(io.Discard, resp.Body)

Check failure on line 411 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value of `io.Copy` is not checked (errcheck)

Check failure on line 411 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `io.Copy` is not checked (errcheck)
return nil, nil, err
}

if resp.StatusCode == http.StatusTooManyRequests {
log.Warnw("received 429 Too Many Requests")
retryCount++
continue
}

var body bytes.Buffer
n, err := io.Copy(&body, resp.Body)
if n == 0 || err != nil {
return nil, nil, err
}

if all {
// List all entities.
var e []E
err = json.Unmarshal(body.Bytes(), &e)
if err != nil {
err = recoverError(body.Bytes())
}
return e, resp.Header, err
}
// Get single entity's details.
var e [1]E
err = json.Unmarshal(body.Bytes(), &e[0])
if err != nil {
err = recoverError(body.Bytes())
}
return e, resp.Header, err
}
// Get single entity's details.
var e [1]E
err = json.Unmarshal(body.Bytes(), &e[0])
if err != nil {
err = recoverError(body.Bytes())
return e[:], resp.Header, err
}
return e[:], resp.Header, err
}

// recoverError returns an error based on the returned Okta API error. Error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@
func TestNext(t *testing.T) {
for i, test := range nextTests {
got, err := Next(test.header)
if err != test.wantErr {

Check failure on line 449 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

comparing with != will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)

Check failure on line 449 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

comparing with != will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
t.Errorf("unexpected ok result for %d: got:%v want:%v", i, err, test.wantErr)
}
if got.Encode() != test.want {
Expand All @@ -454,3 +454,83 @@
}
}
}

func TestRateLimitRetries(t *testing.T) {
logp.TestingSetup()
logger := logp.L()

t.Run("retries", func(t *testing.T) {
const window = time.Minute
var fixedLimit *int = nil
limiter := NewRateLimiter(window, fixedLimit)

const key = "token"
msg := `[{"id":"userid","status":"STATUS","created":"2023-05-14T13:37:20.000Z","activated":null,"statusChanged":"2023-05-15T01:50:30.000Z","lastLogin":"2023-05-15T01:59:20.000Z","lastUpdated":"2023-05-15T01:50:32.000Z","passwordChanged":"2023-05-15T01:50:32.000Z","recovery_question":{"question":"Who's a major player in the cowboy scene?","answer":"Annie Oakley"},"type":{"id":"typeid"},"profile":{"firstName":"name","lastName":"surname","mobilePhone":null,"secondEmail":null,"login":"[email protected]","email":"[email protected]"},"credentials":{"password":{"value":"secret"},"emails":[{"value":"[email protected]","status":"VERIFIED","type":"PRIMARY"}],"provider":{"type":"OKTA","name":"OKTA"}},"_links":{"self":{"href":"https://localhost/api/v1/users/userid"}}}]`
want, err := mkWant[User](msg)
if err != nil {
t.Fatalf("failed to unmarshal entity data: %v", err)
}

responseNum := 0
ts := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
responseNum++

u, err := url.Parse(r.RequestURI)
if err != nil {
t.Errorf("unexpected error parsing request URI: %v", err)
}
endpoint := "/api/v1/users"
if u.Path != endpoint {
t.Errorf("unexpected API endpoint: got:%s want:%s", u.Path, endpoint)
}
if got := r.Header.Get("accept"); got != "application/json" {
t.Errorf("unexpected Accept header: got:%s want:%s", got, "application/json")
}
if got := r.Header.Get("authorization"); got != "SSWS "+key {
t.Errorf("unexpected Authorization header: got:%s want:%s", got, "SSWS "+key)
}

w.Header().Add("x-rate-limit-limit", "1000000") // Let requests come fast
w.Header().Add("x-rate-limit-reset", fmt.Sprint(time.Now().Unix()))

if responseNum == 3 || responseNum == 3+7 {
// Respond with data
w.Header().Add("x-rate-limit-remaining", "49")
fmt.Fprintln(w, msg)
} else {
// Respond with a 429 error
w.Header().Add("x-rate-limit-remaining", "0")
http.Error(w, "[]", http.StatusTooManyRequests) // We don't know what body is returned with 429s
}
}))
defer ts.Close()
u, err := url.Parse(ts.URL)
if err != nil {
t.Errorf("failed to parse server URL: %v", err)
}
host := u.Host

// retry until there's a non-429 response
query := make(url.Values)
query.Set("limit", "200")
got, _, err := GetUserDetails(context.Background(), ts.Client(), host, key, "", query, OmitNone, limiter, logger)
if err != nil {
t.Fatalf("unexpected error from Get_Details: %v", err)
}
if !cmp.Equal(want, got) {
t.Errorf("unexpected result:\n- want\n+ got\n%s", cmp.Diff(want, got))
}

// stop trying after the maximum retries
query = make(url.Values)
query.Set("limit", "200")
_, _, err = GetUserDetails(context.Background(), ts.Client(), host, key, "", query, OmitNone, limiter, logger)
expectedErrMsg := "maximum retries (5) finished without success"
if err == nil {
t.Errorf("expected the error '%s', but got no error", expectedErrMsg)
} else if err.Error() != expectedErrMsg {
t.Errorf("expected error message '%s', but got '%s'", expectedErrMsg, err.Error())
}

})
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,19 @@ func (r RateLimiter) Update(endpoint string, h http.Header, log *logp.Logger) er
}
r.byEndpoint[endpoint] = newEndpointRateLimiter

resetTimeUTC := resetTime.UTC()
log.Debugw("rate limit block until reset", "reset_time", resetTimeUTC)

// next gives us a sane next window estimate, but the
// estimate will be overwritten when we make the next
// permissible API request.
next := rate.Limit(lim / r.window.Seconds())
var next rate.Limit
if lim == 0 {
log.Debugw("exceeded the concurrent rate limit")
next = rate.Limit(1)
} else {
next = rate.Limit(lim / r.window.Seconds())
}

resetTimeUTC := resetTime.UTC()
log.Debugw("rate limit block until reset", "reset_time", resetTimeUTC)
waitFor := time.Until(resetTimeUTC)

time.AfterFunc(waitFor, func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
"testing"
"time"

"golang.org/x/time/rate"

"github.com/elastic/elastic-agent-libs/logp"
)

Expand Down Expand Up @@ -83,7 +85,7 @@
e = r.endpoint(endpoint)

start := time.Now()
r.Wait(ctx, endpoint, url, log)

Check failure on line 88 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value of `r.Wait` is not checked (errcheck)

Check failure on line 88 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `r.Wait` is not checked (errcheck)
wait := time.Since(start)

if wait > 1010*time.Millisecond {
Expand Down Expand Up @@ -126,8 +128,8 @@
log := logp.L()
ctx := context.Background()

r.Wait(ctx, endpoint, url, log) // consume the initial request

Check failure on line 131 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value of `r.Wait` is not checked (errcheck)

Check failure on line 131 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `r.Wait` is not checked (errcheck)
r.Update(endpoint, headers, log) // update to a slow rate

Check failure on line 132 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value of `r.Update` is not checked (errcheck)

Check failure on line 132 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `r.Update` is not checked (errcheck)

err = r.Wait(ctx, endpoint, url, log)

Expand All @@ -141,7 +143,7 @@

t.Run("A fixed limit overrides response information", func(t *testing.T) {
const window = time.Minute
var fixedLimit int = 120

Check failure on line 146 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

ST1023: should omit type int from declaration; it will be inferred from the right-hand side (stylecheck)

Check failure on line 146 in x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/ratelimiter_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

ST1023: should omit type int from declaration; it will be inferred from the right-hand side (stylecheck)
r := NewRateLimiter(window, &fixedLimit)
const endpoint = "/foo"
e := r.endpoint(endpoint)
Expand All @@ -166,4 +168,52 @@
t.Errorf("unexpected rate following Update() (for fixed 120 reqs / 60 secs): %f", e.limiter.Limit())
}
})

t.Run("A concurrent rate limit should not set a new rate of zero", func(t *testing.T) {
const window = time.Minute
r := NewRateLimiter(window, nil)
const endpoint = "/foo"
url, err := url.Parse(endpoint)
if err != nil {
t.Errorf("unexpected error from url.Parse(): %v", err)
}
ctx := context.Background()
log := logp.L()

// update to 30 requests remaining, reset in 30s
headers := http.Header{
"X-Rate-Limit-Limit": []string{"60"},
"X-Rate-Limit-Remaining": []string{"30"},
"X-Rate-Limit-Reset": []string{strconv.FormatInt(time.Now().Unix()+30, 10)},
}
err = r.Update(endpoint, headers, logp.L())
if err != nil {
t.Errorf("unexpected error from Update(): %v", err)
}

// update to concurrent rate limit, reset now
headers = http.Header{
"X-Rate-Limit-Limit": []string{"0"},
"X-Rate-Limit-Remaining": []string{"0"},
"X-Rate-Limit-Reset": []string{strconv.FormatInt(time.Now().Unix(), 10)},
}
err = r.Update(endpoint, headers, logp.L())
if err != nil {
t.Errorf("unexpected error from Update(): %v", err)
}

// Wait to make the new rate become active
err = r.Wait(ctx, endpoint, url, log)
if err != nil {
t.Errorf("unexpected error from Wait(): %v", err)
}

e := r.endpoint(endpoint)

newLimit := e.limiter.Limit()
expectedNewLimit := rate.Limit(1)
if newLimit != expectedNewLimit {
t.Errorf("expected rate %f, but got %f, after exceeding the concurrent rate limit", expectedNewLimit, newLimit)
}
})
}
Loading