Skip to content

Commit

Permalink
Handle message overflow using external blob container storage.
Browse files Browse the repository at this point in the history
  • Loading branch information
zheg committed Jun 22, 2016
1 parent 959c28f commit f9a4552
Show file tree
Hide file tree
Showing 14 changed files with 487 additions and 43 deletions.
66 changes: 56 additions & 10 deletions Framework/Common/ServiceBusUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// ----------------------------------------------------------------------------------

using System.Collections.Generic;
using DurableTask.Tracking;

namespace DurableTask.Common
{
Expand All @@ -28,14 +29,15 @@ internal static class ServiceBusUtils
{
public static BrokeredMessage GetBrokeredMessageFromObject(object serializableObject, CompressionSettings compressionSettings)
{
return GetBrokeredMessageFromObject(serializableObject, compressionSettings, null, null);
return GetBrokeredMessageFromObject(serializableObject, compressionSettings, null, null, null);
}

public static BrokeredMessage GetBrokeredMessageFromObject(
object serializableObject,
CompressionSettings compressionSettings,
OrchestrationInstance instance,
string messageType)
string messageType,
IServiceBusMessageStore serviceBusMessageStore)
{
if (serializableObject == null)
{
Expand All @@ -61,16 +63,36 @@ public static BrokeredMessage GetBrokeredMessageFromObject(
rawStream.Length > compressionSettings.ThresholdInBytes))
{
Stream compressedStream = Utils.GetCompressedStream(rawStream);

brokeredMessage = new BrokeredMessage(compressedStream, true);
brokeredMessage.Properties[FrameworkConstants.CompressionTypePropertyName] =
FrameworkConstants.CompressionTypeGzipPropertyValue;

var rawLen = rawStream.Length;
TraceHelper.TraceInstance(TraceEventType.Information, instance,
() =>
"Compression stats for " + (messageType ?? string.Empty) + " : " + brokeredMessage.MessageId +
"Compression stats for " + (messageType ?? string.Empty) + " : " +
brokeredMessage.MessageId +
", uncompressed " + rawLen + " -> compressed " + compressedStream.Length);

if (compressedStream.Length < FrameworkConstants.MaxMessageSizeInBytes)
{
brokeredMessage = new BrokeredMessage(compressedStream, true);
brokeredMessage.Properties[FrameworkConstants.CompressionTypePropertyName] =
FrameworkConstants.CompressionTypeGzipPropertyValue;
}
else if (serviceBusMessageStore != null)
{
// save the compressed stream using external storage when it is larger
// than the supported message size limit.
// the message is stored using the generated key, which is saved in the message property.
string storageKey = serviceBusMessageStore.BuildMessageStorageKey(instance);
serviceBusMessageStore.SaveSteamMessageWithKey(storageKey, compressedStream);
brokeredMessage = new BrokeredMessage();
brokeredMessage.Properties[FrameworkConstants.MessageStorageKey] = storageKey;
brokeredMessage.Properties[FrameworkConstants.CompressionTypePropertyName] =
FrameworkConstants.CompressionTypeGzipPropertyValue;
}
else
{
throw new ArgumentException($"The compressed message is larger than supported. " +
$"Please provide an implementation of IServiceBusMessageStore for external storage.", nameof(IServiceBusMessageStore));
}
}
else
{
Expand All @@ -95,7 +117,7 @@ public static BrokeredMessage GetBrokeredMessageFromObject(
}
}

public static async Task<T> GetObjectFromBrokeredMessageAsync<T>(BrokeredMessage message)
public static async Task<T> GetObjectFromBrokeredMessageAsync<T>(BrokeredMessage message, IServiceBusMessageStore serviceBusMessageStore)
{
if (message == null)
{
Expand All @@ -120,7 +142,7 @@ public static async Task<T> GetObjectFromBrokeredMessageAsync<T>(BrokeredMessage
else if (string.Equals(compressionType, FrameworkConstants.CompressionTypeGzipPropertyValue,
StringComparison.OrdinalIgnoreCase))
{
using (var compressedStream = message.GetBody<Stream>())
using (var compressedStream = await GetCompressedStream(message, serviceBusMessageStore))
{
if (!Utils.IsGzipStream(compressedStream))
{
Expand Down Expand Up @@ -153,6 +175,30 @@ public static async Task<T> GetObjectFromBrokeredMessageAsync<T>(BrokeredMessage
return deserializedObject;
}

static async Task<Stream> GetCompressedStream(BrokeredMessage message, IServiceBusMessageStore serviceBusMessageStore)
{
object storageKeyObj = null;
string storageKey = string.Empty;

if (message.Properties.TryGetValue(FrameworkConstants.MessageStorageKey, out storageKeyObj))
{
storageKey = (string)storageKeyObj;
}
if (string.IsNullOrEmpty(storageKey))
{
return message.GetBody<Stream>();
}

// if the storage key is set in the message property,
// load the stream message from the service bus message store.
if (serviceBusMessageStore == null)
{
throw new ArgumentException($"Failed to load compressed message from external storage with key: {storageKey}. Please provide an implementation of IServiceBusMessageStore for external storage.", nameof(IServiceBusMessageStore));
}
return await serviceBusMessageStore.LoadSteamMessageWithKey(storageKey);

}

public static void CheckAndLogDeliveryCount(string sessionId, IEnumerable<BrokeredMessage> messages, int maxDeliverycount)
{
foreach (BrokeredMessage message in messages)
Expand Down
3 changes: 3 additions & 0 deletions Framework/DurableTaskFramework.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@
<Compile Include="Tracking\AzureTableOrchestrationJumpStartEntity.cs" />
<Compile Include="TrackingWorkItem.cs" />
<Compile Include="Tracking\AzureTableInstanceStore.cs" />
<Compile Include="Tracking\BlobStorageClient.cs" />
<Compile Include="Tracking\BlobStorageClientHelper.cs" />
<Compile Include="Tracking\IServiceBusMessageStore.cs" />
<Compile Include="Tracking\JumpStartManager.cs" />
<Compile Include="Tracking\InstanceEntityBase.cs" />
<Compile Include="Tracking\OrchestrationJumpStartInstanceEntity.cs" />
Expand Down
6 changes: 6 additions & 0 deletions Framework/FrameworkConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ internal class FrameworkConstants
public const string CompressionTypeGzipPropertyValue = "gzip";
public const string CompressionTypeNonePropertyValue = "none";

// service bus message storage key in message property
public const string MessageStorageKey = "messageStorageKey";

// the max allowed message size after compression in service bus
public const int MaxMessageSizeInBytes = 230 * 1024;

// instance store constants
public const int MaxStringLengthForAzureTableColumn = 1024 * 15; // cut off at 15k * 2 bytes
}
Expand Down
46 changes: 30 additions & 16 deletions Framework/ServiceBusOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,9 @@ public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAs
string.Join(",", newMessages.Select(m => m.MessageId))}"));

ServiceBusUtils.CheckAndLogDeliveryCount(session.SessionId, newMessages, this.Settings.MaxTaskOrchestrationDeliveryCount);

IServiceBusMessageStore serviceBusMessageStore = this.InstanceStore as IServiceBusMessageStore;
IList<TaskMessage> newTaskMessages = await Task.WhenAll(
newMessages.Select(async message => await ServiceBusUtils.GetObjectFromBrokeredMessageAsync<TaskMessage>(message)));
newMessages.Select(async message => await ServiceBusUtils.GetObjectFromBrokeredMessageAsync<TaskMessage>(message, serviceBusMessageStore)));

OrchestrationRuntimeState runtimeState = await GetSessionState(session);

Expand Down Expand Up @@ -572,6 +572,7 @@ public async Task CompleteTaskOrchestrationWorkItemAsync(
$"Size of session state ({runtimeState.CompressedSize}B) is nearing session size limit of {SessionStreamTerminationThresholdInBytes}B");
}

IServiceBusMessageStore serviceBusMessageStore = this.InstanceStore as IServiceBusMessageStore;
// We need to .ToList() the IEnumerable otherwise GetBrokeredMessageFromObject gets called 5 times per message due to Service Bus doing multiple enumeration
if (outboundMessages?.Count > 0)
{
Expand All @@ -581,7 +582,8 @@ await workerSender.SendBatchAsync(
m,
Settings.MessageCompressionSettings,
null,
"Worker outbound message"))
"Worker outbound message",
serviceBusMessageStore))
.ToList()
);
}
Expand All @@ -595,7 +597,8 @@ await orchestratorQueueClient.SendBatchAsync(
m,
Settings.MessageCompressionSettings,
newOrchestrationRuntimeState.OrchestrationInstance,
"Timer Message");
"Timer Message",
serviceBusMessageStore);
message.ScheduledEnqueueTimeUtc = ((TimerFiredEvent)m.Event).FireAt;
return message;
})
Expand All @@ -611,7 +614,8 @@ await orchestratorQueueClient.SendBatchAsync(
m,
Settings.MessageCompressionSettings,
m.OrchestrationInstance,
"Sub Orchestration"))
"Sub Orchestration",
serviceBusMessageStore))
.ToList()
);
}
Expand All @@ -623,7 +627,8 @@ await orchestratorQueueClient.SendAsync(
continuedAsNewMessage,
Settings.MessageCompressionSettings,
newOrchestrationRuntimeState.OrchestrationInstance,
"Continue as new")
"Continue as new",
serviceBusMessageStore)
);
}

Expand Down Expand Up @@ -724,7 +729,9 @@ public async Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan re
receivedMessage.SessionId,
GetFormattedLog($"New message to process: {receivedMessage.MessageId} [{receivedMessage.SequenceNumber}]"));

TaskMessage taskMessage = await ServiceBusUtils.GetObjectFromBrokeredMessageAsync<TaskMessage>(receivedMessage);
IServiceBusMessageStore serviceBusMessageStore = this.InstanceStore as IServiceBusMessageStore;
TaskMessage taskMessage = await ServiceBusUtils.GetObjectFromBrokeredMessageAsync<TaskMessage>(receivedMessage,
serviceBusMessageStore);

ServiceBusUtils.CheckAndLogDeliveryCount(receivedMessage, Settings.MaxTaskActivityDeliveryCount);

Expand Down Expand Up @@ -790,11 +797,13 @@ public async Task<TaskActivityWorkItem> RenewTaskActivityWorkItemLockAsync(TaskA
/// <param name="responseMessage">The response message to send</param>
public async Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkItem workItem, TaskMessage responseMessage)
{
IServiceBusMessageStore serviceBusMessageStore = this.InstanceStore as IServiceBusMessageStore;
BrokeredMessage brokeredResponseMessage = ServiceBusUtils.GetBrokeredMessageFromObject(
responseMessage,
Settings.MessageCompressionSettings,
workItem.TaskMessage.OrchestrationInstance,
$"Response for {workItem.TaskMessage.OrchestrationInstance.InstanceId}");
$"Response for {workItem.TaskMessage.OrchestrationInstance.InstanceId}",
serviceBusMessageStore);

var originalMessage = GetAndDeleteBrokeredMessageForWorkItem(workItem);
if (originalMessage == null)
Expand Down Expand Up @@ -907,11 +916,13 @@ public async Task UpdateJumpStartStoreAsync(TaskMessage creationMessage)
/// <param name="message">The task message to be sent for the orchestration</param>
public async Task SendTaskOrchestrationMessageAsync(TaskMessage message)
{
IServiceBusMessageStore serviceBusMessageStore = this.InstanceStore as IServiceBusMessageStore;
BrokeredMessage brokeredMessage = ServiceBusUtils.GetBrokeredMessageFromObject(
message,
Settings.MessageCompressionSettings,
message.OrchestrationInstance,
"SendTaskOrchestrationMessage");
"SendTaskOrchestrationMessage",
serviceBusMessageStore);

// Use duplicate detection of ExecutionStartedEvent by addin messageId
var executionStartedEvent = message.Event as ExecutionStartedEvent;
Expand Down Expand Up @@ -1072,9 +1083,9 @@ async Task<TrackingWorkItem> FetchTrackingWorkItemAsync(TimeSpan receiveTimeout)
GetFormattedLog($"{newMessages.Count()} new tracking messages to process: {string.Join(",", newMessages.Select(m => m.MessageId))}"));

ServiceBusUtils.CheckAndLogDeliveryCount(newMessages, Settings.MaxTrackingDeliveryCount);

IServiceBusMessageStore serviceBusMessageStore = this.InstanceStore as IServiceBusMessageStore;
IList<TaskMessage> newTaskMessages = await Task.WhenAll(
newMessages.Select(async message => await ServiceBusUtils.GetObjectFromBrokeredMessageAsync<TaskMessage>(message)));
newMessages.Select(async message => await ServiceBusUtils.GetObjectFromBrokeredMessageAsync<TaskMessage>(message, serviceBusMessageStore)));

var lockTokens = newMessages.ToDictionary(m => m.LockToken, m => m);
var sessionState = new ServiceBusOrchestrationSession
Expand Down Expand Up @@ -1107,7 +1118,7 @@ List<BrokeredMessage> CreateTrackingMessages(OrchestrationRuntimeState runtimeSt
{
return trackingMessages;
}

IServiceBusMessageStore serviceBusMessageStore = this.InstanceStore as IServiceBusMessageStore;
// this is to stamp the tracking events with a sequence number so they can be ordered even if
// writing to a store like azure table
int historyEventIndex = runtimeState.Events.Count - runtimeState.NewEvents.Count;
Expand All @@ -1124,7 +1135,8 @@ List<BrokeredMessage> CreateTrackingMessages(OrchestrationRuntimeState runtimeSt
taskMessage,
Settings.MessageCompressionSettings,
runtimeState.OrchestrationInstance,
"History Tracking Message");
"History Tracking Message",
serviceBusMessageStore);
trackingMessages.Add(trackingMessage);
}

Expand All @@ -1139,7 +1151,8 @@ List<BrokeredMessage> CreateTrackingMessages(OrchestrationRuntimeState runtimeSt
stateMessage,
Settings.MessageCompressionSettings,
runtimeState.OrchestrationInstance,
"State Tracking Message");
"State Tracking Message",
serviceBusMessageStore);
trackingMessages.Add(brokeredStateMessage);

return trackingMessages;
Expand Down Expand Up @@ -1420,12 +1433,13 @@ BrokeredMessage CreateForcedTerminateMessage(string instanceId, string reason)
OrchestrationInstance = newOrchestrationInstance,
Event = new ExecutionTerminatedEvent(-1, reason)
};

IServiceBusMessageStore serviceBusMessageStore = this.InstanceStore as IServiceBusMessageStore;
BrokeredMessage message = ServiceBusUtils.GetBrokeredMessageFromObject(
taskMessage,
Settings.MessageCompressionSettings,
newOrchestrationInstance,
"Forced Terminate");
"Forced Terminate",
serviceBusMessageStore);

return message;
}
Expand Down
Loading

0 comments on commit f9a4552

Please sign in to comment.