From fb1029cbec7c4108ff8fc716ff9c9b1137a89d02 Mon Sep 17 00:00:00 2001 From: Zhenyu Guo Date: Thu, 7 Jul 2016 09:20:07 -0700 Subject: [PATCH] Handle seesion overflow using external storage. --- Framework/DurableTaskFramework.csproj | 1 + Framework/OrchestrationRuntimeState.cs | 8 + Framework/RuntimeStateStreamConverter.cs | 149 +++++++++++++++++ Framework/ServiceBusOrchestrationService.cs | 113 ++++--------- Framework/Tracking/AzureTableInstanceStore.cs | 13 ++ Framework/Tracking/BlobStorageClientHelper.cs | 9 + Framework/Tracking/IBlobStore.cs | 10 ++ .../BlobStorageClientHelperTest.cs | 22 +++ .../DurableTaskFrameworkUnitTests.csproj | 1 + .../RumtimeStateStreamConverterTest.cs | 155 ++++++++++++++++++ 10 files changed, 399 insertions(+), 82 deletions(-) create mode 100644 Framework/RuntimeStateStreamConverter.cs create mode 100644 FrameworkUnitTests/RumtimeStateStreamConverterTest.cs diff --git a/Framework/DurableTaskFramework.csproj b/Framework/DurableTaskFramework.csproj index 338bd6648..c70642f00 100644 --- a/Framework/DurableTaskFramework.csproj +++ b/Framework/DurableTaskFramework.csproj @@ -107,6 +107,7 @@ + diff --git a/Framework/OrchestrationRuntimeState.cs b/Framework/OrchestrationRuntimeState.cs index 391b622b5..d6dd705e1 100644 --- a/Framework/OrchestrationRuntimeState.cs +++ b/Framework/OrchestrationRuntimeState.cs @@ -110,6 +110,14 @@ public DateTime CompletedTime } } + /// + /// The external storage key + /// + public string StorageKey + { + get; set; + } + /// /// Gets the serialized input of the ExecutionStartedEvent /// diff --git a/Framework/RuntimeStateStreamConverter.cs b/Framework/RuntimeStateStreamConverter.cs new file mode 100644 index 000000000..3f41a1f5c --- /dev/null +++ b/Framework/RuntimeStateStreamConverter.cs @@ -0,0 +1,149 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.IO; + using System.Threading.Tasks; + using DurableTask.Common; + using DurableTask.History; + using DurableTask.Serializing; + using DurableTask.Tracing; + using DurableTask.Tracking; + + class RuntimeStateStreamConverter + { + public static async Task OrchestrationRuntimeStateToRawStream(OrchestrationRuntimeState newOrchestrationRuntimeState, + OrchestrationRuntimeState runtimeState, DataConverter dataConverter, bool shouldCompress, long sessionStreamTerminationThresholdInBytes, + long sessionStreamExternalStorageThresholdInBytes, IBlobStore blobStore, string sessionId) + { + string serializedState = dataConverter.Serialize(newOrchestrationRuntimeState); + + long originalStreamSize = 0; + Stream compressedState = Utils.WriteStringToStream( + serializedState, + shouldCompress, + out originalStreamSize); + + runtimeState.Size = originalStreamSize; + runtimeState.CompressedSize = compressedState.Length; + + if (runtimeState.CompressedSize > sessionStreamTerminationThresholdInBytes) + { + throw new ArgumentException($"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {sessionStreamTerminationThresholdInBytes} bytes"); + } + + if (runtimeState.CompressedSize > sessionStreamExternalStorageThresholdInBytes) + { + return await CreateStreamForExternalRuntimeStateAsync(shouldCompress, + blobStore, sessionId, dataConverter, compressedState); + } + + return compressedState; + } + + async static Task CreateStreamForExternalRuntimeStateAsync(bool shouldCompress, + IBlobStore blobStore, string sessionId, DataConverter dataConverter, Stream compressedState) + { + if (blobStore == null) + { + throw new ArgumentException($"The compressed session is larger than supported. " + + $"Please provide an implementation of IBlobStore for external storage.", + nameof(IBlobStore)); + } + + // create a new runtime state with the external storage key + IList historyEvents = new List(); + ExecutionStartedEvent historyEvent = new ExecutionStartedEvent(1, ""); + historyEvents.Add(historyEvent); + + OrchestrationRuntimeState runtimeStateExternalStorage = new OrchestrationRuntimeState(historyEvents); + string key = blobStore.BuildSessionStorageKey(sessionId); + runtimeStateExternalStorage.StorageKey = key; + + string serializedStateExternal = dataConverter.Serialize(runtimeStateExternalStorage); + long streamSize; + Stream compressedStateForSession = Utils.WriteStringToStream( + serializedStateExternal, + shouldCompress, + out streamSize); + + await blobStore.SaveStreamWithKeyAsync(key, compressedState); + return compressedStateForSession; + } + + + public static async Task RawStreamToRuntimeState(Stream rawSessionStream, string sessionId, IBlobStore blobStore, DataConverter dataConverter) + { + bool isEmptySession; + OrchestrationRuntimeState runtimeState; + Stream sessionStream = await Utils.GetDecompressedStreamAsync(rawSessionStream); + + isEmptySession = sessionStream == null; + long rawSessionStateSize = isEmptySession ? 0 : rawSessionStream.Length; + long newSessionStateSize = isEmptySession ? 0 : sessionStream.Length; + + runtimeState = GetOrCreateInstanceState(sessionStream, sessionId, dataConverter); + + if (string.IsNullOrWhiteSpace(runtimeState.StorageKey)) + { + TraceHelper.TraceSession(TraceEventType.Information, sessionId, + $"Size of session state is {newSessionStateSize}, compressed {rawSessionStateSize}"); + return runtimeState; + } + + if (blobStore == null) + { + throw new ArgumentException($"Please provide an implementation of IBlobStore for external storage to load the runtime state.", + nameof(IBlobStore)); + } + Stream externalStream = await blobStore.LoadStreamWithKeyAsync(runtimeState.StorageKey); + return await RawStreamToRuntimeState(externalStream, sessionId, blobStore, dataConverter); + } + + static OrchestrationRuntimeState GetOrCreateInstanceState(Stream stateStream, string sessionId, DataConverter dataConverter) + { + OrchestrationRuntimeState runtimeState; + if (stateStream == null) + { + TraceHelper.TraceSession(TraceEventType.Information, sessionId, + "No session state exists, creating new session state."); + runtimeState = new OrchestrationRuntimeState(); + } + else + { + if (stateStream.Position != 0) + { + throw TraceHelper.TraceExceptionSession(TraceEventType.Error, sessionId, + new ArgumentException("Stream is partially consumed")); + } + + string serializedState = null; + using (var reader = new StreamReader(stateStream)) + { + serializedState = reader.ReadToEnd(); + } + + OrchestrationRuntimeState restoredState = dataConverter.Deserialize(serializedState); + // Create a new Object with just the events and storage key, we don't want the rest + runtimeState = new OrchestrationRuntimeState(restoredState.Events); + runtimeState.StorageKey = restoredState.StorageKey; + } + + return runtimeState; + } + } +} diff --git a/Framework/ServiceBusOrchestrationService.cs b/Framework/ServiceBusOrchestrationService.cs index 9be789231..494be73bf 100644 --- a/Framework/ServiceBusOrchestrationService.cs +++ b/Framework/ServiceBusOrchestrationService.cs @@ -45,7 +45,8 @@ public class ServiceBusOrchestrationService : IOrchestrationService, IOrchestrat // as every fetched message also creates a tracking message which counts towards this limit. const int MaxMessageCount = 80; const int SessionStreamWarningSizeInBytes = 200 * 1024; - const int SessionStreamTerminationThresholdInBytes = 230 * 1024; + const int SessionStreamExternalStorageThresholdInBytes = 230 * 1024; + const int SessionStreamTerminationThresholdInBytes = 20 * 1024 * 1024; const int StatusPollingIntervalInSeconds = 2; const int DuplicateDetectionWindowInHours = 4; @@ -410,7 +411,7 @@ public async Task LockNextTaskOrchestrationWorkItemAs IList newTaskMessages = await Task.WhenAll( newMessages.Select(async message => await ServiceBusUtils.GetObjectFromBrokeredMessageAsync(message, blobStore))); - OrchestrationRuntimeState runtimeState = await GetSessionState(session); + OrchestrationRuntimeState runtimeState = await GetSessionState(session, this.InstanceStore as IBlobStore); long maxSequenceNumber = newMessages .OrderByDescending(message => message.SequenceNumber) @@ -1291,27 +1292,10 @@ string GetFormattedLog(string input) return input; } - static async Task GetSessionState(MessageSession session) + static async Task GetSessionState(MessageSession session, IBlobStore blobStore) { - long rawSessionStateSize; - long newSessionStateSize; - OrchestrationRuntimeState runtimeState; - bool isEmptySession; - using (Stream rawSessionStream = await session.GetStateAsync()) - using (Stream sessionStream = await Utils.GetDecompressedStreamAsync(rawSessionStream)) - { - isEmptySession = sessionStream == null; - rawSessionStateSize = isEmptySession ? 0 : rawSessionStream.Length; - newSessionStateSize = isEmptySession ? 0 : sessionStream.Length; - - runtimeState = GetOrCreateInstanceState(sessionStream, session.SessionId); - } - - TraceHelper.TraceSession(TraceEventType.Information, session.SessionId, - $"Size of session state is {newSessionStateSize}, compressed {rawSessionStateSize}"); - - return runtimeState; + return await RuntimeStateStreamConverter.RawStreamToRuntimeState(rawSessionStream, session.SessionId, blobStore, DataConverter); } async Task TrySetSessionState( @@ -1328,42 +1312,38 @@ async Task TrySetSessionState( return true; } - string serializedState = DataConverter.Serialize(newOrchestrationRuntimeState); + try + { + Stream rawStream = await + RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream(newOrchestrationRuntimeState, + runtimeState, DataConverter, + Settings.TaskOrchestrationDispatcherSettings.CompressOrchestrationState, + SessionStreamTerminationThresholdInBytes, + SessionStreamExternalStorageThresholdInBytes, this.InstanceStore as IBlobStore, session.SessionId); + + session.SetState(rawStream); - long originalStreamSize = 0; - using ( - Stream compressedState = Utils.WriteStringToStream( - serializedState, - Settings.TaskOrchestrationDispatcherSettings.CompressOrchestrationState, - out originalStreamSize)) + } + catch (ArgumentException exception) { - runtimeState.Size = originalStreamSize; - runtimeState.CompressedSize = compressedState.Length; - if (runtimeState.CompressedSize > SessionStreamTerminationThresholdInBytes) - { - // basic idea is to simply enqueue a terminate message just like how we do it from taskhubclient - // it is possible to have other messages in front of the queue and those will get processed before - // the terminate message gets processed. but that is ok since in the worst case scenario we will - // simply land in this if-block again and end up queuing up another terminate message. - // - // the interesting scenario is when the second time we *dont* land in this if-block because e.g. - // the new messages that we processed caused a new generation to be created. in that case - // it is still ok because the worst case scenario is that we will terminate a newly created generation - // which shouldn't have been created at all in the first place + // basic idea is to simply enqueue a terminate message just like how we do it from taskhubclient + // it is possible to have other messages in front of the queue and those will get processed before + // the terminate message gets processed. but that is ok since in the worst case scenario we will + // simply land in this if-block again and end up queuing up another terminate message. + // + // the interesting scenario is when the second time we *dont* land in this if-block because e.g. + // the new messages that we processed caused a new generation to be created. in that case + // it is still ok because the worst case scenario is that we will terminate a newly created generation + // which shouldn't have been created at all in the first place - isSessionSizeThresholdExceeded = true; + isSessionSizeThresholdExceeded = true; - string reason = $"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {SessionStreamTerminationThresholdInBytes} bytes"; - TraceHelper.TraceSession(TraceEventType.Critical, workItem.InstanceId, reason); + string reason = $"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {SessionStreamTerminationThresholdInBytes} bytes"; + TraceHelper.TraceSession(TraceEventType.Critical, workItem.InstanceId, reason); - BrokeredMessage forcedTerminateMessage = await CreateForcedTerminateMessageAsync(runtimeState.OrchestrationInstance.InstanceId, reason); + BrokeredMessage forcedTerminateMessage = await CreateForcedTerminateMessageAsync(runtimeState.OrchestrationInstance.InstanceId, reason); - await orchestratorQueueClient.SendAsync(forcedTerminateMessage); - } - else - { - session.SetState(compressedState); - } + await orchestratorQueueClient.SendAsync(forcedTerminateMessage); } return !isSessionSizeThresholdExceeded; @@ -1454,37 +1434,6 @@ async Task CreateForcedTerminateMessageAsync(string instanceId, return message; } - static OrchestrationRuntimeState GetOrCreateInstanceState(Stream stateStream, string sessionId) - { - OrchestrationRuntimeState runtimeState; - if (stateStream == null) - { - TraceHelper.TraceSession(TraceEventType.Information, sessionId, - "No session state exists, creating new session state."); - runtimeState = new OrchestrationRuntimeState(); - } - else - { - if (stateStream.Position != 0) - { - throw TraceHelper.TraceExceptionSession(TraceEventType.Error, sessionId, - new ArgumentException("Stream is partially consumed")); - } - - string serializedState = null; - using (var reader = new StreamReader(stateStream)) - { - serializedState = reader.ReadToEnd(); - } - - OrchestrationRuntimeState restoredState = DataConverter.Deserialize(serializedState); - // Create a new Object with just the events, we don't want the rest - runtimeState = new OrchestrationRuntimeState(restoredState.Events); - } - - return runtimeState; - } - void ThrowIfInstanceStoreNotConfigured() { if (InstanceStore == null) diff --git a/Framework/Tracking/AzureTableInstanceStore.cs b/Framework/Tracking/AzureTableInstanceStore.cs index acea979dd..6e255ce50 100644 --- a/Framework/Tracking/AzureTableInstanceStore.cs +++ b/Framework/Tracking/AzureTableInstanceStore.cs @@ -11,6 +11,8 @@ // limitations under the License. // ---------------------------------------------------------------------------------- +using Microsoft.ServiceBus.Messaging; + namespace DurableTask.Tracking { using System; @@ -473,6 +475,17 @@ public string BuildMessageStorageKey(OrchestrationInstance orchestrationInstance messageFireTime); } + /// + /// Create a storage key based on message session. + /// This key will be used to save and load the stream in external storage when it is too large. + /// + /// The message session Id. + /// A storage key. + public string BuildSessionStorageKey(string sessionId) + { + return BlobStorageClientHelper.BuildSessionStorageKey(sessionId); + } + /// /// Save the stream of the message or seesion using key. /// diff --git a/Framework/Tracking/BlobStorageClientHelper.cs b/Framework/Tracking/BlobStorageClientHelper.cs index ea38c8152..561adea86 100644 --- a/Framework/Tracking/BlobStorageClientHelper.cs +++ b/Framework/Tracking/BlobStorageClientHelper.cs @@ -11,6 +11,8 @@ // limitations under the License. // ---------------------------------------------------------------------------------- +using Microsoft.ServiceBus.Messaging; + namespace DurableTask.Tracking { using System; @@ -45,6 +47,13 @@ public static string BuildMessageStorageKey(string instanceId, string executionI id); } + public static string BuildSessionStorageKey(string sessionId) + { + string id = Guid.NewGuid().ToString("N"); + return string.Format("session{0}{2}{1}{3}{4}{5}", ContainerDelimiter, KeyDelimiter, + GetDateStringForContainerName(DateTime.MinValue), sessionId, BlobNameDelimiter, id); + } + // use the message fire time if it is set; // otherwise, use the current utc time as the date string as part of the container name static string GetDateStringForContainerName(DateTime messageFireTime) diff --git a/Framework/Tracking/IBlobStore.cs b/Framework/Tracking/IBlobStore.cs index cbb6dd833..3c6c2983b 100644 --- a/Framework/Tracking/IBlobStore.cs +++ b/Framework/Tracking/IBlobStore.cs @@ -11,6 +11,8 @@ // limitations under the License. // ---------------------------------------------------------------------------------- +using Microsoft.ServiceBus.Messaging; + namespace DurableTask.Tracking { using System; @@ -38,6 +40,14 @@ interface IBlobStore /// A message storage key. string BuildMessageStorageKey(OrchestrationInstance orchestrationInstance, DateTime messageFireTime); + /// + /// Create a storage key based on message session. + /// This key will be used to save and load the stream in external storage when it is too large. + /// + /// The message session Id. + /// A storage key. + string BuildSessionStorageKey(string sessionId); + /// /// Save the stream of the message or seesion using key. /// diff --git a/FrameworkUnitTests/BlobStorageClientHelperTest.cs b/FrameworkUnitTests/BlobStorageClientHelperTest.cs index c4c43766a..249938242 100644 --- a/FrameworkUnitTests/BlobStorageClientHelperTest.cs +++ b/FrameworkUnitTests/BlobStorageClientHelperTest.cs @@ -46,6 +46,19 @@ public void IsContainerExpiredTest() } } + [TestMethod] + public void BuildStorageKeyTest() + { + DateTime messageFireTime = new DateTime(2015, 05, 17); + string key = BlobStorageClientHelper.BuildStorageKey(messageFireTime); + Regex regex = new Regex(@"blob-20150517|\w{32}$"); + Assert.IsTrue(regex.Match(key).Success); + + key = BlobStorageClientHelper.BuildStorageKey(DateTime.MinValue); + regex = new Regex(@"blob-\d{8}|\w{32}$"); + Assert.IsTrue(regex.Match(key).Success); + } + [TestMethod] public void BuildMessageStorageKeyTest() { @@ -61,6 +74,15 @@ public void BuildMessageStorageKeyTest() Assert.IsTrue(regex.Match(key).Success); } + [TestMethod] + public void BuildSessionStorageKeyTest() + { + string sessionId = "abc"; + string key = BlobStorageClientHelper.BuildSessionStorageKey(sessionId); + Regex regex = new Regex(@"session-\d{8}|abc/\w{32}$"); + Assert.IsTrue(regex.Match(key).Success); + } + [TestMethod] public void ParseKeyTest() { diff --git a/FrameworkUnitTests/DurableTaskFrameworkUnitTests.csproj b/FrameworkUnitTests/DurableTaskFrameworkUnitTests.csproj index 708aef516..dc75704e6 100644 --- a/FrameworkUnitTests/DurableTaskFrameworkUnitTests.csproj +++ b/FrameworkUnitTests/DurableTaskFrameworkUnitTests.csproj @@ -132,6 +132,7 @@ + diff --git a/FrameworkUnitTests/RumtimeStateStreamConverterTest.cs b/FrameworkUnitTests/RumtimeStateStreamConverterTest.cs new file mode 100644 index 000000000..23f5d731d --- /dev/null +++ b/FrameworkUnitTests/RumtimeStateStreamConverterTest.cs @@ -0,0 +1,155 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace FrameworkUnitTests +{ + using System; + using System.Collections.Generic; + using System.IO; + using System.Threading.Tasks; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using DurableTask; + using DurableTask.History; + using DurableTask.Serializing; + using DurableTask.Tracking; + + [TestClass] + public class RumtimeStateStreamConverterTest + { + const long SessionStreamTerminationThresholdInBytes = 10 * 1024; + const long SessionStreamExternalStorageThresholdInBytes = 2 * 1024; + const string sessionId = "session123"; + + AzureTableInstanceStore azureTableInstanceStore; + [TestInitialize] + public void TestInitialize() + { + azureTableInstanceStore = TestHelpers.CreateAzureTableInstanceStore(); + } + + [TestCleanup] + public void TestCleanup() + { + azureTableInstanceStore.DeleteStoreAsync().Wait(); + } + + [TestMethod] + public async Task SmallRuntimeStateConverterTest() + { + string smallInput = "abc"; + + OrchestrationRuntimeState newOrchestrationRuntimeStateSmall = generateOrchestrationRuntimeState(smallInput); + OrchestrationRuntimeState runtimeState = new OrchestrationRuntimeState(); + DataConverter dataConverter = new JsonDataConverter(); + + // a small runtime state doesn't need external storage. + Stream rawStreamSmall = await RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream(newOrchestrationRuntimeStateSmall, + runtimeState, dataConverter, true, SessionStreamTerminationThresholdInBytes, + SessionStreamExternalStorageThresholdInBytes, azureTableInstanceStore as IBlobStore, sessionId); + OrchestrationRuntimeState convertedRuntimeStateSmall = await RuntimeStateStreamConverter.RawStreamToRuntimeState(rawStreamSmall, "sessionId", azureTableInstanceStore as IBlobStore, dataConverter); + verifyEventInput(smallInput, convertedRuntimeStateSmall); + + // test for un-compress case + Stream rawStreamSmall2 = await RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream(newOrchestrationRuntimeStateSmall, + runtimeState, dataConverter, false, SessionStreamTerminationThresholdInBytes, + SessionStreamExternalStorageThresholdInBytes, azureTableInstanceStore as IBlobStore, sessionId); + OrchestrationRuntimeState convertedRuntimeStateSmall2 = await RuntimeStateStreamConverter.RawStreamToRuntimeState(rawStreamSmall2, "sessionId", azureTableInstanceStore as IBlobStore, dataConverter); + verifyEventInput(smallInput, convertedRuntimeStateSmall2); + + // test for backward comp: ok for an un-implemented (or null) IBlobStorage for small runtime states + Stream rawStreamSmall3 = await RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream(newOrchestrationRuntimeStateSmall, + runtimeState, dataConverter, true, SessionStreamTerminationThresholdInBytes, + SessionStreamExternalStorageThresholdInBytes, null, sessionId); + OrchestrationRuntimeState convertedRuntimeStateSmall3 = await RuntimeStateStreamConverter.RawStreamToRuntimeState(rawStreamSmall3, "sessionId", null, dataConverter); + verifyEventInput(smallInput, convertedRuntimeStateSmall3); + } + + [TestMethod] + public async Task LargeRuntimeStateConverterTest() + { + string largeInput = TestUtils.GenerateRandomString(5 * 1024); + OrchestrationRuntimeState newOrchestrationRuntimeStateLarge = generateOrchestrationRuntimeState(largeInput); + OrchestrationRuntimeState runtimeState = new OrchestrationRuntimeState(); + DataConverter dataConverter = new JsonDataConverter(); + + // a large runtime state that needs external storage. + Stream rawStreamLarge = await RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream(newOrchestrationRuntimeStateLarge, + runtimeState, dataConverter, true, SessionStreamTerminationThresholdInBytes, + SessionStreamExternalStorageThresholdInBytes, azureTableInstanceStore as IBlobStore, sessionId); + OrchestrationRuntimeState convertedRuntimeStateLarge = await RuntimeStateStreamConverter.RawStreamToRuntimeState(rawStreamLarge, "sessionId", azureTableInstanceStore as IBlobStore, dataConverter); + verifyEventInput(largeInput, convertedRuntimeStateLarge); + + // test for un-compress case + string largeInput2 = TestUtils.GenerateRandomString(3 * 1024); + OrchestrationRuntimeState newOrchestrationRuntimeStateLarge2 = generateOrchestrationRuntimeState(largeInput2); + Stream rawStreamLarge2 = await RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream(newOrchestrationRuntimeStateLarge2, + runtimeState, dataConverter, false, SessionStreamTerminationThresholdInBytes, + SessionStreamExternalStorageThresholdInBytes, azureTableInstanceStore as IBlobStore, sessionId); + OrchestrationRuntimeState convertedRuntimeStateLarge2 = await RuntimeStateStreamConverter.RawStreamToRuntimeState(rawStreamLarge2, "sessionId", azureTableInstanceStore as IBlobStore, dataConverter); + verifyEventInput(largeInput2, convertedRuntimeStateLarge2); + + // test for an un-implemented (or null) IBlobStorage for large runtime states + try + { + await + RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream(newOrchestrationRuntimeStateLarge, + runtimeState, dataConverter, true, SessionStreamTerminationThresholdInBytes, + SessionStreamExternalStorageThresholdInBytes, null, sessionId); + Assert.Fail("ArgumentException must be thrown"); + } + catch (ArgumentException e) + { + // expected + Assert.IsTrue(e.Message.Contains("IBlobStore"), "Exception must contain IBlobStore."); + } + } + + [TestMethod] + public async Task VeryLargeRuntimeStateConverterTest() + { + string veryLargeInput = TestUtils.GenerateRandomString(20 * 1024); + OrchestrationRuntimeState newOrchestrationRuntimeStateLarge = generateOrchestrationRuntimeState(veryLargeInput); + OrchestrationRuntimeState runtimeState = new OrchestrationRuntimeState(); + DataConverter dataConverter = new JsonDataConverter(); + + // test for very large size rumtime state: should throw exception + try + { + Stream rawStreamVeryLarge = await RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream(newOrchestrationRuntimeStateLarge, + runtimeState, dataConverter, true, SessionStreamTerminationThresholdInBytes, + SessionStreamExternalStorageThresholdInBytes, azureTableInstanceStore as IBlobStore, sessionId); + Assert.Fail("ArgumentException must be thrown"); + } + catch (ArgumentException e) + { + // expected + } + } + + OrchestrationRuntimeState generateOrchestrationRuntimeState(string input) + { + IList historyEvents = new List(); + ExecutionStartedEvent historyEvent = new ExecutionStartedEvent(1, input); + historyEvents.Add(historyEvent); + OrchestrationRuntimeState newOrchestrationRuntimeState = new OrchestrationRuntimeState(historyEvents); + + return newOrchestrationRuntimeState; + } + + void verifyEventInput(string expectedHistoryEventInput, OrchestrationRuntimeState runtimeState) + { + ExecutionStartedEvent executionStartedEvent = runtimeState.Events[0] as ExecutionStartedEvent; + Assert.AreEqual(expectedHistoryEventInput, executionStartedEvent.Input); + } + } +}