Skip to content

Commit

Permalink
fixed bugs (#46957)
Browse files Browse the repository at this point in the history
* fixed bugs

* PR feedback
  • Loading branch information
KrzysztofCwalina authored Nov 4, 2024
1 parent 8841bb7 commit 3cd1d32
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public CloudMachineWorkspace(Azure.Core.TokenCredential credential = null, Micro
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override bool Equals(object obj) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override Azure.Core.ClientConnectionOptions GetConnectionOptions(System.Type clientType, string instanceId = null) { throw null; }
public override Azure.Core.ClientConnectionOptions GetConnectionOptions(System.Type clientType, string instanceId) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override int GetHashCode() { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
Expand Down
22 changes: 12 additions & 10 deletions sdk/cloudmachine/Azure.CloudMachine/src/CloudMachineWorkspace.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,27 @@ public CloudMachineWorkspace(TokenCredential credential = default, IConfiguratio
/// <returns></returns>
/// <exception cref="Exception"></exception>
[EditorBrowsable(EditorBrowsableState.Never)]
public override ClientConnectionOptions GetConnectionOptions(Type clientType, string instanceId = default)
public override ClientConnectionOptions GetConnectionOptions(Type clientType, string instanceId)
{
string clientId = clientType.FullName;
if (instanceId != null && instanceId.StartsWith("$")) clientId = $"{clientType.FullName}{instanceId}";

switch (clientId)
{
case "Azure.Security.KeyVault.Secrets.SecretClient":
return new ClientConnectionOptions(new($"https://{this.Id}.vault.azure.net/"), Credential);
return new ClientConnectionOptions(new($"https://{Id}.vault.azure.net/"), Credential);
case "Azure.Messaging.ServiceBus.ServiceBusClient":
return new ClientConnectionOptions(new($"https://{this.Id}.servicebus.windows.net"), Credential);
return new ClientConnectionOptions(new($"https://{Id}.servicebus.windows.net"), Credential);
case "Azure.Messaging.ServiceBus.ServiceBusSender":
if (instanceId == default)
instanceId = "cm_servicebus_subscription_private";
return new ClientConnectionOptions(instanceId);
return new ClientConnectionOptions(instanceId?? "cm_servicebus_default_topic");
case "Azure.Messaging.ServiceBus.ServiceBusProcessor":
return new ClientConnectionOptions("cm_servicebus_default_topic/cm_servicebus_subscription_default");
case "Azure.Messaging.ServiceBus.ServiceBusProcessor$private":
return new ClientConnectionOptions("cm_servicebus_topic_private/cm_servicebus_subscription_private");
case "Azure.Storage.Blobs.BlobContainerClient":
if (instanceId == default)
instanceId = "default";
return new ClientConnectionOptions(new($"https://{this.Id}.blob.core.windows.net/{instanceId}"), Credential);
return new ClientConnectionOptions(new($"https://{Id}.blob.core.windows.net/{instanceId??"default"}"), Credential);
case "Azure.AI.OpenAI.AzureOpenAIClient":
return new ClientConnectionOptions(new($"https://{this.Id}.openai.azure.com"), Credential);
return new ClientConnectionOptions(new($"https://{Id}.openai.azure.com"), Credential);
case "OpenAI.Chat.ChatClient":
return new ClientConnectionOptions(Id);
case "OpenAI.Embeddings.EmbeddingClient":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void SendMessage(object serializable)
/// <param name="received"></param>
public void WhenMessageReceived(Action<string> received)
{
var processor = _cm.Messaging.GetServiceBusProcessor();
var processor = _cm.Messaging.GetServiceBusProcessor(default);
var cm = _cm;

// TODO: How to unsubscribe?
Expand Down Expand Up @@ -67,36 +67,34 @@ private ServiceBusSender GetServiceBusSender()
return sender;
}

internal ServiceBusProcessor GetServiceBusProcessor()
internal ServiceBusProcessor GetServiceBusProcessor(string id)
{
MessagingServices messagingServices = this;
ServiceBusProcessor sender = _cm.Subclients.Get(() => messagingServices.CreateProcessor());
return sender;
ServiceBusProcessor processor = _cm.Subclients.Get(() => messagingServices.CreateProcessor(id), id);
return processor;
}

private ServiceBusSender CreateSender()
{
ServiceBusClient client = GetServiceBusClient();

ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusClient));
ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusSender), default);
ServiceBusSender sender = client.CreateSender(connection.Id);
return sender;
}
private ServiceBusClient CreateClient()
{
ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusClient));
ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusClient), default);
ServiceBusClient client = new(connection.Endpoint!.AbsoluteUri, connection.TokenCredential);
return client;
}
private ServiceBusProcessor CreateProcessor()
private ServiceBusProcessor CreateProcessor(string id)
{
ServiceBusClient client = GetServiceBusClient();

ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusSender));
ServiceBusProcessor processor = client.CreateProcessor(
connection.Id,
"cm_servicebus_subscription_private",
new() { ReceiveMode = ServiceBusReceiveMode.PeekLock, MaxConcurrentCalls = 5 });
ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusProcessor), id);
string[] topicAndSubscription = connection.Id.Split('/');
ServiceBusProcessor processor = client.CreateProcessor(topicAndSubscription[0], topicAndSubscription[1], new() { MaxConcurrentCalls = 5 });
processor.ProcessErrorAsync += (args) => throw new Exception("error processing event", args.Exception);
return processor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using Azure.Messaging.ServiceBus;

namespace Azure.CloudMachine;

Expand All @@ -19,14 +20,15 @@ namespace Azure.CloudMachine;
public readonly struct StorageServices
{
private readonly CloudMachineClient _cm;

internal StorageServices(CloudMachineClient cm) => _cm = cm;

private BlobContainerClient GetDefaultContainer()
{
CloudMachineClient cm = _cm;
BlobContainerClient container = _cm.Subclients.Get(() =>
{
ClientConnectionOptions connection = cm.GetConnectionOptions(typeof(BlobContainerClient));
ClientConnectionOptions connection = cm.GetConnectionOptions(typeof(BlobContainerClient), default);
BlobContainerClient container = new(connection.Endpoint, connection.TokenCredential);
return container;
});
Expand Down Expand Up @@ -198,9 +200,9 @@ private static string ConvertPathToBlobPath(string path, BlobContainerClient con
/// <param name="function"></param>
public void WhenBlobUploaded(Action<StorageFile> function)
{
var processor = _cm.Messaging.GetServiceBusProcessor();
var cm = _cm;

CloudMachineClient cm = _cm;
// TODO (Pri 0): once the cache gets GCed, we will stop receiving events
ServiceBusProcessor processor = cm.Messaging.GetServiceBusProcessor("$private");
// TODO: How to unsubscribe?
processor.ProcessMessageAsync += async (args) =>
{
Expand Down

0 comments on commit 3cd1d32

Please sign in to comment.