Skip to content

Commit

Permalink
Loki: Add support for Azure Workload Identity authentication (#7540)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tolsto authored Jan 13, 2023
1 parent ce09aac commit e4fbd3c
Show file tree
Hide file tree
Showing 12 changed files with 532 additions and 4 deletions.
7 changes: 7 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -3721,6 +3721,13 @@ The `azure_storage_config` block configures the connection to Azure object stora
# CLI flag: -<prefix>.azure.use-managed-identity
[use_managed_identity: <boolean> | default = false]
# Use a Federated Token to authenticate to the Azure storage account.
# Enable if you want to use Azure Workload Identity. Expects AZURE_CLIENT_ID,
# AZURE_TENANT_ID and AZURE_FEDERATED_TOKEN_FILE envs to be present (set automatically
# when using Azure Workload Identity).
# CLI flag: -<prefix>.azure.use-federated-token
[use_federated_token: <boolean> | default = false]
# User assigned identity ID to authenticate to the Azure storage account.
# CLI flag: -<prefix>.azure.user-assigned-id
[user_assigned_id: <string> | default = ""]
Expand Down
1 change: 1 addition & 0 deletions docs/sources/installation/helm/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1795,6 +1795,7 @@ null
"accountKey": null,
"accountName": null,
"requestTimeout": null,
"useFederatedToken": false,
"useManagedIdentity": false,
"userAssignedId": null
},
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ require (
)

require (
github.com/Azure/go-autorest/autorest v0.11.28
github.com/fsnotify/fsnotify v1.6.0
github.com/heroku/x v0.0.50
github.com/prometheus/alertmanager v0.25.0
Expand All @@ -133,7 +134,6 @@ require (
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest v0.11.28 // indirect
github.com/Azure/go-autorest/autorest/azure/cli v0.4.5 // indirect
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
Expand Down
72 changes: 69 additions & 3 deletions pkg/storage/chunk/client/azure/blob_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"net"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"

"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/grafana/dskit/flagext"
"github.com/mattn/go-ieproxy"
Expand All @@ -32,6 +34,7 @@ import (
const (
// Environment
azureGlobal = "AzureGlobal"
azurePublicCloud = "AzurePublicCloud"
azureChinaCloud = "AzureChinaCloud"
azureGermanCloud = "AzureGermanCloud"
azureUSGovernment = "AzureUSGovernment"
Expand All @@ -48,6 +51,11 @@ var (
azureUSGovernment: "blob.core.usgovcloudapi.net",
}

defaultAuthFunctions = authFunctions{
NewOAuthConfigFunc: adal.NewOAuthConfig,
NewServicePrincipalTokenFromFederatedTokenFunc: adal.NewServicePrincipalTokenFromFederatedToken,
}

// default Azure http client.
defaultClientFactory = func() *http.Client {
return &http.Client{
Expand Down Expand Up @@ -79,6 +87,7 @@ type BlobStorageConfig struct {
ContainerName string `yaml:"container_name"`
Endpoint string `yaml:"endpoint_suffix"`
UseManagedIdentity bool `yaml:"use_managed_identity"`
UseFederatedToken bool `yaml:"use_federated_token"`
UserAssignedID string `yaml:"user_assigned_id"`
UseServicePrincipal bool `yaml:"use_service_principal"`
ClientID string `yaml:"client_id"`
Expand All @@ -94,6 +103,11 @@ type BlobStorageConfig struct {
MaxRetryDelay time.Duration `yaml:"max_retry_delay"`
}

type authFunctions struct {
NewOAuthConfigFunc func(activeDirectoryEndpoint, tenantID string) (*adal.OAuthConfig, error)
NewServicePrincipalTokenFromFederatedTokenFunc func(oauthConfig adal.OAuthConfig, clientID string, jwt string, resource string, callbacks ...adal.TokenRefreshCallback) (*adal.ServicePrincipalToken, error)
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (c *BlobStorageConfig) RegisterFlags(f *flag.FlagSet) {
c.RegisterFlagsWithPrefix("", f)
Expand All @@ -107,6 +121,7 @@ func (c *BlobStorageConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagS
f.StringVar(&c.ContainerName, prefix+"azure.container-name", "loki", "Name of the storage account blob container used to store chunks. This container must be created before running cortex.")
f.StringVar(&c.Endpoint, prefix+"azure.endpoint-suffix", "", "Azure storage endpoint suffix without schema. The storage account name will be prefixed to this value to create the FQDN.")
f.BoolVar(&c.UseManagedIdentity, prefix+"azure.use-managed-identity", false, "Use Managed Identity to authenticate to the Azure storage account.")
f.BoolVar(&c.UseFederatedToken, prefix+"azure.use-federated-token", false, "Use Federated Token to authenticate to the Azure storage account.")
f.StringVar(&c.UserAssignedID, prefix+"azure.user-assigned-id", "", "User assigned identity ID to authenticate to the Azure storage account.")
f.StringVar(&c.ChunkDelimiter, prefix+"azure.chunk-delimiter", "-", "Chunk delimiter for blob ID to be used")
f.DurationVar(&c.RequestTimeout, prefix+"azure.request-timeout", 30*time.Second, "Timeout for requests made against azure blob storage.")
Expand Down Expand Up @@ -316,7 +331,7 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe
})
}

if !b.cfg.UseManagedIdentity && !b.cfg.UseServicePrincipal && b.cfg.UserAssignedID == "" {
if !b.cfg.UseFederatedToken && !b.cfg.UseManagedIdentity && !b.cfg.UseServicePrincipal && b.cfg.UserAssignedID == "" {
credential, err := azblob.NewSharedKeyCredential(b.cfg.StorageAccountName, b.cfg.StorageAccountKey.String())
if err != nil {
return nil, err
Expand All @@ -341,7 +356,7 @@ func (b *BlobStorage) getOAuthToken() (azblob.TokenCredential, error) {
if b.tc != nil {
return b.tc, nil
}
spt, err := b.getServicePrincipalToken()
spt, err := b.getServicePrincipalToken(defaultAuthFunctions)
if err != nil {
return nil, err
}
Expand All @@ -368,7 +383,7 @@ func (b *BlobStorage) getOAuthToken() (azblob.TokenCredential, error) {
return b.tc, nil
}

func (b *BlobStorage) getServicePrincipalToken() (*adal.ServicePrincipalToken, error) {
func (b *BlobStorage) getServicePrincipalToken(authFunctions authFunctions) (*adal.ServicePrincipalToken, error) {
var endpoint string
if b.cfg.Endpoint != "" {
endpoint = b.cfg.Endpoint
Expand All @@ -378,6 +393,28 @@ func (b *BlobStorage) getServicePrincipalToken() (*adal.ServicePrincipalToken, e

resource := fmt.Sprintf("https://%s.%s", b.cfg.StorageAccountName, endpoint)

if b.cfg.UseFederatedToken {
token, err := b.servicePrincipalTokenFromFederatedToken(resource, authFunctions.NewOAuthConfigFunc, authFunctions.NewServicePrincipalTokenFromFederatedTokenFunc)
var customRefreshFunc adal.TokenRefresh = func(context context.Context, resource string) (*adal.Token, error) {
newToken, err := b.servicePrincipalTokenFromFederatedToken(resource, authFunctions.NewOAuthConfigFunc, authFunctions.NewServicePrincipalTokenFromFederatedTokenFunc)
if err != nil {
return nil, err
}

err = newToken.Refresh()
if err != nil {
return nil, err
}

token := newToken.Token()

return &token, nil
}

token.SetCustomRefreshFunc(customRefreshFunc)
return token, err
}

if b.cfg.UseServicePrincipal {
config := auth.NewClientCredentialsConfig(b.cfg.ClientID, b.cfg.ClientSecret.String(), b.cfg.TenantID)
config.Resource = resource
Expand All @@ -395,6 +432,35 @@ func (b *BlobStorage) getServicePrincipalToken() (*adal.ServicePrincipalToken, e
return msiConfig.ServicePrincipalToken()
}

func (b *BlobStorage) servicePrincipalTokenFromFederatedToken(resource string, newOAuthConfigFunc func(activeDirectoryEndpoint, tenantID string) (*adal.OAuthConfig, error), newServicePrincipalTokenFromFederatedTokenFunc func(oauthConfig adal.OAuthConfig, clientID string, jwt string, resource string, callbacks ...adal.TokenRefreshCallback) (*adal.ServicePrincipalToken, error)) (*adal.ServicePrincipalToken, error) {
environmentName := azurePublicCloud
if b.cfg.Environment != azureGlobal {
environmentName = b.cfg.Environment
}

env, err := azure.EnvironmentFromName(environmentName)
if err != nil {
return nil, err
}

azClientID := os.Getenv("AZURE_CLIENT_ID")
azTenantID := os.Getenv("AZURE_TENANT_ID")

jwtBytes, err := os.ReadFile(os.Getenv("AZURE_FEDERATED_TOKEN_FILE"))
if err != nil {
return nil, err
}

jwt := string(jwtBytes)

oauthConfig, err := newOAuthConfigFunc(env.ActiveDirectoryEndpoint, azTenantID)
if err != nil {
return nil, err
}

return newServicePrincipalTokenFromFederatedTokenFunc(*oauthConfig, azClientID, jwt, resource)
}

// List implements chunk.ObjectClient.
func (b *BlobStorage) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
var storageObjects []client.StorageObject
Expand Down
60 changes: 60 additions & 0 deletions pkg/storage/chunk/client/azure/blob_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ import (
"context"
"net/http"
"net/url"
"os"
"strings"
"testing"
"time"

"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/grafana/dskit/flagext"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"

"github.com/grafana/loki/pkg/storage/chunk/client/hedging"
Expand All @@ -24,6 +28,58 @@ func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error)
return fn(req)
}

type FederatedTokenTestSuite struct {
suite.Suite
config *BlobStorage
mockOAuthConfig *adal.OAuthConfig
mockedServicePrincipalToken *adal.ServicePrincipalToken
}

func (suite *FederatedTokenTestSuite) SetupTest() {
suite.mockOAuthConfig, _ = adal.NewOAuthConfig("foo", "bar")
suite.mockedServicePrincipalToken = new(adal.ServicePrincipalToken)
suite.config = &BlobStorage{
cfg: &BlobStorageConfig{
ContainerName: "foo",
StorageAccountName: "bar",
Environment: azureGlobal,
UseFederatedToken: true,
},
}

suite.T().Setenv("AZURE_CLIENT_ID", "myClientId")
suite.T().Setenv("AZURE_TENANT_ID", "myTenantId")

tmpDir := suite.T().TempDir()
_ = os.WriteFile(tmpDir+"/jwtToken", []byte("myJwtToken"), 0666)
suite.T().Setenv("AZURE_FEDERATED_TOKEN_FILE", tmpDir+"/jwtToken")
}

func (suite *FederatedTokenTestSuite) TestGetServicePrincipalToken() {
newOAuthConfigFunc := func(activeDirectoryEndpoint, tenantID string) (*adal.OAuthConfig, error) {
require.Equal(suite.T(), azure.PublicCloud.ActiveDirectoryEndpoint, activeDirectoryEndpoint)
require.Equal(suite.T(), "myTenantId", tenantID)

_, err := adal.NewOAuthConfig(activeDirectoryEndpoint, tenantID)
require.NoError(suite.T(), err)

return suite.mockOAuthConfig, nil
}

servicePrincipalTokenFromFederatedTokenFunc := func(oauthConfig adal.OAuthConfig, clientID string, jwt string, resource string, callbacks ...adal.TokenRefreshCallback) (*adal.ServicePrincipalToken, error) {
require.True(suite.T(), *suite.mockOAuthConfig == oauthConfig, "should return the mocked object")
require.Equal(suite.T(), "myClientId", clientID)
require.Equal(suite.T(), "myJwtToken", jwt)
require.Equal(suite.T(), "https://bar.blob.core.windows.net", resource)
return suite.mockedServicePrincipalToken, nil
}

token, err := suite.config.getServicePrincipalToken(authFunctions{newOAuthConfigFunc, servicePrincipalTokenFromFederatedTokenFunc})

require.NoError(suite.T(), err)
require.True(suite.T(), suite.mockedServicePrincipalToken == token, "should return the mocked object")
}

func Test_Hedging(t *testing.T) {
for _, tc := range []struct {
name string
Expand Down Expand Up @@ -131,6 +187,10 @@ func Test_DefaultBlobURL(t *testing.T) {
require.Equal(t, *expect, bloburl.URL())
}

func Test_UseFederatedToken(t *testing.T) {
suite.Run(t, new(FederatedTokenTestSuite))
}

func Test_EndpointSuffixWithBlob(t *testing.T) {
c, err := NewBlobStorage(&BlobStorageConfig{
ContainerName: "foo",
Expand Down
2 changes: 2 additions & 0 deletions production/helm/loki/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ azure:
{{- end }}
container_name: {{ $.Values.loki.storage.bucketNames.chunks }}
use_managed_identity: {{ .useManagedIdentity }}
use_federated_token: {{ .useFederatedToken }}
{{- with .userAssignedId }}
user_assigned_id: {{ . }}
{{- end }}
Expand Down Expand Up @@ -281,6 +282,7 @@ azure:
{{- end }}
container_name: {{ $.Values.loki.storage.bucketNames.ruler }}
use_managed_identity: {{ .useManagedIdentity }}
use_federated_token: {{ .useFederatedToken }}
{{- with .userAssignedId }}
user_assigned_id: {{ . }}
{{- end }}
Expand Down
1 change: 1 addition & 0 deletions production/helm/loki/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ loki:
accountName: null
accountKey: null
useManagedIdentity: false
useFederatedToken: false
userAssignedId: null
requestTimeout: null
filesystem:
Expand Down
65 changes: 65 additions & 0 deletions vendor/github.com/stretchr/testify/suite/doc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e4fbd3c

Please sign in to comment.