diff --git a/Framework/Common/ServiceBusUtils.cs b/Framework/Common/ServiceBusUtils.cs index 38c2bda2a..b47311875 100644 --- a/Framework/Common/ServiceBusUtils.cs +++ b/Framework/Common/ServiceBusUtils.cs @@ -137,7 +137,7 @@ static async Task GenerateBrokeredMessageWithStorageKeyProperty // save the compressed stream using external storage when it is larger // than the supported message size limit. // the stream is stored using the generated key, which is saved in the message property. - string storageKey = orchestrationServiceBlobStore.BuildMessageStorageKey(instance, messageFireTime); + string storageKey = orchestrationServiceBlobStore.BuildMessageBlobKey(instance, messageFireTime); TraceHelper.TraceInstance( TraceEventType.Information, diff --git a/Framework/Serializing/RuntimeStateStreamConverter.cs b/Framework/Serializing/RuntimeStateStreamConverter.cs index 01507ef6c..68815c4eb 100644 --- a/Framework/Serializing/RuntimeStateStreamConverter.cs +++ b/Framework/Serializing/RuntimeStateStreamConverter.cs @@ -28,7 +28,7 @@ namespace DurableTask.Serializing /// /// A converter that does conversion between the OrchestrationRuntimeState instance and a stream after serialization. /// The stream is a serialized OrchestrationSessionState that will set as session state. - /// De-serialization is done with fallbacks in the order: OrchestrationSessionState -> OrchestrationRuntimeState -> IList. + /// De-serialization is done with fallbacks in the order: OrchestrationSessionState -> OrchestrationRuntimeState -> IList of HistoryEvent. /// class RuntimeStateStreamConverter { @@ -96,7 +96,7 @@ async static Task CreateStreamForExternalStorageAsync( } // create a new orchestration session state with the external storage key - string key = orchestrationServiceBlobStore.BuildSessionStorageKey(sessionId); + string key = orchestrationServiceBlobStore.BuildSessionBlobKey(sessionId); OrchestrationSessionState orchestrationSessionState = new OrchestrationSessionState(key); string serializedStateExternal = dataConverter.Serialize(orchestrationSessionState); @@ -197,11 +197,11 @@ static OrchestrationRuntimeState GetOrCreateInstanceState(Stream stateStream, st /// Deserialize the session state to construct an OrchestrationRuntimeState instance. /// /// The session state string could be one of these: - /// 1. a serialized IList (master branch implementation), or + /// 1. a serialized IList of HistoryEvent (master branch implementation), or /// 2. a serialized OrchestrationRuntimeState instance with the history event list (vnext branch implementation), or /// 3. a serialized OrchestrationSessionState instance with the history event list or a storage key (latest implementation). /// - /// So when doing the deserialization, it is done with fallbacks in the order: OrchestrationSessionState -> OrchestrationRuntimeState -> IList, to cover all cases. + /// So when doing the deserialization, it is done with fallbacks in the order: OrchestrationSessionState -> OrchestrationRuntimeState -> IList of HistoryEvent, to cover all cases. /// /// /// The serialized session state @@ -225,7 +225,7 @@ static OrchestrationRuntimeState DeserializeToRuntimeStateWithFallback(string se TraceHelper.TraceSession( TraceEventType.Warning, sessionId, - $"Failed to deserialize session state to OrchestrationSessionState object: {serializedState}"); + $"Failed to deserialize session state to OrchestrationSessionState object: {serializedState}. More info: {exception.StackTrace}"); try { OrchestrationRuntimeState restoredState = @@ -238,7 +238,7 @@ static OrchestrationRuntimeState DeserializeToRuntimeStateWithFallback(string se TraceHelper.TraceSession( TraceEventType.Warning, sessionId, - $"Failed to deserialize session state to OrchestrationRuntimeState object: {serializedState}"); + $"Failed to deserialize session state to OrchestrationRuntimeState object: {serializedState}. More info: {e.StackTrace}"); IList events = dataConverter.Deserialize>(serializedState); runtimeState = new OrchestrationRuntimeState(events); diff --git a/Framework/ServiceBusOrchestrationService.cs b/Framework/ServiceBusOrchestrationService.cs index f7de3e458..4a342c815 100644 --- a/Framework/ServiceBusOrchestrationService.cs +++ b/Framework/ServiceBusOrchestrationService.cs @@ -1415,7 +1415,7 @@ async Task TrySetSessionState( isSessionSizeThresholdExceeded = true; - string reason = $"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {Settings.SessionSettings.SessionMaxSizeInBytes} bytes"; + string reason = $"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {Settings.SessionSettings.SessionMaxSizeInBytes} bytes. More info: {exception.StackTrace}"; TraceHelper.TraceSession(TraceEventType.Critical, workItem.InstanceId, reason); BrokeredMessage forcedTerminateMessage = await CreateForcedTerminateMessageAsync(runtimeState.OrchestrationInstance.InstanceId, reason); diff --git a/Framework/Tracking/AzureStorageBlobStore.cs b/Framework/Tracking/AzureStorageBlobStore.cs index 1e7814c9f..360601f7a 100644 --- a/Framework/Tracking/AzureStorageBlobStore.cs +++ b/Framework/Tracking/AzureStorageBlobStore.cs @@ -22,12 +22,15 @@ namespace DurableTask.Tracking /// public class AzureStorageBlobStore : IOrchestrationServiceBlobStore { + /// + /// The client to access and manage the blob store + /// readonly BlobStorageClient blobClient; /// /// Creates a new AzureStorageBlobStore using the supplied hub name and connection string /// - /// The hubname for this store + /// The hub name for this store /// Azure storage connection string public AzureStorageBlobStore(string hubName, string connectionString) { @@ -35,24 +38,13 @@ public AzureStorageBlobStore(string hubName, string connectionString) } /// - /// Create a storage key based on the creation date. - /// This key will be used to save and load the stream in external storage when it is too large. - /// - /// The creation date of the blob. Could be DateTime.MinValue if want to use current time. - /// A storage key. - public string BuildStorageKey(DateTime creationDate) - { - return BlobStorageClientHelper.BuildStorageKey(creationDate); - } - - /// - /// Create a storage key based on the orchestrationInstance. + /// Create a blob storage access key based on the orchestrationInstance. /// This key will be used to save and load the stream message in external storage when it is too large. /// /// The orchestration instance. /// The message fire time. /// The created storage key. - public string BuildMessageStorageKey(OrchestrationInstance orchestrationInstance, DateTime messageFireTime) + public string BuildMessageBlobKey(OrchestrationInstance orchestrationInstance, DateTime messageFireTime) { return BlobStorageClientHelper.BuildMessageStorageKey( orchestrationInstance != null ? orchestrationInstance.InstanceId : "null", @@ -61,12 +53,12 @@ public string BuildMessageStorageKey(OrchestrationInstance orchestrationInstance } /// - /// Create a storage key based on message session. + /// Create a blob storage access key based on message session. /// This key will be used to save and load the stream in external storage when it is too large. /// /// The message session Id. /// A storage key. - public string BuildSessionStorageKey(string sessionId) + public string BuildSessionBlobKey(string sessionId) { return BlobStorageClientHelper.BuildSessionStorageKey(sessionId); } @@ -74,22 +66,22 @@ public string BuildSessionStorageKey(string sessionId) /// /// Save the stream of the message or seesion using key. /// - /// The storage key. + /// The blob storage key. /// The stream of the message or session. /// - public async Task SaveStreamAsync(string key, Stream stream) + public async Task SaveStreamAsync(string blobKey, Stream stream) { - await this.blobClient.UploadStreamBlob(key, stream); + await this.blobClient.UploadStreamBlobAsync(blobKey, stream); } /// /// Load the stream of message or seesion from storage using key. /// - /// Teh storage key. + /// The blob storage key. /// The saved stream message or session. - public async Task LoadStreamAsync(string key) + public async Task LoadStreamAsync(string blobKey) { - return await this.blobClient.DownloadStreamAsync(key); + return await this.blobClient.DownloadStreamAsync(blobKey); } /// @@ -97,7 +89,7 @@ public async Task LoadStreamAsync(string key) /// public async Task DeleteStoreAsync() { - await this.blobClient.DeleteAllContainersAsync(); + await this.blobClient.DeleteBlobStoreContainersAsync(); } /// diff --git a/Framework/Tracking/BlobStorageClient.cs b/Framework/Tracking/BlobStorageClient.cs index 273b1e62b..5b198aba7 100644 --- a/Framework/Tracking/BlobStorageClient.cs +++ b/Framework/Tracking/BlobStorageClient.cs @@ -55,9 +55,9 @@ public BlobStorageClient(string hubName, string connectionString) throw new ArgumentException("Invalid hub name", nameof(hubName)); } - blobClient = CloudStorageAccount.Parse(connectionString).CreateCloudBlobClient(); - blobClient.DefaultRequestOptions.RetryPolicy = new ExponentialRetry(DeltaBackOff, MaxRetries); - blobClient.DefaultRequestOptions.MaximumExecutionTime = MaximumExecutionTime; + this.blobClient = CloudStorageAccount.Parse(connectionString).CreateCloudBlobClient(); + this.blobClient.DefaultRequestOptions.RetryPolicy = new ExponentialRetry(DeltaBackOff, MaxRetries); + this.blobClient.DefaultRequestOptions.MaximumExecutionTime = MaximumExecutionTime; // save the lower case since it will be used as the prefix of the container name, // which only allows lower case letters @@ -70,12 +70,12 @@ public BlobStorageClient(string hubName, string connectionString) /// The key to uniquely locate and access the blob /// The stream to be uploaded /// - public async Task UploadStreamBlob(string key, Stream stream) + public async Task UploadStreamBlobAsync(string key, Stream stream) { string containerNameSuffix; string blobName; BlobStorageClientHelper.ParseKey(key, out containerNameSuffix, out blobName); - var cloudBlob = await GetCloudBlockBlobReferenceAsync(containerNameSuffix, blobName); + var cloudBlob = await this.GetCloudBlockBlobReferenceAsync(containerNameSuffix, blobName); await cloudBlob.UploadFromStreamAsync(stream); } @@ -100,7 +100,7 @@ public async Task DownloadStreamAsync(string key) async Task GetCloudBlockBlobReferenceAsync(string containerNameSuffix, string blobName) { string containerName = BlobStorageClientHelper.BuildContainerName(hubName, containerNameSuffix); - var cloudBlobContainer = blobClient.GetContainerReference(containerName); + var cloudBlobContainer = this.blobClient.GetContainerReference(containerName); await cloudBlobContainer.CreateIfNotExistsAsync(); return cloudBlobContainer.GetBlockBlobReference(blobName); } @@ -111,7 +111,7 @@ async Task GetCloudBlockBlobReferenceAsync(string containerNameSuffi /// A list of Azure blob containers public IEnumerable ListContainers() { - return blobClient.ListContainers(hubName); + return this.blobClient.ListContainers(this.hubName); } /// @@ -127,12 +127,12 @@ public async Task DeleteExpiredContainersAsync(DateTime thresholdDateTimeUtc) } /// - /// Delete all containers with the hub name as prefix. + /// Delete blob containers with the hub name as prefix. /// /// - public async Task DeleteAllContainersAsync() + public async Task DeleteBlobStoreContainersAsync() { - IEnumerable containers = ListContainers(); + IEnumerable containers = this.ListContainers(); var tasks = containers.ToList().Select(container => container.DeleteIfExistsAsync()); await Task.WhenAll(tasks); } diff --git a/Framework/Tracking/BlobStorageClientHelper.cs b/Framework/Tracking/BlobStorageClientHelper.cs index 3b32e6f24..b1e798f36 100644 --- a/Framework/Tracking/BlobStorageClientHelper.cs +++ b/Framework/Tracking/BlobStorageClientHelper.cs @@ -26,22 +26,15 @@ public class BlobStorageClientHelper static readonly string DateFormat = "yyyyMMdd"; static readonly char ContainerNameDelimiter = '-'; - // the blob storage accesss key is in the format of {DateTime}|{blobName} + /// + /// the blob storage accesss key is in the format of {DateTime}|{blobName} + /// public static readonly char KeyDelimiter = '|'; - // the delimiter shown in the blob name as the file path - public static readonly char BlobNameDelimiter = '/'; - /// - /// Build a storage key using the creation time specified. + /// the delimiter shown in the blob name as the file path /// - /// The specified creation time - /// The constructed storage key. - public static string BuildStorageKey(DateTime blobCreationTime) - { - string id = Guid.NewGuid().ToString("N"); - return $"{BuildContainerNameSuffix("blob", blobCreationTime)}{KeyDelimiter}{id}"; - } + public static readonly char BlobNameDelimiter = '/'; /// /// Build a storage key for the message. diff --git a/Framework/Tracking/IOrchestrationServiceBlobStore.cs b/Framework/Tracking/IOrchestrationServiceBlobStore.cs index 34ca55150..61bde89be 100644 --- a/Framework/Tracking/IOrchestrationServiceBlobStore.cs +++ b/Framework/Tracking/IOrchestrationServiceBlobStore.cs @@ -19,48 +19,42 @@ namespace DurableTask.Tracking /// /// Interface to allow save and load large blobs, such as message and session, as a stream using a storage store. + /// The blob is saved in the store using an access key (e.g., a path to the blob), + /// which can be used to uniquely load the blob back. /// public interface IOrchestrationServiceBlobStore { /// - /// Create a storage key based on the creation date. - /// This key will be used to save and load the stream in external storage when it is too large. - /// - /// The creation date of the blob. Could be DateTime.MinValue if want to use current time. - /// A storage key. - string BuildStorageKey(DateTime creationDate); - - /// - /// Create a storage key based on the orchestrationInstance. + /// Create a blob storage access key based on the orchestrationInstance. /// This key will be used to save and load the stream message in external storage when it is too large. /// /// The orchestration instance. /// The message fire time. Could be DateTime.MinValue. /// A message storage key. - string BuildMessageStorageKey(OrchestrationInstance orchestrationInstance, DateTime messageFireTime); + string BuildMessageBlobKey(OrchestrationInstance orchestrationInstance, DateTime messageFireTime); /// - /// Create a storage key based on message session. + /// Create a blob storage access key based on message session. /// This key will be used to save and load the stream in external storage when it is too large. /// /// The message session Id. /// A storage key. - string BuildSessionStorageKey(string sessionId); + string BuildSessionBlobKey(string sessionId); /// /// Save the stream of the message or seesion using key. /// - /// The storage key. + /// The blob storage key. /// The stream of the message or session. /// - Task SaveStreamAsync(string key, Stream stream); + Task SaveStreamAsync(string blobKey, Stream stream); /// /// Load the stream of message or seesion from storage using key. /// - /// Teh storage key. + /// The blob storage key. /// The saved stream message or session. - Task LoadStreamAsync(string key); + Task LoadStreamAsync(string blobKey); /// /// Deletes the blob store diff --git a/FrameworkUnitTests/BlobStorageClientHelperTest.cs b/FrameworkUnitTests/BlobStorageClientHelperTest.cs index fd12e7b28..d547b4105 100644 --- a/FrameworkUnitTests/BlobStorageClientHelperTest.cs +++ b/FrameworkUnitTests/BlobStorageClientHelperTest.cs @@ -39,19 +39,6 @@ public void IsContainerExpiredTest() Assert.IsFalse(BlobStorageClientHelper.IsContainerExpired("invalidContainerName", DateTime.UtcNow)); } - [TestMethod] - public void BuildStorageKeyTest() - { - DateTime messageFireTime = new DateTime(2015, 05, 17); - string key = BlobStorageClientHelper.BuildStorageKey(messageFireTime); - Regex regex = new Regex(@"blob-20150517|\w{32}$"); - Assert.IsTrue(regex.Match(key).Success); - - key = BlobStorageClientHelper.BuildStorageKey(DateTime.MinValue); - regex = new Regex(@"blob-\d{8}|\w{32}$"); - Assert.IsTrue(regex.Match(key).Success); - } - [TestMethod] public void BuildMessageStorageKeyTest() { diff --git a/FrameworkUnitTests/BlobStorageClientTest.cs b/FrameworkUnitTests/BlobStorageClientTest.cs index e4cb3a606..32bf875fd 100644 --- a/FrameworkUnitTests/BlobStorageClientTest.cs +++ b/FrameworkUnitTests/BlobStorageClientTest.cs @@ -52,7 +52,7 @@ public async Task TestStreamBlobCreationAndDeletion() string testContent = "test stream content"; string key = "message-20101003|testBlobName"; Stream stream = new MemoryStream(Encoding.UTF8.GetBytes(testContent)); - await blobStorageClient.UploadStreamBlob(key, stream); + await blobStorageClient.UploadStreamBlobAsync(key, stream); MemoryStream result = await blobStorageClient.DownloadStreamAsync(key) as MemoryStream; string resultString = Encoding.UTF8.GetString(result.ToArray()); @@ -66,12 +66,11 @@ public async Task TestDeleteContainers() string key1 = "message-20150516|a"; string key2 = "message-20150517|b"; string key3 = "message-20150518|c"; - string blobName = "testBlob"; Stream stream = new MemoryStream(Encoding.UTF8.GetBytes(testContent)); - await blobStorageClient.UploadStreamBlob(key1, stream); - await blobStorageClient.UploadStreamBlob(key2, stream); - await blobStorageClient.UploadStreamBlob(key3, stream); + await blobStorageClient.UploadStreamBlobAsync(key1, stream); + await blobStorageClient.UploadStreamBlobAsync(key2, stream); + await blobStorageClient.UploadStreamBlobAsync(key3, stream); DateTime dateTime = new DateTime(2015, 05, 17); await blobStorageClient.DeleteExpiredContainersAsync(dateTime); @@ -84,7 +83,7 @@ public async Task TestDeleteContainers() Assert.IsTrue(sortedList[0].EndsWith("20150517")); Assert.IsTrue(sortedList[1].EndsWith("20150518")); - await blobStorageClient.DeleteAllContainersAsync(); + await blobStorageClient.DeleteBlobStoreContainersAsync(); containers = blobStorageClient.ListContainers().ToList(); Assert.AreEqual(0, containers.Count); } diff --git a/FrameworkUnitTests/RuntimeStateStreamConverterTest.cs b/FrameworkUnitTests/RuntimeStateStreamConverterTest.cs index 6a76a76c3..088d24d9f 100644 --- a/FrameworkUnitTests/RuntimeStateStreamConverterTest.cs +++ b/FrameworkUnitTests/RuntimeStateStreamConverterTest.cs @@ -148,6 +148,7 @@ public async Task LargeRuntimeStateConverterTest() catch (OrchestrationException e) { // expected + Assert.IsTrue(e.Message.Contains("IOrchestrationServiceBlobStore"), "Exception must contain IOrchestrationServiceBlobStore."); } } @@ -175,6 +176,7 @@ public async Task VeryLargeRuntimeStateConverterTest() catch (OrchestrationException e) { // expected + Assert.IsTrue(e.Message.Contains("exceeded"), "Exception must contain exceeded."); } }