Skip to content

Commit

Permalink
Deserialization with fallback.
Browse files Browse the repository at this point in the history
  • Loading branch information
zheg committed Jul 13, 2016
1 parent 79144c0 commit b8338dc
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 29 deletions.
1 change: 1 addition & 0 deletions Framework/DurableTaskFramework.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
<Compile Include="Blob\DurableTaskBlobHelper.cs" />
<Compile Include="History\HistoryStateEvent.cs" />
<Compile Include="IOrchestrationServiceInstanceStore.cs" />
<Compile Include="OrchestrationSessionState.cs" />
<Compile Include="RuntimeStateStreamConverter.cs" />
<Compile Include="Settings\JumpStartSettings.cs" />
<Compile Include="Settings\ServiceBusOrchestrationServiceSettings.cs" />
Expand Down
8 changes: 0 additions & 8 deletions Framework/OrchestrationRuntimeState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,6 @@ public DateTime CompletedTime
}
}

/// <summary>
/// The external storage key
/// </summary>
public string StorageKey
{
get; set;
}

/// <summary>
/// Gets the serialized input of the ExecutionStartedEvent
/// </summary>
Expand Down
57 changes: 57 additions & 0 deletions Framework/OrchestrationSessionState.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask
{
using System.Collections.Generic;
using DurableTask.History;

/// <summary>
/// The object that represents the serialized session state.
/// It hold a list of history events (empty storage key), or a key for external storage if it is too large to fit in the the session state.
/// </summary>
internal class OrchestrationSessionState
{
/// <summary>
/// A constructor for deserialzation.
/// </summary>
public OrchestrationSessionState()
{
}

public OrchestrationSessionState(IList<HistoryEvent> events)
{
this.Events = events;
}

public OrchestrationSessionState(string storageKey)
{
this.StorageKey = storageKey;

// generate history events only for serialization/deserialization;
// the actual history events are stored exterally with the storage key.
this.Events = new List<HistoryEvent>();
this.Events.Add(new ExecutionStartedEvent(-1, string.Empty));
}

/// <summary>
/// List of all history events for runtime state
/// </summary>
public IList<HistoryEvent> Events { get; set; }

/// <summary>
/// The storage key for external storage. Could be null or empty if not externally stored.
/// </summary>
public string StorageKey { get; set; }
}
}
71 changes: 52 additions & 19 deletions Framework/RuntimeStateStreamConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@ namespace DurableTask
using DurableTask.Tracing;
using DurableTask.Tracking;

/// <summary>
/// A converter that does conversion between the OrchestrationRuntimeState instance and a stream.
/// The stream is a serialized OrchestrationSessionState that will set as session state.
/// De-serialization is done with fallbacks in the order: OrchestrationSessionState -> OrchestrationRuntimeState -> IList<HistoryEvent>.
/// </summary>
class RuntimeStateStreamConverter
{
public static async Task<Stream> OrchestrationRuntimeStateToRawStream(OrchestrationRuntimeState newOrchestrationRuntimeState,
OrchestrationRuntimeState runtimeState, DataConverter dataConverter, bool shouldCompress, long sessionStreamTerminationThresholdInBytes,
long sessionStreamExternalStorageThresholdInBytes, IBlobStore blobStore, string sessionId)
{
string serializedState = dataConverter.Serialize(newOrchestrationRuntimeState);
OrchestrationSessionState orchestrationSessionState = new OrchestrationSessionState(newOrchestrationRuntimeState.Events);
string serializedState = dataConverter.Serialize(orchestrationSessionState);

long originalStreamSize = 0;
Stream compressedState = Utils.WriteStringToStream(
Expand All @@ -48,14 +54,14 @@ public static async Task<Stream> OrchestrationRuntimeStateToRawStream(Orchestrat

if (runtimeState.CompressedSize > sessionStreamExternalStorageThresholdInBytes)
{
return await CreateStreamForExternalRuntimeStateAsync(shouldCompress,
return await CreateStreamForExternalStorageAsync(shouldCompress,
blobStore, sessionId, dataConverter, compressedState);
}

return compressedState;
}

async static Task<Stream> CreateStreamForExternalRuntimeStateAsync(bool shouldCompress,
async static Task<Stream> CreateStreamForExternalStorageAsync(bool shouldCompress,
IBlobStore blobStore, string sessionId, DataConverter dataConverter, Stream compressedState)
{
if (blobStore == null)
Expand All @@ -65,16 +71,11 @@ async static Task<Stream> CreateStreamForExternalRuntimeStateAsync(bool shouldCo
nameof(IBlobStore));
}

// create a new runtime state with the external storage key
IList<HistoryEvent> historyEvents = new List<HistoryEvent>();
ExecutionStartedEvent historyEvent = new ExecutionStartedEvent(1, "");
historyEvents.Add(historyEvent);

OrchestrationRuntimeState runtimeStateExternalStorage = new OrchestrationRuntimeState(historyEvents);
// create a new orchestration session state with the external storage key
string key = blobStore.BuildSessionStorageKey(sessionId);
runtimeStateExternalStorage.StorageKey = key;
OrchestrationSessionState orchestrationSessionState = new OrchestrationSessionState(key);

string serializedStateExternal = dataConverter.Serialize(runtimeStateExternalStorage);
string serializedStateExternal = dataConverter.Serialize(orchestrationSessionState);
long streamSize;
Stream compressedStateForSession = Utils.WriteStringToStream(
serializedStateExternal,
Expand All @@ -96,9 +97,10 @@ public static async Task<OrchestrationRuntimeState> RawStreamToRuntimeState(Stre
long rawSessionStateSize = isEmptySession ? 0 : rawSessionStream.Length;
long newSessionStateSize = isEmptySession ? 0 : sessionStream.Length;

runtimeState = GetOrCreateInstanceState(sessionStream, sessionId, dataConverter);
string storageKey;
runtimeState = GetOrCreateInstanceState(sessionStream, sessionId, dataConverter, out storageKey);

if (string.IsNullOrWhiteSpace(runtimeState.StorageKey))
if (string.IsNullOrWhiteSpace(storageKey))
{
TraceHelper.TraceSession(TraceEventType.Information, sessionId,
$"Size of session state is {newSessionStateSize}, compressed {rawSessionStateSize}");
Expand All @@ -110,13 +112,14 @@ public static async Task<OrchestrationRuntimeState> RawStreamToRuntimeState(Stre
throw new ArgumentException($"Please provide an implementation of IBlobStore for external storage to load the runtime state.",
nameof(IBlobStore));
}
Stream externalStream = await blobStore.LoadStreamWithKeyAsync(runtimeState.StorageKey);
Stream externalStream = await blobStore.LoadStreamWithKeyAsync(storageKey);
return await RawStreamToRuntimeState(externalStream, sessionId, blobStore, dataConverter);
}

static OrchestrationRuntimeState GetOrCreateInstanceState(Stream stateStream, string sessionId, DataConverter dataConverter)
static OrchestrationRuntimeState GetOrCreateInstanceState(Stream stateStream, string sessionId, DataConverter dataConverter, out string storageKey)
{
OrchestrationRuntimeState runtimeState;
storageKey = string.Empty;
if (stateStream == null)
{
TraceHelper.TraceSession(TraceEventType.Information, sessionId,
Expand All @@ -137,13 +140,43 @@ static OrchestrationRuntimeState GetOrCreateInstanceState(Stream stateStream, st
serializedState = reader.ReadToEnd();
}

OrchestrationRuntimeState restoredState = dataConverter.Deserialize<OrchestrationRuntimeState>(serializedState);
// Create a new Object with just the events and storage key, we don't want the rest
runtimeState = new OrchestrationRuntimeState(restoredState.Events);
runtimeState.StorageKey = restoredState.StorageKey;
runtimeState = DeserializeToRuntimeStateWithFallback(serializedState, dataConverter, sessionId, out storageKey);
}

return runtimeState;
}

static OrchestrationRuntimeState DeserializeToRuntimeStateWithFallback(string serializedState, DataConverter dataConverter, string sessionId, out string storageKey)
{
OrchestrationRuntimeState runtimeState = null;
storageKey = string.Empty;
try
{
OrchestrationSessionState sessionState =
dataConverter.Deserialize<OrchestrationSessionState>(serializedState);
runtimeState = new OrchestrationRuntimeState(sessionState.Events);
storageKey = sessionState.StorageKey;
}
catch (Exception exception)
{
TraceHelper.TraceSession(TraceEventType.Warning, sessionId,
$"Failed to deseriazlize session state to OrchestrationSessionState object: {serializedState}");
try
{
OrchestrationRuntimeState restoredState =
dataConverter.Deserialize<OrchestrationRuntimeState>(serializedState);
// Create a new Object with just the events, we don't want the rest
runtimeState = new OrchestrationRuntimeState(restoredState.Events);
}
catch (Exception e)
{
TraceHelper.TraceSession(TraceEventType.Warning, sessionId,
$"Failed to deseriazlize session state to OrchestrationRuntimeState object: {serializedState}");
IList<HistoryEvent> events = dataConverter.Deserialize<IList<HistoryEvent>>(serializedState);
runtimeState = new OrchestrationRuntimeState(events);
}
}
return runtimeState;
}
}
}
53 changes: 51 additions & 2 deletions FrameworkUnitTests/RumtimeStateStreamConverterTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// limitations under the License.
// ----------------------------------------------------------------------------------

using DurableTask.Common;

namespace FrameworkUnitTests
{
using System;
Expand Down Expand Up @@ -98,7 +100,7 @@ public async Task LargeRuntimeStateConverterTest()
OrchestrationRuntimeState convertedRuntimeStateLarge2 = await RuntimeStateStreamConverter.RawStreamToRuntimeState(rawStreamLarge2, "sessionId", azureTableInstanceStore as IBlobStore, dataConverter);
verifyEventInput(largeInput2, convertedRuntimeStateLarge2);

// test for an un-implemented (or null) IBlobStorage for large runtime states
// test for an un-implemented (or null) IBlobStorage for large runtime states: should throw exception
try
{
await
Expand All @@ -122,7 +124,7 @@ public async Task VeryLargeRuntimeStateConverterTest()
OrchestrationRuntimeState runtimeState = new OrchestrationRuntimeState();
DataConverter dataConverter = new JsonDataConverter();

// test for very large size rumtime state: should throw exception
// test for very large size rumtime state that cannot be saved externally: should throw exception
try
{
Stream rawStreamVeryLarge = await RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream(newOrchestrationRuntimeStateLarge,
Expand All @@ -136,6 +138,53 @@ public async Task VeryLargeRuntimeStateConverterTest()
}
}

[TestMethod]
public async Task ConverterCompatabilityTest()
{
string smallInput = "abc";
OrchestrationRuntimeState newOrchestrationRuntimeStateSmall = generateOrchestrationRuntimeState(smallInput);
OrchestrationRuntimeState runtimeState = new OrchestrationRuntimeState();
DataConverter dataConverter = new JsonDataConverter();

// deserialize a OrchestrationRuntimeState object, with both compression and not compression
Stream stream = serializeToStream(dataConverter, newOrchestrationRuntimeStateSmall, true);
OrchestrationRuntimeState convertedRuntimeStateSmall = await RuntimeStateStreamConverter.RawStreamToRuntimeState(stream, "sessionId", null, dataConverter);
verifyEventInput(smallInput, convertedRuntimeStateSmall);

stream = serializeToStream(dataConverter, newOrchestrationRuntimeStateSmall, false);
convertedRuntimeStateSmall = await RuntimeStateStreamConverter.RawStreamToRuntimeState(stream, "sessionId", null, dataConverter);
verifyEventInput(smallInput, convertedRuntimeStateSmall);

// deserialize a IList<HistoryEvent> object, with both compression and not compression
Stream stream2 = serializeToStream(dataConverter, newOrchestrationRuntimeStateSmall.Events, true);
OrchestrationRuntimeState convertedRuntimeStateSmall2 = await RuntimeStateStreamConverter.RawStreamToRuntimeState(stream2, "sessionId", null, dataConverter);
verifyEventInput(smallInput, convertedRuntimeStateSmall2);

stream2 = serializeToStream(dataConverter, newOrchestrationRuntimeStateSmall.Events, false);
convertedRuntimeStateSmall2 = await RuntimeStateStreamConverter.RawStreamToRuntimeState(stream2, "sessionId", null, dataConverter);
verifyEventInput(smallInput, convertedRuntimeStateSmall2);
}

private Stream serializeToStream(DataConverter dataConverter, OrchestrationRuntimeState orchestrationRuntimeState, bool shouldCompress)
{
string serializedState = dataConverter.Serialize(orchestrationRuntimeState);
long originalStreamSize = 0;
return Utils.WriteStringToStream(
serializedState,
shouldCompress,
out originalStreamSize);
}

private Stream serializeToStream(DataConverter dataConverter, IList<HistoryEvent> events, bool shouldCompress)
{
string serializedState = dataConverter.Serialize(events);
long originalStreamSize = 0;
return Utils.WriteStringToStream(
serializedState,
shouldCompress,
out originalStreamSize);
}

OrchestrationRuntimeState generateOrchestrationRuntimeState(string input)
{
IList<HistoryEvent> historyEvents = new List<HistoryEvent>();
Expand Down

0 comments on commit b8338dc

Please sign in to comment.