Skip to content

Commit

Permalink
Merge b9f3016 into 02198e5
Browse files Browse the repository at this point in the history
  • Loading branch information
benbp authored Sep 26, 2024
2 parents 02198e5 + b9f3016 commit ab6c404
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public class NamespaceEventHandler
// Concurrent Federated Identity Credentials writes under the same managed identity are not supported
private static readonly SemaphoreSlim FederatedCredentialWriteSemaphore = new(1, 1);

private Dictionary<string, UserAssignedIdentityResource> WorkloadAppCache = [];

public List<string> WorkloadAppPool;
public string WorkloadAppIssuer;

Expand Down Expand Up @@ -62,6 +64,57 @@ public NamespaceEventHandler(
.CreateLogger();
}

public async Task SyncCredentials()
{
try
{
Logger.Information($"Waiting for federated credential write semaphore");
await FederatedCredentialWriteSemaphore.WaitAsync();
await _syncCredentials();
}
finally
{
Logger.Information("Releasing federated credential write semaphore");
FederatedCredentialWriteSemaphore.Release();
}
}

public async Task _syncCredentials()
{
Logger.Information("Syncing namespaced federated credentials, this may take a minute...");

var namespaces = await Client.ListNamespaceAsync();
foreach (var app in WorkloadAppPool)
{
var resourceId = UserAssignedIdentityResource.CreateResourceIdentifier(SubscriptionId, ClusterGroup, app);
var userAssignedIdentity = ArmClient.GetUserAssignedIdentityResource(resourceId);
var identityResource = await userAssignedIdentity.GetAsync();
var fedCreds = userAssignedIdentity.GetFederatedIdentityCredentials();
await foreach (var item in fedCreds.GetAllAsync())
{
if (!namespaces.Items.Any(ns => item.Data.Name == CreateFederatedIdentityCredentialName(ns)))
{
if (!string.IsNullOrEmpty(WatchNamespace) && item.Data.Name != CreateFederatedIdentityCredentialName(WatchNamespace))
{
Logger.Information($"Skipping delete federated credential '{item.Data.Name}' because it is not the watched namespace '{WatchNamespace}'");
continue;
}
// Only perform delete operations for namespace state that may have changed if the watcher was not running.
// Any create operations will be handled after initialization as the watch stream processes all active namespaces on startup
Logger.Information($"Deleting federated identity credential '{item.Data.Name}' for managed identity '{app}' as the corresponding namespace no longer exists.");
WorkloadAppCache.Remove(item.Data.Name);
var lro = await item.DeleteAsync(Azure.WaitUntil.Completed);
}
else
{
WorkloadAppCache[item.Data.Name] = identityResource.Value;
}
}
}

Logger.Information($"Federated credential sync complete. Cached {WorkloadAppCache.Count} federated credentials.");
}

public async Task Watch(CancellationToken cancellationToken)
{
string resourceVersion = null;
Expand Down Expand Up @@ -125,6 +178,10 @@ public void HandleNamespaceEvent(WatchEventType eventType, V1Namespace ns)
Logger.Information($"Skipping namespace '{ns.Name()}' because it is not the watched namespace '{WatchNamespace}'");
return;
}
if (ns.Status?.Phase == "Terminating")
{
return;
}

using (LogContext.PushProperty("namespace", ns.Name()))
{
Expand All @@ -139,24 +196,17 @@ public void HandleNamespaceEvent(WatchEventType eventType, V1Namespace ns)
}
});
}
else if (eventType == WatchEventType.Deleted)
{
DeleteFederatedIdentityCredential(ns).ContinueWith(t =>
{
Logger.Information("Releasing federated credential write semaphore");
FederatedCredentialWriteSemaphore.Release();
if (t.Exception != null)
{
Logger.Error(t.Exception, "Error deleting federated identity credential.");
}
});
}
}
}

public string CreateFederatedIdentityCredentialName(V1Namespace ns)
{
return $"stress-{ns.Name()}";
return CreateFederatedIdentityCredentialName(ns.Name());
}

public string CreateFederatedIdentityCredentialName(string ns)
{
return $"stress-{ns}";
}

public async Task InitializeWorkloadIdForNamespace(V1Namespace ns)
Expand All @@ -175,14 +225,21 @@ public async Task InitializeWorkloadIdForNamespace(V1Namespace ns)
var identityData = await selectedWorkloadIdentity.GetAsync();
var selectedWorkloadAppId = identityData.Value.Data.ClientId.ToString();

var meta = new V1ObjectMeta(){
var meta = new V1ObjectMeta()
{
Name = ns.Name(),
NamespaceProperty = ns.Name(),
Annotations = new Dictionary<string, string>(){
{ "azure.workload.identity/client-id", selectedWorkloadAppId }
}
};
var serviceAccount = new V1ServiceAccount(metadata: meta);
var allAccounts = await Client.ListNamespacedServiceAccountAsync(ns.Name());
if (allAccounts.Items.Any(sa => sa.Name() == ns.Name()))
{
Logger.Information($"Service account '{ns.Name()}/{ns.Name()}' already exists, skipping creation.");
return;
}
await Client.CreateNamespacedServiceAccountAsync(serviceAccount, ns.Name());
Logger.Information($"Created service account '{ns.Name()}/{ns.Name()}' with workload client id '{selectedWorkloadAppId}'");
}
Expand All @@ -200,6 +257,12 @@ public async Task<UserAssignedIdentityResource> CreateFederatedIdentityCredentia
Logger.Information($"Waiting for federated credential write semaphore");
await FederatedCredentialWriteSemaphore.WaitAsync();

if (WorkloadAppCache.ContainsKey(credentialName))
{
Logger.Information($"Found cache entry for federated credential {credentialName}, returning identity {WorkloadAppCache[credentialName].Data.ClientId}");
return await WorkloadAppCache[credentialName].GetAsync();
}

foreach (var workloadApp in WorkloadAppPool)
{
var userAssignedIdentityResourceId = UserAssignedIdentityResource.CreateResourceIdentifier(SubscriptionId, ClusterGroup, workloadApp);
Expand Down Expand Up @@ -246,50 +309,10 @@ public async Task<UserAssignedIdentityResource> CreateFederatedIdentityCredentia
Logger.Information($"Creating/updating federated identity credential '{credentialName}' " +
$"with subject '{subject}' for managed identity '{selectedWorkloadApp}'");
var lro = await federatedIdentityCredential.UpdateAsync(Azure.WaitUntil.Completed, fedCredData);
WorkloadAppCache[credentialName] = selectedIdentity;
Logger.Information($"Created federated identity credential '{lro.Value.Data.Name}'");

return selectedIdentity;
}

public async Task DeleteFederatedIdentityCredential(V1Namespace ns)
{
var credentialName = CreateFederatedIdentityCredentialName(ns);
var workloadApp = "";
foreach (var app in WorkloadAppPool)
{
var resourceId = UserAssignedIdentityResource.CreateResourceIdentifier(SubscriptionId, ClusterGroup, app);
var userAssignedIdentity = ArmClient.GetUserAssignedIdentityResource(resourceId);
var fedCreds = userAssignedIdentity.GetFederatedIdentityCredentials();
await foreach (var item in fedCreds.GetAllAsync())
{
if (item.Data.Name == credentialName)
{
workloadApp = app;
break;
}
}
if (!String.IsNullOrEmpty(workloadApp))
{
break;
}
}

if (string.IsNullOrEmpty(workloadApp))
{
Logger.Warning($"Federated identity credential '{credentialName}' not found in workload app pool. Skipping delete.");
return;
}

var federatedIdentityCredentialResourceId = FederatedIdentityCredentialResource.CreateResourceIdentifier(
SubscriptionId, ClusterGroup, workloadApp, credentialName);
var federatedIdentityCredential = ArmClient.GetFederatedIdentityCredentialResource(federatedIdentityCredentialResourceId);

Logger.Information($"Waiting for federated credential write semaphore");
await FederatedCredentialWriteSemaphore.WaitAsync();

Logger.Information($"Deleting federated identity credential '{credentialName}' for managed identity '{workloadApp}'");
var lro = await federatedIdentityCredential.DeleteAsync(Azure.WaitUntil.Completed);
Logger.Information($"Deleted federated identity credential '{credentialName}'");
}
}
}
12 changes: 12 additions & 0 deletions tools/stress-cluster/services/Stress.Watcher/src/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Azure.ResourceManager;
using dotenv.net;
using YamlDotNet.RepresentationModel;
using System.Security.Cryptography;

namespace Stress.Watcher
{
Expand Down Expand Up @@ -79,6 +80,8 @@ static async Task Program(Options options)
var namespaceEventHandler = new NamespaceEventHandler(
client, armClient, workloadConfig.SubscriptionId, workloadConfig.ClusterGroup,
workloadConfig.WorkloadAppPool, workloadConfig.WorkloadAppIssuer, options.Namespace);
await namespaceEventHandler.SyncCredentials();
_ = PollAndSyncCredentials(namespaceEventHandler, 288); // poll every 12 hours

var cts = new CancellationTokenSource();
var taskList = new List<Task>
Expand Down Expand Up @@ -164,5 +167,14 @@ static WorkloadAuthConfig GetWorkloadConfigValues(Options options, Boolean isLoc
ClusterGroup = clusterGroup
};
}

static async Task PollAndSyncCredentials(NamespaceEventHandler namespaceHandler, int minutes)
{
while (true)
{
await Task.Delay(TimeSpan.FromMinutes(minutes));
await namespaceHandler.SyncCredentials();
}
}
}
}

0 comments on commit ab6c404

Please sign in to comment.