Skip to content

Commit

Permalink
[Filebeat] entityanalytics_azure - Fix accidental error overwrite in …
Browse files Browse the repository at this point in the history
…defer statement (#35169)

- In both the full sync and incremental update functions in Azure AD, a defer
statement was always overriding the error value from the function. If an error
occurred during the sync/update operation, it would be overwritten by the defer.
- Since the error value produced in the defer was not used elsewhere, it is now
stored in its own variable
- Added additional error information for OAuth2 response
- Added test case for error OAuth2 response
- Add SEI as codeowner of entityanalytics input
  • Loading branch information
taylor-swanson authored Apr 21, 2023
1 parent 68997b9 commit 0807bb8
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 10 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ CHANGELOG*
/x-pack/filebeat/input/awss3/ @elastic/obs-cloud-monitoring
/x-pack/filebeat/input/azureblobstorage/ @elastic/security-external-integrations
/x-pack/filebeat/input/cel/ @elastic/security-external-integrations
/x-pack/filebeat/input/entityanalytics/ @elastic/security-external-integrations
/x-pack/filebeat/input/gcppubsub/ @elastic/security-external-integrations
/x-pack/filebeat/input/gcs/ @elastic/security-external-integrations
/x-pack/filebeat/input/http_endpoint/ @elastic/security-external-integrations
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Sanitize filenames for request tracer in httpjson and cel inputs. {pull}35143[35143]
- decode_cef processor: Fix ECS output by making `observer.ip` into an array of strings instead of string. {issue}35140[35140] {pull}35149[35149]
- Fix handling of MySQL audit logs with strict JSON parser. {issue}35158[35158] {pull}35160[35160]
- Fix accidental error overwrite in defer statement in entityanalytics Azure AD input. {issue}35153[35153] {pull}35169[35169]

*Heartbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ type authResponse struct {
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
ExtExpiresIn int `json:"ext_expires_in"`

Error string `json:"error"`
ErrorDescription string `json:"error_description"`
ErrorCodes []int `json:"error_codes"`
CorrelationID string `json:"correlation_id"`
TraceID string `json:"trace_id"`
ErrorURI string `json:"error_uri"`
}

// conf contains parameters needed to configure the authenticator.
Expand Down Expand Up @@ -89,7 +96,7 @@ func (a *oauth2) renewToken(ctx context.Context) error {
}

if res.StatusCode != http.StatusOK {
return fmt.Errorf("token request returned unexpected status code: %d body: %s", res.StatusCode, string(resData))
return fmt.Errorf("token request returned unexpected status code: %s, body: %s", res.Status, string(resData))
}

var authRes authResponse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,27 @@ func testSetupServer(t *testing.T, tokenValue string, expiresIn int) *httptest.S
}))
}

func testSetupErrServer(t *testing.T) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
payload := authResponse{
Error: "invalid client",
ErrorDescription: "AADSTS7000215: Invalid client secret provided. Ensure the secret being sent in the request is the client secret value, not the client secret ID, for a secret added to app 'TEST-APP'.\\r\\nTrace ID: TRACE-ID\\r\\nCorrelation ID: CORRELATION-ID\\r\\nTimestamp: 2023-04-21 14:01:54Z",
ErrorCodes: []int{7000215},
TraceID: "TRACE-ID",
CorrelationID: "CORRELATION-ID",
ErrorURI: "https://login.microsoftonline.com/error?code=7000215",
}
data, err := json.Marshal(payload)
require.NoError(t, err)

w.WriteHeader(http.StatusUnauthorized)
_, err = w.Write(data)
require.NoError(t, err)

w.Header().Add("Content-Type", "application/json")
}))
}

func TestRenew(t *testing.T) {
t.Run("new-token", func(t *testing.T) {
value := "test-value"
Expand Down Expand Up @@ -95,4 +116,29 @@ func TestRenew(t *testing.T) {
require.Equal(t, expireTime, auth.(*oauth2).expires)
require.Equal(t, cachedToken, gotToken)
})

t.Run("invalid-token", func(t *testing.T) {
srv := testSetupErrServer(t)
defer srv.Close()

cfg, err := config.NewConfigFrom(&conf{
Endpoint: "http://" + srv.Listener.Addr().String(),
Secret: "value",
ClientID: "client-id",
TenantID: "tenant-id",
})
require.NoError(t, err)

auth, err := New(cfg, logp.L())
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

_, err = auth.Token(ctx)
require.Error(t, err)

require.ErrorContains(t, err, "invalid client")
require.ErrorContains(t, err, "Invalid client secret provided")
})
}
18 changes: 9 additions & 9 deletions x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (p *azure) Run(inputCtx v2.Context, store *kvstore.Store, client beat.Clien
case <-syncTimer.C:
start := time.Now()
if err := p.runFullSync(inputCtx, store, client); err != nil {
p.logger.Errorf("Error running full sync: %v", err)
p.logger.Errorw("Error running full sync", "error", err)
p.metrics.syncError.Inc()
}
p.metrics.syncTotal.Inc()
Expand All @@ -114,7 +114,7 @@ func (p *azure) Run(inputCtx v2.Context, store *kvstore.Store, client beat.Clien
case <-updateTimer.C:
start := time.Now()
if err := p.runIncrementalUpdate(inputCtx, store, client); err != nil {
p.logger.Errorf("Error running incremental update: %v", err)
p.logger.Errorw("Error running incremental update", "error", err)
p.metrics.updateError.Inc()
}
p.metrics.updateTotal.Inc()
Expand All @@ -129,7 +129,7 @@ func (p *azure) Run(inputCtx v2.Context, store *kvstore.Store, client beat.Clien
// identities from Azure Active Directory, enrich users with group memberships,
// and publishes all known users (regardless if they have been modified) to the
// given beat.Client.
func (p *azure) runFullSync(inputCtx v2.Context, store *kvstore.Store, client beat.Client) (err error) {
func (p *azure) runFullSync(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error {
p.logger.Debugf("Running full sync...")

p.logger.Debugf("Opening new transaction...")
Expand All @@ -139,14 +139,14 @@ func (p *azure) runFullSync(inputCtx v2.Context, store *kvstore.Store, client be
}
p.logger.Debugf("Transaction opened")
defer func() { // If commit is successful, call to this close will be no-op.
if err = state.close(false); err != nil {
p.logger.Errorf("Error rolling back transaction: %v", err)
if closeErr := state.close(false); closeErr != nil {
p.logger.Errorw("Error rolling back full sync transaction", "error", closeErr)
}
}()

ctx := ctxtool.FromCanceller(inputCtx.Cancelation)
p.logger.Debugf("Starting fetch...")
if _, err := p.doFetch(ctx, state, true); err != nil {
if _, err = p.doFetch(ctx, state, true); err != nil {
return err
}

Expand Down Expand Up @@ -180,16 +180,16 @@ func (p *azure) runFullSync(inputCtx v2.Context, store *kvstore.Store, client be
// runIncrementalUpdate will run an incremental update. The process is similar
// to full synchronization, except only users which have changed (newly
// discovered, modified, or deleted) will be published.
func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, client beat.Client) (err error) {
func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error {
p.logger.Debugf("Running incremental update...")

state, err := newStateStore(store)
if err != nil {
return fmt.Errorf("unable to begin transaction: %w", err)
}
defer func() { // If commit is successful, call to this close will be no-op.
if err = state.close(false); err != nil {
p.logger.Errorf("Error rolling back transaction: %v", err)
if closeErr := state.close(false); closeErr != nil {
p.logger.Errorw("Error rolling back incremental update transaction", "error", closeErr)
}
}()

Expand Down

0 comments on commit 0807bb8

Please sign in to comment.