Skip to content

Commit

Permalink
Settings for service bus message.
Browse files Browse the repository at this point in the history
  • Loading branch information
zheg committed Jul 15, 2016
1 parent 590bf0f commit 3100c66
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 10 deletions.
7 changes: 4 additions & 3 deletions Framework/Common/ServiceBusUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ internal static class ServiceBusUtils
{
public static async Task<BrokeredMessage> GetBrokeredMessageFromObjectAsync(object serializableObject, CompressionSettings compressionSettings)
{
return await GetBrokeredMessageFromObjectAsync(serializableObject, compressionSettings, null, null, null, DateTime.MinValue);
return await GetBrokeredMessageFromObjectAsync(serializableObject, compressionSettings, new ServiceBusMessageSettings(), null, null, null, DateTime.MinValue);
}

public static async Task<BrokeredMessage> GetBrokeredMessageFromObjectAsync(
object serializableObject,
CompressionSettings compressionSettings,
ServiceBusMessageSettings messageSettings,
OrchestrationInstance instance,
string messageType,
IBlobStore blobStore,
Expand Down Expand Up @@ -72,13 +73,13 @@ public static async Task<BrokeredMessage> GetBrokeredMessageFromObjectAsync(
brokeredMessage.MessageId +
", uncompressed " + rawLen + " -> compressed " + compressedStream.Length);

if (compressedStream.Length < FrameworkConstants.MaxMessageSizeInBytes)
if (compressedStream.Length < messageSettings.MaxMessageSizeInBytes)
{
brokeredMessage = new BrokeredMessage(compressedStream, true);
brokeredMessage.Properties[FrameworkConstants.CompressionTypePropertyName] =
FrameworkConstants.CompressionTypeGzipPropertyValue;
}
else if (compressedStream.Length < FrameworkConstants.MaxMessageSizeForBlobInBytes && blobStore != null)
else if (compressedStream.Length < messageSettings.MaxMessageSizeForBlobInBytes && blobStore != null)
{
// save the compressed stream using external storage when it is larger
// than the supported message size limit.
Expand Down
1 change: 1 addition & 0 deletions Framework/DurableTaskFramework.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
<Compile Include="OrchestrationSessionState.cs" />
<Compile Include="RuntimeStateStreamConverter.cs" />
<Compile Include="Settings\JumpStartSettings.cs" />
<Compile Include="Settings\ServiceBusMessageSettings.cs" />
<Compile Include="Settings\ServiceBusOrchestrationServiceSettings.cs" />
<Compile Include="Settings\ServiceBusSessionSettings.cs" />
<Compile Include="Tracking\AzureTableOrchestrationJumpStartEntity.cs" />
Expand Down
6 changes: 0 additions & 6 deletions Framework/FrameworkConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,6 @@ internal class FrameworkConstants
// service bus message storage key in message property
public const string MessageStorageKey = "MessageStorageKey";

// the max allowed message size after compression in service bus
public const int MaxMessageSizeInBytes = 170 * 1024;

// the max allowed message size for external storage
public const int MaxMessageSizeForBlobInBytes = 10 * 1024 * 1024;

// instance store constants
public const int MaxStringLengthForAzureTableColumn = 1024 * 15; // cut off at 15k * 2 bytes
}
Expand Down
9 changes: 9 additions & 0 deletions Framework/ServiceBusOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ await Task.WhenAll(outboundMessages.Select(async m =>
await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
m,
Settings.MessageCompressionSettings,
Settings.ServiceBusMessageSettings,
null,
"Worker outbound message",
blobStore,
Expand All @@ -597,6 +598,7 @@ await Task.WhenAll(timerMessages.Select(async m =>
BrokeredMessage message = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
m,
Settings.MessageCompressionSettings,
Settings.ServiceBusMessageSettings,
newOrchestrationRuntimeState.OrchestrationInstance,
"Timer Message",
blobStore,
Expand All @@ -615,6 +617,7 @@ await Task.WhenAll(orchestratorMessages.Select(async m =>
await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
m,
Settings.MessageCompressionSettings,
Settings.ServiceBusMessageSettings,
m.OrchestrationInstance,
"Sub Orchestration",
blobStore,
Expand All @@ -629,6 +632,7 @@ await orchestratorQueueClient.SendAsync(
await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
continuedAsNewMessage,
Settings.MessageCompressionSettings,
Settings.ServiceBusMessageSettings,
newOrchestrationRuntimeState.OrchestrationInstance,
"Continue as new",
blobStore,
Expand Down Expand Up @@ -805,6 +809,7 @@ public async Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkItem workIte
BrokeredMessage brokeredResponseMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
responseMessage,
Settings.MessageCompressionSettings,
Settings.ServiceBusMessageSettings,
workItem.TaskMessage.OrchestrationInstance,
$"Response for {workItem.TaskMessage.OrchestrationInstance.InstanceId}",
blobStore,
Expand Down Expand Up @@ -925,6 +930,7 @@ public async Task SendTaskOrchestrationMessageAsync(TaskMessage message)
BrokeredMessage brokeredMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
message,
Settings.MessageCompressionSettings,
Settings.ServiceBusMessageSettings,
message.OrchestrationInstance,
"SendTaskOrchestrationMessage",
blobStore,
Expand Down Expand Up @@ -1140,6 +1146,7 @@ async Task<List<BrokeredMessage>> CreateTrackingMessagesAsync(OrchestrationRunti
BrokeredMessage trackingMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
taskMessage,
Settings.MessageCompressionSettings,
Settings.ServiceBusMessageSettings,
runtimeState.OrchestrationInstance,
"History Tracking Message",
blobStore,
Expand All @@ -1157,6 +1164,7 @@ async Task<List<BrokeredMessage>> CreateTrackingMessagesAsync(OrchestrationRunti
BrokeredMessage brokeredStateMessage = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
stateMessage,
Settings.MessageCompressionSettings,
Settings.ServiceBusMessageSettings,
runtimeState.OrchestrationInstance,
"State Tracking Message",
blobStore,
Expand Down Expand Up @@ -1424,6 +1432,7 @@ async Task<BrokeredMessage> CreateForcedTerminateMessageAsync(string instanceId,
BrokeredMessage message = await ServiceBusUtils.GetBrokeredMessageFromObjectAsync(
taskMessage,
Settings.MessageCompressionSettings,
Settings.ServiceBusMessageSettings,
newOrchestrationInstance,
"Forced Terminate",
blobStore,
Expand Down
42 changes: 42 additions & 0 deletions Framework/Settings/ServiceBusMessageSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.Settings
{
/// <summary>
/// Settings to configure the Service Bus message.
/// </summary>
public class ServiceBusMessageSettings
{
internal ServiceBusMessageSettings()
{
MaxMessageSizeInBytes = 170 * 1024;
MaxMessageSizeForBlobInBytes = 10 * 1024 * 1024;
}
internal ServiceBusMessageSettings(int maxMessageSizeInBytes, int maxMessageSizeForBlobInBytes)
{
MaxMessageSizeInBytes = maxMessageSizeInBytes;
MaxMessageSizeForBlobInBytes = maxMessageSizeForBlobInBytes;
}

/// <summary>
/// The max allowed message size after compression in service bus. Default is 170K.
/// </summary>
public int MaxMessageSizeInBytes { get; set; }

/// <summary>
/// The max allowed message size for external storage. Default is 10M.
/// </summary>
public int MaxMessageSizeForBlobInBytes { get; set; }
}
}
9 changes: 9 additions & 0 deletions Framework/Settings/ServiceBusOrchestrationServiceSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public ServiceBusOrchestrationServiceSettings()
TrackingDispatcherSettings = new TrackingDispatcherSettings();
JumpStartSettings = new JumpStartSettings();
ServiceBusSessionSettings = new ServiceBusSessionSettings();
ServiceBusMessageSettings = new ServiceBusMessageSettings();

MessageCompressionSettings = new CompressionSettings
{
Expand Down Expand Up @@ -101,6 +102,14 @@ public ServiceBusOrchestrationServiceSettings()
/// </summary>
public CompressionSettings MessageCompressionSettings { get; set; }

/// <summary>
/// Settings to configure the service bus session
/// </summary>
public ServiceBusSessionSettings ServiceBusSessionSettings { get; set; }

/// <summary>
/// Settings to configure the service bus message
/// </summary>
public ServiceBusMessageSettings ServiceBusMessageSettings { get; set; }
}
}
2 changes: 1 addition & 1 deletion Framework/Settings/ServiceBusSessionSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
namespace DurableTask.Settings
{
/// <summary>
/// Settings to configure the Service Bus.
/// Settings to configure the Service Bus session.
/// </summary>
public class ServiceBusSessionSettings
{
Expand Down

0 comments on commit 3100c66

Please sign in to comment.