Skip to content

Commit

Permalink
Address CR.
Browse files Browse the repository at this point in the history
  • Loading branch information
zheg committed Aug 2, 2016
1 parent 210ec23 commit a0ca35b
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 88 deletions.
2 changes: 1 addition & 1 deletion Framework/Common/ServiceBusUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ static async Task<BrokeredMessage> GenerateBrokeredMessageWithStorageKeyProperty
// save the compressed stream using external storage when it is larger
// than the supported message size limit.
// the stream is stored using the generated key, which is saved in the message property.
string storageKey = orchestrationServiceBlobStore.BuildMessageStorageKey(instance, messageFireTime);
string storageKey = orchestrationServiceBlobStore.BuildMessageBlobKey(instance, messageFireTime);

TraceHelper.TraceInstance(
TraceEventType.Information,
Expand Down
12 changes: 6 additions & 6 deletions Framework/Serializing/RuntimeStateStreamConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace DurableTask.Serializing
/// <summary>
/// A converter that does conversion between the OrchestrationRuntimeState instance and a stream after serialization.
/// The stream is a serialized OrchestrationSessionState that will set as session state.
/// De-serialization is done with fallbacks in the order: OrchestrationSessionState -> OrchestrationRuntimeState -> IList<HistoryEvent>.
/// De-serialization is done with fallbacks in the order: OrchestrationSessionState -> OrchestrationRuntimeState -> IList of HistoryEvent.
/// </summary>
class RuntimeStateStreamConverter
{
Expand Down Expand Up @@ -96,7 +96,7 @@ async static Task<Stream> CreateStreamForExternalStorageAsync(
}

// create a new orchestration session state with the external storage key
string key = orchestrationServiceBlobStore.BuildSessionStorageKey(sessionId);
string key = orchestrationServiceBlobStore.BuildSessionBlobKey(sessionId);
OrchestrationSessionState orchestrationSessionState = new OrchestrationSessionState(key);

string serializedStateExternal = dataConverter.Serialize(orchestrationSessionState);
Expand Down Expand Up @@ -197,11 +197,11 @@ static OrchestrationRuntimeState GetOrCreateInstanceState(Stream stateStream, st
/// Deserialize the session state to construct an OrchestrationRuntimeState instance.
///
/// The session state string could be one of these:
/// 1. a serialized IList<HistoryEvent> (master branch implementation), or
/// 1. a serialized IList of HistoryEvent (master branch implementation), or
/// 2. a serialized OrchestrationRuntimeState instance with the history event list (vnext branch implementation), or
/// 3. a serialized OrchestrationSessionState instance with the history event list or a storage key (latest implementation).
///
/// So when doing the deserialization, it is done with fallbacks in the order: OrchestrationSessionState -> OrchestrationRuntimeState -> IList<HistoryEvent>, to cover all cases.
/// So when doing the deserialization, it is done with fallbacks in the order: OrchestrationSessionState -> OrchestrationRuntimeState -> IList of HistoryEvent, to cover all cases.
///
/// </summary>
/// <param name="serializedState">The serialized session state</param>
Expand All @@ -225,7 +225,7 @@ static OrchestrationRuntimeState DeserializeToRuntimeStateWithFallback(string se
TraceHelper.TraceSession(
TraceEventType.Warning,
sessionId,
$"Failed to deserialize session state to OrchestrationSessionState object: {serializedState}");
$"Failed to deserialize session state to OrchestrationSessionState object: {serializedState}. More info: {exception.StackTrace}");
try
{
OrchestrationRuntimeState restoredState =
Expand All @@ -238,7 +238,7 @@ static OrchestrationRuntimeState DeserializeToRuntimeStateWithFallback(string se
TraceHelper.TraceSession(
TraceEventType.Warning,
sessionId,
$"Failed to deserialize session state to OrchestrationRuntimeState object: {serializedState}");
$"Failed to deserialize session state to OrchestrationRuntimeState object: {serializedState}. More info: {e.StackTrace}");

IList<HistoryEvent> events = dataConverter.Deserialize<IList<HistoryEvent>>(serializedState);
runtimeState = new OrchestrationRuntimeState(events);
Expand Down
2 changes: 1 addition & 1 deletion Framework/ServiceBusOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1415,7 +1415,7 @@ async Task<bool> TrySetSessionState(

isSessionSizeThresholdExceeded = true;

string reason = $"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {Settings.SessionSettings.SessionMaxSizeInBytes} bytes";
string reason = $"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {Settings.SessionSettings.SessionMaxSizeInBytes} bytes. More info: {exception.StackTrace}";
TraceHelper.TraceSession(TraceEventType.Critical, workItem.InstanceId, reason);

BrokeredMessage forcedTerminateMessage = await CreateForcedTerminateMessageAsync(runtimeState.OrchestrationInstance.InstanceId, reason);
Expand Down
38 changes: 15 additions & 23 deletions Framework/Tracking/AzureStorageBlobStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,29 @@ namespace DurableTask.Tracking
/// </summary>
public class AzureStorageBlobStore : IOrchestrationServiceBlobStore
{
/// <summary>
/// The client to access and manage the blob store
/// </summary>
readonly BlobStorageClient blobClient;

/// <summary>
/// Creates a new AzureStorageBlobStore using the supplied hub name and connection string
/// </summary>
/// <param name="hubName">The hubname for this store</param>
/// <param name="hubName">The hub name for this store</param>
/// <param name="connectionString">Azure storage connection string</param>
public AzureStorageBlobStore(string hubName, string connectionString)
{
this.blobClient = new BlobStorageClient(hubName, connectionString);
}

/// <summary>
/// Create a storage key based on the creation date.
/// This key will be used to save and load the stream in external storage when it is too large.
/// </summary>
/// <param name="creationDate">The creation date of the blob. Could be DateTime.MinValue if want to use current time.</param>
/// <returns>A storage key.</returns>
public string BuildStorageKey(DateTime creationDate)
{
return BlobStorageClientHelper.BuildStorageKey(creationDate);
}

/// <summary>
/// Create a storage key based on the orchestrationInstance.
/// Create a blob storage access key based on the orchestrationInstance.
/// This key will be used to save and load the stream message in external storage when it is too large.
/// </summary>
/// <param name="orchestrationInstance">The orchestration instance.</param>
/// <param name="messageFireTime">The message fire time.</param>
/// <returns>The created storage key.</returns>
public string BuildMessageStorageKey(OrchestrationInstance orchestrationInstance, DateTime messageFireTime)
public string BuildMessageBlobKey(OrchestrationInstance orchestrationInstance, DateTime messageFireTime)
{
return BlobStorageClientHelper.BuildMessageStorageKey(
orchestrationInstance != null ? orchestrationInstance.InstanceId : "null",
Expand All @@ -61,43 +53,43 @@ public string BuildMessageStorageKey(OrchestrationInstance orchestrationInstance
}

/// <summary>
/// Create a storage key based on message session.
/// Create a blob storage access key based on message session.
/// This key will be used to save and load the stream in external storage when it is too large.
/// </summary>
/// <param name="sessionId">The message session Id.</param>
/// <returns>A storage key.</returns>
public string BuildSessionStorageKey(string sessionId)
public string BuildSessionBlobKey(string sessionId)
{
return BlobStorageClientHelper.BuildSessionStorageKey(sessionId);
}

/// <summary>
/// Save the stream of the message or seesion using key.
/// </summary>
/// <param name="key">The storage key.</param>
/// <param name="blobKey">The blob storage key.</param>
/// <param name="stream">The stream of the message or session.</param>
/// <returns></returns>
public async Task SaveStreamAsync(string key, Stream stream)
public async Task SaveStreamAsync(string blobKey, Stream stream)
{
await this.blobClient.UploadStreamBlob(key, stream);
await this.blobClient.UploadStreamBlobAsync(blobKey, stream);
}

/// <summary>
/// Load the stream of message or seesion from storage using key.
/// </summary>
/// <param name="key">Teh storage key.</param>
/// <param name="blobKey">The blob storage key.</param>
/// <returns>The saved stream message or session.</returns>
public async Task<Stream> LoadStreamAsync(string key)
public async Task<Stream> LoadStreamAsync(string blobKey)
{
return await this.blobClient.DownloadStreamAsync(key);
return await this.blobClient.DownloadStreamAsync(blobKey);
}

/// <summary>
/// Deletes the Azure blob storage
/// </summary>
public async Task DeleteStoreAsync()
{
await this.blobClient.DeleteAllContainersAsync();
await this.blobClient.DeleteBlobStoreContainersAsync();
}

/// <summary>
Expand Down
20 changes: 10 additions & 10 deletions Framework/Tracking/BlobStorageClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public BlobStorageClient(string hubName, string connectionString)
throw new ArgumentException("Invalid hub name", nameof(hubName));
}

blobClient = CloudStorageAccount.Parse(connectionString).CreateCloudBlobClient();
blobClient.DefaultRequestOptions.RetryPolicy = new ExponentialRetry(DeltaBackOff, MaxRetries);
blobClient.DefaultRequestOptions.MaximumExecutionTime = MaximumExecutionTime;
this.blobClient = CloudStorageAccount.Parse(connectionString).CreateCloudBlobClient();
this.blobClient.DefaultRequestOptions.RetryPolicy = new ExponentialRetry(DeltaBackOff, MaxRetries);
this.blobClient.DefaultRequestOptions.MaximumExecutionTime = MaximumExecutionTime;

// save the lower case since it will be used as the prefix of the container name,
// which only allows lower case letters
Expand All @@ -70,12 +70,12 @@ public BlobStorageClient(string hubName, string connectionString)
/// <param name="key">The key to uniquely locate and access the blob</param>
/// <param name="stream">The stream to be uploaded</param>
/// <returns></returns>
public async Task UploadStreamBlob(string key, Stream stream)
public async Task UploadStreamBlobAsync(string key, Stream stream)
{
string containerNameSuffix;
string blobName;
BlobStorageClientHelper.ParseKey(key, out containerNameSuffix, out blobName);
var cloudBlob = await GetCloudBlockBlobReferenceAsync(containerNameSuffix, blobName);
var cloudBlob = await this.GetCloudBlockBlobReferenceAsync(containerNameSuffix, blobName);
await cloudBlob.UploadFromStreamAsync(stream);
}

Expand All @@ -100,7 +100,7 @@ public async Task<Stream> DownloadStreamAsync(string key)
async Task<ICloudBlob> GetCloudBlockBlobReferenceAsync(string containerNameSuffix, string blobName)
{
string containerName = BlobStorageClientHelper.BuildContainerName(hubName, containerNameSuffix);
var cloudBlobContainer = blobClient.GetContainerReference(containerName);
var cloudBlobContainer = this.blobClient.GetContainerReference(containerName);
await cloudBlobContainer.CreateIfNotExistsAsync();
return cloudBlobContainer.GetBlockBlobReference(blobName);
}
Expand All @@ -111,7 +111,7 @@ async Task<ICloudBlob> GetCloudBlockBlobReferenceAsync(string containerNameSuffi
/// <returns>A list of Azure blob containers</returns>
public IEnumerable<CloudBlobContainer> ListContainers()
{
return blobClient.ListContainers(hubName);
return this.blobClient.ListContainers(this.hubName);
}

/// <summary>
Expand All @@ -127,12 +127,12 @@ public async Task DeleteExpiredContainersAsync(DateTime thresholdDateTimeUtc)
}

/// <summary>
/// Delete all containers with the hub name as prefix.
/// Delete blob containers with the hub name as prefix.
/// </summary>
/// <returns></returns>
public async Task DeleteAllContainersAsync()
public async Task DeleteBlobStoreContainersAsync()
{
IEnumerable<CloudBlobContainer> containers = ListContainers();
IEnumerable<CloudBlobContainer> containers = this.ListContainers();
var tasks = containers.ToList().Select(container => container.DeleteIfExistsAsync());
await Task.WhenAll(tasks);
}
Expand Down
17 changes: 5 additions & 12 deletions Framework/Tracking/BlobStorageClientHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,15 @@ public class BlobStorageClientHelper
static readonly string DateFormat = "yyyyMMdd";
static readonly char ContainerNameDelimiter = '-';

// the blob storage accesss key is in the format of {DateTime}|{blobName}
/// <summary>
/// the blob storage accesss key is in the format of {DateTime}|{blobName}
/// </summary>
public static readonly char KeyDelimiter = '|';

// the delimiter shown in the blob name as the file path
public static readonly char BlobNameDelimiter = '/';

/// <summary>
/// Build a storage key using the creation time specified.
/// the delimiter shown in the blob name as the file path
/// </summary>
/// <param name="blobCreationTime">The specified creation time</param>
/// <returns>The constructed storage key.</returns>
public static string BuildStorageKey(DateTime blobCreationTime)
{
string id = Guid.NewGuid().ToString("N");
return $"{BuildContainerNameSuffix("blob", blobCreationTime)}{KeyDelimiter}{id}";
}
public static readonly char BlobNameDelimiter = '/';

/// <summary>
/// Build a storage key for the message.
Expand Down
26 changes: 10 additions & 16 deletions Framework/Tracking/IOrchestrationServiceBlobStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,48 +19,42 @@ namespace DurableTask.Tracking

/// <summary>
/// Interface to allow save and load large blobs, such as message and session, as a stream using a storage store.
/// The blob is saved in the store using an access key (e.g., a path to the blob),
/// which can be used to uniquely load the blob back.
/// </summary>
public interface IOrchestrationServiceBlobStore
{
/// <summary>
/// Create a storage key based on the creation date.
/// This key will be used to save and load the stream in external storage when it is too large.
/// </summary>
/// <param name="creationDate">The creation date of the blob. Could be DateTime.MinValue if want to use current time.</param>
/// <returns>A storage key.</returns>
string BuildStorageKey(DateTime creationDate);

/// <summary>
/// Create a storage key based on the orchestrationInstance.
/// Create a blob storage access key based on the orchestrationInstance.
/// This key will be used to save and load the stream message in external storage when it is too large.
/// </summary>
/// <param name="orchestrationInstance">The orchestration instance.</param>
/// <param name="messageFireTime">The message fire time. Could be DateTime.MinValue.</param>
/// <returns>A message storage key.</returns>
string BuildMessageStorageKey(OrchestrationInstance orchestrationInstance, DateTime messageFireTime);
string BuildMessageBlobKey(OrchestrationInstance orchestrationInstance, DateTime messageFireTime);

/// <summary>
/// Create a storage key based on message session.
/// Create a blob storage access key based on message session.
/// This key will be used to save and load the stream in external storage when it is too large.
/// </summary>
/// <param name="sessionId">The message session Id.</param>
/// <returns>A storage key.</returns>
string BuildSessionStorageKey(string sessionId);
string BuildSessionBlobKey(string sessionId);

/// <summary>
/// Save the stream of the message or seesion using key.
/// </summary>
/// <param name="key">The storage key.</param>
/// <param name="blobKey">The blob storage key.</param>
/// <param name="stream">The stream of the message or session.</param>
/// <returns></returns>
Task SaveStreamAsync(string key, Stream stream);
Task SaveStreamAsync(string blobKey, Stream stream);

/// <summary>
/// Load the stream of message or seesion from storage using key.
/// </summary>
/// <param name="key">Teh storage key.</param>
/// <param name="blobKey">The blob storage key.</param>
/// <returns>The saved stream message or session.</returns>
Task<Stream> LoadStreamAsync(string key);
Task<Stream> LoadStreamAsync(string blobKey);

/// <summary>
/// Deletes the blob store
Expand Down
13 changes: 0 additions & 13 deletions FrameworkUnitTests/BlobStorageClientHelperTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,6 @@ public void IsContainerExpiredTest()
Assert.IsFalse(BlobStorageClientHelper.IsContainerExpired("invalidContainerName", DateTime.UtcNow));
}

[TestMethod]
public void BuildStorageKeyTest()
{
DateTime messageFireTime = new DateTime(2015, 05, 17);
string key = BlobStorageClientHelper.BuildStorageKey(messageFireTime);
Regex regex = new Regex(@"blob-20150517|\w{32}$");
Assert.IsTrue(regex.Match(key).Success);

key = BlobStorageClientHelper.BuildStorageKey(DateTime.MinValue);
regex = new Regex(@"blob-\d{8}|\w{32}$");
Assert.IsTrue(regex.Match(key).Success);
}

[TestMethod]
public void BuildMessageStorageKeyTest()
{
Expand Down
Loading

0 comments on commit a0ca35b

Please sign in to comment.