diff --git a/Framework/DurableTaskFramework.csproj b/Framework/DurableTaskFramework.csproj index 33aae8d9d..d83a649d6 100644 --- a/Framework/DurableTaskFramework.csproj +++ b/Framework/DurableTaskFramework.csproj @@ -107,6 +107,7 @@ + diff --git a/Framework/OrchestrationRuntimeState.cs b/Framework/OrchestrationRuntimeState.cs index d6dd705e1..391b622b5 100644 --- a/Framework/OrchestrationRuntimeState.cs +++ b/Framework/OrchestrationRuntimeState.cs @@ -110,14 +110,6 @@ public DateTime CompletedTime } } - /// - /// The external storage key - /// - public string StorageKey - { - get; set; - } - /// /// Gets the serialized input of the ExecutionStartedEvent /// diff --git a/Framework/OrchestrationSessionState.cs b/Framework/OrchestrationSessionState.cs new file mode 100644 index 000000000..ece616a19 --- /dev/null +++ b/Framework/OrchestrationSessionState.cs @@ -0,0 +1,57 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask +{ + using System.Collections.Generic; + using DurableTask.History; + + /// + /// The object that represents the serialized session state. + /// It hold a list of history events (empty storage key), or a key for external storage if it is too large to fit in the the session state. + /// + internal class OrchestrationSessionState + { + /// + /// A constructor for deserialzation. + /// + public OrchestrationSessionState() + { + } + + public OrchestrationSessionState(IList events) + { + this.Events = events; + } + + public OrchestrationSessionState(string storageKey) + { + this.StorageKey = storageKey; + + // generate history events only for serialization/deserialization; + // the actual history events are stored exterally with the storage key. + this.Events = new List(); + this.Events.Add(new ExecutionStartedEvent(-1, string.Empty)); + } + + /// + /// List of all history events for runtime state + /// + public IList Events { get; set; } + + /// + /// The storage key for external storage. Could be null or empty if not externally stored. + /// + public string StorageKey { get; set; } + } +} diff --git a/Framework/RuntimeStateStreamConverter.cs b/Framework/RuntimeStateStreamConverter.cs index 3f41a1f5c..7f85e452f 100644 --- a/Framework/RuntimeStateStreamConverter.cs +++ b/Framework/RuntimeStateStreamConverter.cs @@ -24,13 +24,19 @@ namespace DurableTask using DurableTask.Tracing; using DurableTask.Tracking; + /// + /// A converter that does conversion between the OrchestrationRuntimeState instance and a stream. + /// The stream is a serialized OrchestrationSessionState that will set as session state. + /// De-serialization is done with fallbacks in the order: OrchestrationSessionState -> OrchestrationRuntimeState -> IList. + /// class RuntimeStateStreamConverter { public static async Task OrchestrationRuntimeStateToRawStream(OrchestrationRuntimeState newOrchestrationRuntimeState, OrchestrationRuntimeState runtimeState, DataConverter dataConverter, bool shouldCompress, long sessionStreamTerminationThresholdInBytes, long sessionStreamExternalStorageThresholdInBytes, IBlobStore blobStore, string sessionId) { - string serializedState = dataConverter.Serialize(newOrchestrationRuntimeState); + OrchestrationSessionState orchestrationSessionState = new OrchestrationSessionState(newOrchestrationRuntimeState.Events); + string serializedState = dataConverter.Serialize(orchestrationSessionState); long originalStreamSize = 0; Stream compressedState = Utils.WriteStringToStream( @@ -48,14 +54,14 @@ public static async Task OrchestrationRuntimeStateToRawStream(Orchestrat if (runtimeState.CompressedSize > sessionStreamExternalStorageThresholdInBytes) { - return await CreateStreamForExternalRuntimeStateAsync(shouldCompress, + return await CreateStreamForExternalStorageAsync(shouldCompress, blobStore, sessionId, dataConverter, compressedState); } return compressedState; } - async static Task CreateStreamForExternalRuntimeStateAsync(bool shouldCompress, + async static Task CreateStreamForExternalStorageAsync(bool shouldCompress, IBlobStore blobStore, string sessionId, DataConverter dataConverter, Stream compressedState) { if (blobStore == null) @@ -65,16 +71,11 @@ async static Task CreateStreamForExternalRuntimeStateAsync(bool shouldCo nameof(IBlobStore)); } - // create a new runtime state with the external storage key - IList historyEvents = new List(); - ExecutionStartedEvent historyEvent = new ExecutionStartedEvent(1, ""); - historyEvents.Add(historyEvent); - - OrchestrationRuntimeState runtimeStateExternalStorage = new OrchestrationRuntimeState(historyEvents); + // create a new orchestration session state with the external storage key string key = blobStore.BuildSessionStorageKey(sessionId); - runtimeStateExternalStorage.StorageKey = key; + OrchestrationSessionState orchestrationSessionState = new OrchestrationSessionState(key); - string serializedStateExternal = dataConverter.Serialize(runtimeStateExternalStorage); + string serializedStateExternal = dataConverter.Serialize(orchestrationSessionState); long streamSize; Stream compressedStateForSession = Utils.WriteStringToStream( serializedStateExternal, @@ -96,9 +97,10 @@ public static async Task RawStreamToRuntimeState(Stre long rawSessionStateSize = isEmptySession ? 0 : rawSessionStream.Length; long newSessionStateSize = isEmptySession ? 0 : sessionStream.Length; - runtimeState = GetOrCreateInstanceState(sessionStream, sessionId, dataConverter); + string storageKey; + runtimeState = GetOrCreateInstanceState(sessionStream, sessionId, dataConverter, out storageKey); - if (string.IsNullOrWhiteSpace(runtimeState.StorageKey)) + if (string.IsNullOrWhiteSpace(storageKey)) { TraceHelper.TraceSession(TraceEventType.Information, sessionId, $"Size of session state is {newSessionStateSize}, compressed {rawSessionStateSize}"); @@ -110,13 +112,14 @@ public static async Task RawStreamToRuntimeState(Stre throw new ArgumentException($"Please provide an implementation of IBlobStore for external storage to load the runtime state.", nameof(IBlobStore)); } - Stream externalStream = await blobStore.LoadStreamWithKeyAsync(runtimeState.StorageKey); + Stream externalStream = await blobStore.LoadStreamWithKeyAsync(storageKey); return await RawStreamToRuntimeState(externalStream, sessionId, blobStore, dataConverter); } - static OrchestrationRuntimeState GetOrCreateInstanceState(Stream stateStream, string sessionId, DataConverter dataConverter) + static OrchestrationRuntimeState GetOrCreateInstanceState(Stream stateStream, string sessionId, DataConverter dataConverter, out string storageKey) { OrchestrationRuntimeState runtimeState; + storageKey = string.Empty; if (stateStream == null) { TraceHelper.TraceSession(TraceEventType.Information, sessionId, @@ -137,13 +140,43 @@ static OrchestrationRuntimeState GetOrCreateInstanceState(Stream stateStream, st serializedState = reader.ReadToEnd(); } - OrchestrationRuntimeState restoredState = dataConverter.Deserialize(serializedState); - // Create a new Object with just the events and storage key, we don't want the rest - runtimeState = new OrchestrationRuntimeState(restoredState.Events); - runtimeState.StorageKey = restoredState.StorageKey; + runtimeState = DeserializeToRuntimeStateWithFallback(serializedState, dataConverter, sessionId, out storageKey); } return runtimeState; } + + static OrchestrationRuntimeState DeserializeToRuntimeStateWithFallback(string serializedState, DataConverter dataConverter, string sessionId, out string storageKey) + { + OrchestrationRuntimeState runtimeState = null; + storageKey = string.Empty; + try + { + OrchestrationSessionState sessionState = + dataConverter.Deserialize(serializedState); + runtimeState = new OrchestrationRuntimeState(sessionState.Events); + storageKey = sessionState.StorageKey; + } + catch (Exception exception) + { + TraceHelper.TraceSession(TraceEventType.Warning, sessionId, + $"Failed to deseriazlize session state to OrchestrationSessionState object: {serializedState}"); + try + { + OrchestrationRuntimeState restoredState = + dataConverter.Deserialize(serializedState); + // Create a new Object with just the events, we don't want the rest + runtimeState = new OrchestrationRuntimeState(restoredState.Events); + } + catch (Exception e) + { + TraceHelper.TraceSession(TraceEventType.Warning, sessionId, + $"Failed to deseriazlize session state to OrchestrationRuntimeState object: {serializedState}"); + IList events = dataConverter.Deserialize>(serializedState); + runtimeState = new OrchestrationRuntimeState(events); + } + } + return runtimeState; + } } } diff --git a/FrameworkUnitTests/RumtimeStateStreamConverterTest.cs b/FrameworkUnitTests/RumtimeStateStreamConverterTest.cs index 23f5d731d..c7690e18f 100644 --- a/FrameworkUnitTests/RumtimeStateStreamConverterTest.cs +++ b/FrameworkUnitTests/RumtimeStateStreamConverterTest.cs @@ -11,6 +11,8 @@ // limitations under the License. // ---------------------------------------------------------------------------------- +using DurableTask.Common; + namespace FrameworkUnitTests { using System; @@ -98,7 +100,7 @@ public async Task LargeRuntimeStateConverterTest() OrchestrationRuntimeState convertedRuntimeStateLarge2 = await RuntimeStateStreamConverter.RawStreamToRuntimeState(rawStreamLarge2, "sessionId", azureTableInstanceStore as IBlobStore, dataConverter); verifyEventInput(largeInput2, convertedRuntimeStateLarge2); - // test for an un-implemented (or null) IBlobStorage for large runtime states + // test for an un-implemented (or null) IBlobStorage for large runtime states: should throw exception try { await @@ -122,7 +124,7 @@ public async Task VeryLargeRuntimeStateConverterTest() OrchestrationRuntimeState runtimeState = new OrchestrationRuntimeState(); DataConverter dataConverter = new JsonDataConverter(); - // test for very large size rumtime state: should throw exception + // test for very large size rumtime state that cannot be saved externally: should throw exception try { Stream rawStreamVeryLarge = await RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream(newOrchestrationRuntimeStateLarge, @@ -136,6 +138,53 @@ public async Task VeryLargeRuntimeStateConverterTest() } } + [TestMethod] + public async Task ConverterCompatabilityTest() + { + string smallInput = "abc"; + OrchestrationRuntimeState newOrchestrationRuntimeStateSmall = generateOrchestrationRuntimeState(smallInput); + OrchestrationRuntimeState runtimeState = new OrchestrationRuntimeState(); + DataConverter dataConverter = new JsonDataConverter(); + + // deserialize a OrchestrationRuntimeState object, with both compression and not compression + Stream stream = serializeToStream(dataConverter, newOrchestrationRuntimeStateSmall, true); + OrchestrationRuntimeState convertedRuntimeStateSmall = await RuntimeStateStreamConverter.RawStreamToRuntimeState(stream, "sessionId", null, dataConverter); + verifyEventInput(smallInput, convertedRuntimeStateSmall); + + stream = serializeToStream(dataConverter, newOrchestrationRuntimeStateSmall, false); + convertedRuntimeStateSmall = await RuntimeStateStreamConverter.RawStreamToRuntimeState(stream, "sessionId", null, dataConverter); + verifyEventInput(smallInput, convertedRuntimeStateSmall); + + // deserialize a IList object, with both compression and not compression + Stream stream2 = serializeToStream(dataConverter, newOrchestrationRuntimeStateSmall.Events, true); + OrchestrationRuntimeState convertedRuntimeStateSmall2 = await RuntimeStateStreamConverter.RawStreamToRuntimeState(stream2, "sessionId", null, dataConverter); + verifyEventInput(smallInput, convertedRuntimeStateSmall2); + + stream2 = serializeToStream(dataConverter, newOrchestrationRuntimeStateSmall.Events, false); + convertedRuntimeStateSmall2 = await RuntimeStateStreamConverter.RawStreamToRuntimeState(stream2, "sessionId", null, dataConverter); + verifyEventInput(smallInput, convertedRuntimeStateSmall2); + } + + private Stream serializeToStream(DataConverter dataConverter, OrchestrationRuntimeState orchestrationRuntimeState, bool shouldCompress) + { + string serializedState = dataConverter.Serialize(orchestrationRuntimeState); + long originalStreamSize = 0; + return Utils.WriteStringToStream( + serializedState, + shouldCompress, + out originalStreamSize); + } + + private Stream serializeToStream(DataConverter dataConverter, IList events, bool shouldCompress) + { + string serializedState = dataConverter.Serialize(events); + long originalStreamSize = 0; + return Utils.WriteStringToStream( + serializedState, + shouldCompress, + out originalStreamSize); + } + OrchestrationRuntimeState generateOrchestrationRuntimeState(string input) { IList historyEvents = new List();