Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x](backport #40879) [filbeat][azure-blob-storage] - Adding support for Microsoft Entra ID RBAC authentication #41060

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Disable event normalization for netflow input {pull}40635[40635]
- Allow attribute selection in the Active Directory entity analytics provider. {issue}40482[40482] {pull}40662[40662]
- Improve error quality when CEL program does not correctly return an events array. {pull}40580[40580]
- Added support for Microsoft Entra ID RBAC authentication. {issue}40434[40434] {pull}40879[40879]
- Add `use_kubeadm` config option for filebeat (both filbeat.input and autodiscovery) in order to toggle kubeadm-config api requests {pull}40301[40301]
- Make HTTP library function inclusion non-conditional in CEL input. {pull}40912[40912]
- Add support for Crowdstrike streaming API to the streaming input. {issue}40264[40264] {pull}40838[40838]
Expand Down
54 changes: 43 additions & 11 deletions x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,18 @@ Now let's explore the configuration attributes a bit more elaborately.
*Supported Attributes :-*

1. <<attrib-account-name,account_name>>
2. <<attrib-auth-shared-account-key,auth.shared_credentials.account_key>>
3. <<attrib-auth-connection-string,auth.connection_string.uri>>
4. <<attrib-storage-url,storage_url>>
5. <<attrib-containers,containers>>
6. <<attrib-container-name,name>>
7. <<attrib-max_workers,max_workers>>
8. <<attrib-poll,poll>>
9. <<attrib-poll_interval,poll_interval>>
10. <<attrib-file_selectors,file_selectors>>
11. <<attrib-expand_event_list_from_field,expand_event_list_from_field>>
12. <<attrib-timestamp_epoch,timestamp_epoch>>
2. <<attrib-auth-oauth2,auth.oauth2>>
3. <<attrib-auth-shared-account-key,auth.shared_credentials.account_key>>
4. <<attrib-auth-connection-string,auth.connection_string.uri>>
5. <<attrib-storage-url,storage_url>>
6. <<attrib-containers,containers>>
7. <<attrib-container-name,name>>
8. <<attrib-max_workers,max_workers>>
9. <<attrib-poll,poll>>
10. <<attrib-poll_interval,poll_interval>>
11. <<attrib-file_selectors,file_selectors>>
12. <<attrib-expand_event_list_from_field,expand_event_list_from_field>>
13. <<attrib-timestamp_epoch,timestamp_epoch>>


[id="attrib-account-name"]
Expand All @@ -148,6 +149,37 @@ Now let's explore the configuration attributes a bit more elaborately.
This attribute is required for various internal operations with respect to authentication, creating service clients and blob clients which are used internally
for various processing purposes.

[id="attrib-auth-oauth2"]
[float]
==== `auth.oauth2`

This attribute contains the Microsoft Entra ID RBAC authentication credentials for a secure connection to the Azure Blob Storage. The `auth.oauth2` attribute contains the following sub-attributes:

1. `client_id`: The client ID of the Azure Entra ID application.
2. `client_secret`: The client secret of the Azure Entra ID application.
3. `tenant_id`: The tenant ID of the Azure Entra ID application.

A sample configuration with `auth.oauth2` is given below:

["source","yaml"]
----
filebeat.inputs:
- type: azure-blob-storage
account_name: some_account
auth.oauth2:
client_id: "some_client_id"
client_secret: "some_client_secret"
tenant_id: "some_tenant_id"
containers:
- name: container_1
max_workers: 3
poll: true
poll_interval: 10s
----
How to setup the `auth.oauth2` credentials can be found in the Azure documentation https://docs.microsoft.com/en-us/azure/active-directory/develop/quickstart-register-app[here]

NOTE: According to our internal testing it seems that we require at least an access level of **blobOwner** for the service principle to be able to read the blobs. If you are facing any issues with the access level, ensure that the access level is set to **blobOwner**.

[id="attrib-auth-shared-account-key"]
[float]
==== `auth.shared_credentials.account_key`
Expand Down
189 changes: 189 additions & 0 deletions x-pack/filebeat/input/azureblobstorage/auth_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package azureblobstorage

import (
"bytes"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"regexp"
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
beattest "github.com/elastic/beats/v7/libbeat/publisher/testing"
"github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage/mock"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

// customTransporter implements the Transporter interface with a custom Do & RoundTrip method
type customTransporter struct {
rt http.RoundTripper
servURL string
}

func (t *customTransporter) RoundTrip(req *http.Request) (*http.Response, error) {
return t.rt.RoundTrip(req)
}

// Do is responsible for the routing of the request to the appropriate handler based on the request URL
func (t *customTransporter) Do(req *http.Request) (*http.Response, error) {
logp.L().Named("azure-blob-storage-test").Debug("request URL: ", req.URL)
re := regexp.MustCompile(`^/([0-9a-fA-F-]+)/?(oauth2/v2\.0/token|v2\.0/\.well-known/openid-configuration)`)
matches := re.FindStringSubmatch(req.URL.Path)

if len(matches) == 3 {
tenant_id := matches[1]
action := matches[2]

switch action {
case "v2.0/.well-known/openid-configuration":
return createJSONResponse(map[string]interface{}{
"token_endpoint": t.servURL + "/" + tenant_id + "/oauth2/v2.0/token",
"authorization_endpoint": t.servURL + "/" + tenant_id + "/oauth2/v2.0/authorize",
"issuer": t.servURL + "/" + tenant_id + "/oauth2/v2.0/issuer",
}, 200)

case "oauth2/v2.0/token":
return createJSONResponse(map[string]interface{}{
"token_type": "Bearer",
"expires_in": 3600,
"access_token": "mock_access_token_123",
}, 200)
}
}
return t.rt.RoundTrip(req)
}

func createJSONResponse(data interface{}, statusCode int) (*http.Response, error) {
jsonData, err := json.Marshal(data)
if err != nil {
return nil, err
}

resp := &http.Response{
StatusCode: statusCode,
Body: io.NopCloser(bytes.NewBuffer(jsonData)),
Header: make(http.Header),
}

resp.Header.Set("Content-Type", "application/json")
return resp, nil
}

func Test_OAuth2(t *testing.T) {
tests := []struct {
name string
baseConfig map[string]interface{}
mockHandler func() http.Handler
expected map[string]bool
}{
{
name: "OAuth2TConfig",
baseConfig: map[string]interface{}{
"account_name": "beatsblobnew",
"auth.oauth2": map[string]interface{}{
"client_id": "12345678-90ab-cdef-1234-567890abcdef",
"client_secret": "abcdefg1234567890!@#$%^&*()-_=+",
"tenant_id": "87654321-abcd-ef90-1234-fedcba098765",
},
"max_workers": 2,
"poll": true,
"poll_interval": "30s",
"containers": []map[string]interface{}{
{
"name": beatsContainer,
},
},
},
mockHandler: mock.AzureStorageServer,
expected: map[string]bool{
mock.Beatscontainer_blob_ata_json: true,
mock.Beatscontainer_blob_data3_json: true,
mock.Beatscontainer_blob_docs_ata_json: true,
},
},
}

logp.TestingSetup()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
serv := httptest.NewServer(tt.mockHandler())
t.Cleanup(serv.Close)

httpClient := &http.Client{
Transport: &customTransporter{
rt: http.DefaultTransport,
servURL: serv.URL,
},
}

cfg := conf.MustNewConfigFrom(tt.baseConfig)
conf := config{}
err := cfg.Unpack(&conf)
assert.NoError(t, err)

// inject custom transport & client options
conf.Auth.OAuth2.clientOptions = azcore.ClientOptions{
InsecureAllowCredentialWithHTTP: true,
Transport: httpClient.Transport.(*customTransporter),
}

input := newStatelessInput(conf, serv.URL+"/")

assert.Equal(t, "azure-blob-storage-stateless", input.Name())
assert.NoError(t, input.Test(v2.TestContext{}))

chanClient := beattest.NewChanClient(len(tt.expected))
t.Cleanup(func() { _ = chanClient.Close() })

ctx, cancel := newV2Context()
t.Cleanup(cancel)
ctx.ID += tt.name

var g errgroup.Group
g.Go(func() error {
return input.Run(ctx, chanClient)
})

var timeout *time.Timer
if conf.PollInterval != nil {
timeout = time.NewTimer(1*time.Second + *conf.PollInterval)
} else {
timeout = time.NewTimer(10 * time.Second)
}
t.Cleanup(func() { timeout.Stop() })

var receivedCount int
wait:
for {
select {
case <-timeout.C:
t.Errorf("timed out waiting for %d events", len(tt.expected))
cancel()
return
case got := <-chanClient.Channel:
var val interface{}
var err error
val, err = got.Fields.GetValue("message")
assert.NoError(t, err)
assert.True(t, tt.expected[val.(string)])
receivedCount += 1
if receivedCount == len(tt.expected) {
cancel()
break wait
}
}
}
})
}
}
41 changes: 38 additions & 3 deletions x-pack/filebeat/input/azureblobstorage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package azureblobstorage
import (
"fmt"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
azcontainer "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
Expand All @@ -16,10 +17,13 @@ import (
)

func fetchServiceClientAndCreds(cfg config, url string, log *logp.Logger) (*service.Client, *serviceCredentials, error) {
if cfg.Auth.SharedCredentials != nil {
switch {
case cfg.Auth.SharedCredentials != nil:
return fetchServiceClientWithSharedKeyCreds(url, cfg.AccountName, cfg.Auth.SharedCredentials, log)
} else if cfg.Auth.ConnectionString != nil {
case cfg.Auth.ConnectionString != nil:
return fetchServiceClientWithConnectionString(cfg.Auth.ConnectionString, log)
case cfg.Auth.OAuth2 != nil:
return fetchServiceClientWithOAuth2(url, cfg.Auth.OAuth2)
}

return nil, nil, fmt.Errorf("no valid auth specified")
Expand Down Expand Up @@ -52,8 +56,26 @@ func fetchServiceClientWithConnectionString(connectionString *connectionStringCo
return serviceClient, &serviceCredentials{connectionStrCreds: connectionString.URI, cType: connectionStringType}, nil
}

func fetchServiceClientWithOAuth2(url string, cfg *OAuth2Config) (*service.Client, *serviceCredentials, error) {
creds, err := azidentity.NewClientSecretCredential(cfg.TenantID, cfg.ClientID, cfg.ClientSecret, &azidentity.ClientSecretCredentialOptions{
ClientOptions: cfg.clientOptions,
})
if err != nil {
return nil, nil, fmt.Errorf("failed to create client secret credential with oauth2 config: %w", err)
}

client, err := azblob.NewClient(url, creds, &azblob.ClientOptions{
ClientOptions: cfg.clientOptions,
})
if err != nil {
return nil, nil, fmt.Errorf("failed to create azblob service client: %w", err)
}

return client.ServiceClient(), &serviceCredentials{oauth2Creds: creds, cType: oauth2Type}, nil
}

// fetchBlobClient, generic function that returns a BlobClient based on the credential type
func fetchBlobClient(url string, credential *blobCredentials, log *logp.Logger) (*blob.Client, error) {
func fetchBlobClient(url string, credential *blobCredentials, cfg config, log *logp.Logger) (*blob.Client, error) {
if credential == nil {
return nil, fmt.Errorf("no valid blob credentials found")
}
Expand All @@ -63,6 +85,8 @@ func fetchBlobClient(url string, credential *blobCredentials, log *logp.Logger)
return fetchBlobClientWithSharedKey(url, credential.serviceCreds.sharedKeyCreds, log)
case connectionStringType:
return fetchBlobClientWithConnectionString(credential.serviceCreds.connectionStrCreds, credential.containerName, credential.blobName, log)
case oauth2Type:
return fetchBlobClientWithOAuth2(url, credential.serviceCreds.oauth2Creds, cfg.Auth.OAuth2)
default:
return nil, fmt.Errorf("no valid service credential 'type' found: %s", credential.serviceCreds.cType)
}
Expand All @@ -88,6 +112,17 @@ func fetchBlobClientWithConnectionString(connectionString string, containerName
return blobClient, nil
}

func fetchBlobClientWithOAuth2(url string, credential *azidentity.ClientSecretCredential, oauth2Cfg *OAuth2Config) (*blob.Client, error) {
blobClient, err := blob.NewClient(url, credential, &blob.ClientOptions{
ClientOptions: oauth2Cfg.clientOptions,
})
if err != nil {
return nil, fmt.Errorf("failed to fetch blob client for %s: %w", url, err)
}

return blobClient, nil
}

func fetchContainerClient(serviceClient *service.Client, containerName string, log *logp.Logger) (*azcontainer.Client, error) {
return serviceClient.NewContainerClient(containerName), nil
}
25 changes: 22 additions & 3 deletions x-pack/filebeat/input/azureblobstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
package azureblobstorage

import (
"errors"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"

"github.com/elastic/beats/v7/libbeat/common/match"
)

Expand Down Expand Up @@ -44,19 +47,35 @@ type fileSelectorConfig struct {
}

type authConfig struct {
SharedCredentials *sharedKeyConfig `config:"shared_credentials,omitempty"`
ConnectionString *connectionStringConfig `config:"connection_string,omitempty"`
SharedCredentials *sharedKeyConfig `config:"shared_credentials"`
ConnectionString *connectionStringConfig `config:"connection_string"`
OAuth2 *OAuth2Config `config:"oauth2"`
}

type connectionStringConfig struct {
URI string `config:"uri,omitempty"`
URI string `config:"uri"`
}
type sharedKeyConfig struct {
AccountKey string `config:"account_key"`
}

type OAuth2Config struct {
ClientID string `config:"client_id"`
ClientSecret string `config:"client_secret"`
TenantID string `config:"tenant_id"`
// clientOptions is used internally for testing purposes only
clientOptions azcore.ClientOptions
}

func defaultConfig() config {
return config{
AccountName: "some_account",
}
}

func (c config) Validate() error {
if c.Auth.OAuth2 != nil && (c.Auth.OAuth2.ClientID == "" || c.Auth.OAuth2.ClientSecret == "" || c.Auth.OAuth2.TenantID == "") {
return errors.New("client_id, client_secret and tenant_id are required for OAuth2 auth")
}
return nil
}
Loading
Loading