Skip to content

Commit

Permalink
Create a setting class for service bus session thresholds.
Browse files Browse the repository at this point in the history
  • Loading branch information
zheg committed Jul 11, 2016
1 parent 3a5b81c commit 79144c0
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 34 deletions.
1 change: 1 addition & 0 deletions Framework/DurableTaskFramework.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
<Compile Include="RuntimeStateStreamConverter.cs" />
<Compile Include="Settings\JumpStartSettings.cs" />
<Compile Include="Settings\ServiceBusOrchestrationServiceSettings.cs" />
<Compile Include="Settings\ServiceBusSettings.cs" />
<Compile Include="Tracking\AzureTableOrchestrationJumpStartEntity.cs" />
<Compile Include="TrackingWorkItem.cs" />
<Compile Include="Tracking\AzureTableInstanceStore.cs" />
Expand Down
4 changes: 0 additions & 4 deletions Framework/RuntimeStateStreamConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ public static async Task<Stream> OrchestrationRuntimeStateToRawStream(Orchestrat
OrchestrationRuntimeState runtimeState, DataConverter dataConverter, bool shouldCompress, long sessionStreamTerminationThresholdInBytes,
long sessionStreamExternalStorageThresholdInBytes, IBlobStore blobStore, string sessionId)
{
if (sessionStreamExternalStorageThresholdInBytes > 0)
{
throw new ArgumentException($"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {sessionStreamTerminationThresholdInBytes} bytes");
}
string serializedState = dataConverter.Serialize(newOrchestrationRuntimeState);

long originalStreamSize = 0;
Expand Down
10 changes: 4 additions & 6 deletions Framework/ServiceBusOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ public class ServiceBusOrchestrationService : IOrchestrationService, IOrchestrat
// as every fetched message also creates a tracking message which counts towards this limit.
const int MaxMessageCount = 80;
const int SessionStreamWarningSizeInBytes = 200 * 1024;
const int SessionStreamExternalStorageThresholdInBytes = 230 * 1024;
const int SessionStreamTerminationThresholdInBytes = 20 * 1024 * 1024;
const int StatusPollingIntervalInSeconds = 2;
const int DuplicateDetectionWindowInHours = 4;

Expand Down Expand Up @@ -570,7 +568,7 @@ public async Task CompleteTaskOrchestrationWorkItemAsync(
TraceHelper.TraceSession(
TraceEventType.Error,
workItem.InstanceId,
$"Size of session state ({runtimeState.CompressedSize}B) is nearing session size limit of {SessionStreamTerminationThresholdInBytes}B");
$"Size of session state ({runtimeState.CompressedSize}B) is nearing session size limit of {Settings.ServiceBusSettings.SessionStreamTerminationThresholdInBytes}B");
}

IBlobStore blobStore = this.InstanceStore as IBlobStore;
Expand Down Expand Up @@ -1318,8 +1316,8 @@ async Task<bool> TrySetSessionState(
RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream(newOrchestrationRuntimeState,
runtimeState, DataConverter,
Settings.TaskOrchestrationDispatcherSettings.CompressOrchestrationState,
SessionStreamTerminationThresholdInBytes,
SessionStreamExternalStorageThresholdInBytes, this.InstanceStore as IBlobStore, session.SessionId);
Settings.ServiceBusSettings.SessionStreamTerminationThresholdInBytes,
Settings.ServiceBusSettings.SessionStreamExternalStorageThresholdInBytes, this.InstanceStore as IBlobStore, session.SessionId);

session.SetState(rawStream);

Expand All @@ -1338,7 +1336,7 @@ async Task<bool> TrySetSessionState(

isSessionSizeThresholdExceeded = true;

string reason = $"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {SessionStreamTerminationThresholdInBytes} bytes";
string reason = $"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {Settings.ServiceBusSettings.SessionStreamTerminationThresholdInBytes} bytes";
TraceHelper.TraceSession(TraceEventType.Critical, workItem.InstanceId, reason);

BrokeredMessage forcedTerminateMessage = await CreateForcedTerminateMessageAsync(runtimeState.OrchestrationInstance.InstanceId, reason);
Expand Down
4 changes: 4 additions & 0 deletions Framework/Settings/ServiceBusOrchestrationServiceSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public ServiceBusOrchestrationServiceSettings()
TaskActivityDispatcherSettings = new TaskActivityDispatcherSettings();
TrackingDispatcherSettings = new TrackingDispatcherSettings();
JumpStartSettings = new JumpStartSettings();
ServiceBusSettings = new ServiceBusSettings();

MessageCompressionSettings = new CompressionSettings
{
Style = CompressionStyle.Never,
Expand Down Expand Up @@ -98,5 +100,7 @@ public ServiceBusOrchestrationServiceSettings()
/// Default is false.
/// </summary>
public CompressionSettings MessageCompressionSettings { get; set; }

public ServiceBusSettings ServiceBusSettings { get; set; }
}
}
37 changes: 37 additions & 0 deletions Framework/Settings/ServiceBusSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.Settings
{
/// <summary>
/// Settings to configure the Service Bus.
/// </summary>
public class ServiceBusSettings
{
internal ServiceBusSettings()
{
SessionStreamExternalStorageThresholdInBytes = 230 * 1024;
SessionStreamTerminationThresholdInBytes = 10 * 1024 * 1024;
}

/// <summary>
/// The threshold for external storage of session in stream. Default is 230K.
/// </summary>
public int SessionStreamExternalStorageThresholdInBytes { get; set; }

/// <summary>
/// The threshold of session for orchestration termination. Default is 10M.
/// </summary>
public int SessionStreamTerminationThresholdInBytes { get; set; }
}
}
56 changes: 32 additions & 24 deletions FrameworkUnitTests/DispatcherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// ----------------------------------------------------------------------------------


using System.Text;

namespace FrameworkUnitTests
{
using System;
Expand Down Expand Up @@ -635,24 +637,24 @@ await taskHub.AddTaskOrchestrations(typeof (LargeSessionOrchestration))
.AddTaskActivities(typeof (LargeSessionTaskActivity))
.StartAsync();

await SessionExceededLimitSubTestsWithInputSize(185 * 1024);
await SessionExceededLimitSubTestsWithInputSize(200 * 1024);
await SessionExceededLimitSubTestsWithInputSize(300 * 1024);
await SessionExceededLimitSubTestsWithInputSize(500 * 1024);
await SessionExceededLimitSubTestsWithInputSize(1000 * 1024);
await SessionExceededLimitSubTestWithInputSize(100 * 1024);
await SessionExceededLimitSubTestWithInputSize(200 * 1024);
await SessionExceededLimitSubTestWithInputSize(300 * 1024);
await SessionExceededLimitSubTestWithInputSize(500 * 1024);
await SessionExceededLimitSubTestWithInputSize(1000 * 1024);
}

private async Task SessionExceededLimitSubTestsWithInputSize(int inputSize)
async Task SessionExceededLimitSubTestWithInputSize(int inputSize)
{
string input = TestUtils.GenerateRandomString(inputSize);
OrchestrationInstance id = await client.CreateOrchestrationInstanceAsync(typeof(LargeSessionOrchestration), input);
OrchestrationInstance id = await client.CreateOrchestrationInstanceAsync(typeof(LargeSessionOrchestration), new Tuple<string, int>(input, 2));

bool isCompleted = await TestHelpers.WaitForInstanceAsync(client, id, 90, true);
await Task.Delay(20000);

OrchestrationState state = await client.GetOrchestrationStateAsync(id);
Assert.AreEqual(OrchestrationStatus.Completed, state.OrchestrationStatus);
Assert.AreEqual($"a:{input}b:{input}", LargeSessionOrchestration.Result);
Assert.AreEqual($"0:{input}-1:{input}-", LargeSessionOrchestration.Result);
}

[TestMethod]
Expand All @@ -664,7 +666,7 @@ await taskHub.AddTaskOrchestrations(typeof (LargeSessionOrchestration))

string input = "abc";

OrchestrationInstance id = await client.CreateOrchestrationInstanceAsync(typeof (LargeSessionOrchestration), input);
OrchestrationInstance id = await client.CreateOrchestrationInstanceAsync(typeof (LargeSessionOrchestration), new Tuple<string, int>(input, 2));

bool isCompleted = await TestHelpers.WaitForInstanceAsync(client, id, 90, true);

Expand All @@ -673,7 +675,7 @@ await taskHub.AddTaskOrchestrations(typeof (LargeSessionOrchestration))
OrchestrationState state = await client.GetOrchestrationStateAsync(id);

Assert.AreEqual(OrchestrationStatus.Completed, state.OrchestrationStatus);
Assert.AreEqual($"a:{input}b:{input}", LargeSessionOrchestration.Result);
Assert.AreEqual($"0:{input}-1:{input}-", LargeSessionOrchestration.Result);
}

[TestMethod]
Expand All @@ -690,7 +692,7 @@ await taskHub.AddTaskOrchestrations(typeof (LargeSessionOrchestration))
.AddTaskActivities(typeof (LargeSessionTaskActivity))
.StartAsync();

OrchestrationInstance id = await client.CreateOrchestrationInstanceAsync(typeof (LargeSessionOrchestration), input);
OrchestrationInstance id = await client.CreateOrchestrationInstanceAsync(typeof (LargeSessionOrchestration), new Tuple<string, int>(input, 2));

bool isCompleted = await TestHelpers.WaitForInstanceAsync(client, id, 90, true);

Expand All @@ -699,19 +701,24 @@ await taskHub.AddTaskOrchestrations(typeof (LargeSessionOrchestration))
OrchestrationState state = await client.GetOrchestrationStateAsync(id);

Assert.AreEqual(OrchestrationStatus.Completed, state.OrchestrationStatus);
Assert.AreEqual($"a:{input}b:{input}", LargeSessionOrchestration.Result);
Assert.AreEqual($"0:{input}-1:{input}-", LargeSessionOrchestration.Result);
}

[TestMethod]
public async Task SessionExceededTerminationLimitTest()
{
//string input = TestUtils.GenerateRandomString(12 * 1024 * 1024);
string input = "xy";
string input = TestUtils.GenerateRandomString(200 * 1024);

await taskHub.AddTaskOrchestrations(typeof(LargeSessionOrchestration))
.AddTaskActivities(typeof(LargeSessionTaskActivity))
.StartAsync();

OrchestrationInstance id = await client.CreateOrchestrationInstanceAsync(typeof(LargeSessionOrchestration), input);
ServiceBusOrchestrationService serviceBusOrchestrationService =
taskHub.orchestrationService as ServiceBusOrchestrationService;
serviceBusOrchestrationService.Settings.ServiceBusSettings.SessionStreamTerminationThresholdInBytes
= 1024 * 1024;

OrchestrationInstance id = await client.CreateOrchestrationInstanceAsync(typeof(LargeSessionOrchestration), new Tuple<string, int>(input, 10));
bool isCompleted = await TestHelpers.WaitForInstanceAsync(client, id, 90, true);
await Task.Delay(30000);

Expand All @@ -720,21 +727,22 @@ await taskHub.AddTaskOrchestrations(typeof(LargeSessionOrchestration))
Assert.IsTrue(state.Output.Contains("exceeded"));
}

public class LargeSessionOrchestration : TaskOrchestration<string, string>
public class LargeSessionOrchestration : TaskOrchestration<string, Tuple<string, int>>
{
// HACK: This is just a hack to communicate result of orchestration back to test
public static string Result;

public override async Task<string> RunTask(OrchestrationContext context, string input)
public override async Task<string> RunTask(OrchestrationContext context, Tuple<string, int> input)
{
string output = string.Empty;

string outputA = await context.ScheduleTask<string>(typeof(LargeSessionTaskActivity), $"a:{input}");
string outputB = await context.ScheduleTask<string>(typeof(LargeSessionTaskActivity), $"b:{input}");
StringBuilder sb = new StringBuilder();
for (int i = 0; i < input.Item2; i++)
{
string outputI = await context.ScheduleTask<string>(typeof(LargeSessionTaskActivity), $"{i}:{input.Item1}");
sb.Append($"{outputI}-");
}

output = $"{outputA}{outputB}";
Result = output;
return output;
Result = sb.ToString();
return Result;
}
}

Expand Down

0 comments on commit 79144c0

Please sign in to comment.