Skip to content

Commit

Permalink
Handle seesion overflow using external storage.
Browse files Browse the repository at this point in the history
  • Loading branch information
zheg committed Jul 7, 2016
1 parent 6b0b868 commit fb1029c
Show file tree
Hide file tree
Showing 10 changed files with 399 additions and 82 deletions.
1 change: 1 addition & 0 deletions Framework/DurableTaskFramework.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
<Compile Include="Blob\DurableTaskBlobHelper.cs" />
<Compile Include="History\HistoryStateEvent.cs" />
<Compile Include="IOrchestrationServiceInstanceStore.cs" />
<Compile Include="RuntimeStateStreamConverter.cs" />
<Compile Include="Settings\JumpStartSettings.cs" />
<Compile Include="Settings\ServiceBusOrchestrationServiceSettings.cs" />
<Compile Include="Tracking\AzureTableOrchestrationJumpStartEntity.cs" />
Expand Down
8 changes: 8 additions & 0 deletions Framework/OrchestrationRuntimeState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ public DateTime CompletedTime
}
}

/// <summary>
/// The external storage key
/// </summary>
public string StorageKey
{
get; set;
}

/// <summary>
/// Gets the serialized input of the ExecutionStartedEvent
/// </summary>
Expand Down
149 changes: 149 additions & 0 deletions Framework/RuntimeStateStreamConverter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;
using DurableTask.Common;
using DurableTask.History;
using DurableTask.Serializing;
using DurableTask.Tracing;
using DurableTask.Tracking;

class RuntimeStateStreamConverter
{
public static async Task<Stream> OrchestrationRuntimeStateToRawStream(OrchestrationRuntimeState newOrchestrationRuntimeState,
OrchestrationRuntimeState runtimeState, DataConverter dataConverter, bool shouldCompress, long sessionStreamTerminationThresholdInBytes,
long sessionStreamExternalStorageThresholdInBytes, IBlobStore blobStore, string sessionId)
{
string serializedState = dataConverter.Serialize(newOrchestrationRuntimeState);

long originalStreamSize = 0;
Stream compressedState = Utils.WriteStringToStream(
serializedState,
shouldCompress,
out originalStreamSize);

runtimeState.Size = originalStreamSize;
runtimeState.CompressedSize = compressedState.Length;

if (runtimeState.CompressedSize > sessionStreamTerminationThresholdInBytes)
{
throw new ArgumentException($"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {sessionStreamTerminationThresholdInBytes} bytes");
}

if (runtimeState.CompressedSize > sessionStreamExternalStorageThresholdInBytes)
{
return await CreateStreamForExternalRuntimeStateAsync(shouldCompress,
blobStore, sessionId, dataConverter, compressedState);
}

return compressedState;
}

async static Task<Stream> CreateStreamForExternalRuntimeStateAsync(bool shouldCompress,
IBlobStore blobStore, string sessionId, DataConverter dataConverter, Stream compressedState)
{
if (blobStore == null)
{
throw new ArgumentException($"The compressed session is larger than supported. " +
$"Please provide an implementation of IBlobStore for external storage.",
nameof(IBlobStore));
}

// create a new runtime state with the external storage key
IList<HistoryEvent> historyEvents = new List<HistoryEvent>();
ExecutionStartedEvent historyEvent = new ExecutionStartedEvent(1, "");
historyEvents.Add(historyEvent);

OrchestrationRuntimeState runtimeStateExternalStorage = new OrchestrationRuntimeState(historyEvents);
string key = blobStore.BuildSessionStorageKey(sessionId);
runtimeStateExternalStorage.StorageKey = key;

string serializedStateExternal = dataConverter.Serialize(runtimeStateExternalStorage);
long streamSize;
Stream compressedStateForSession = Utils.WriteStringToStream(
serializedStateExternal,
shouldCompress,
out streamSize);

await blobStore.SaveStreamWithKeyAsync(key, compressedState);
return compressedStateForSession;
}


public static async Task<OrchestrationRuntimeState> RawStreamToRuntimeState(Stream rawSessionStream, string sessionId, IBlobStore blobStore, DataConverter dataConverter)
{
bool isEmptySession;
OrchestrationRuntimeState runtimeState;
Stream sessionStream = await Utils.GetDecompressedStreamAsync(rawSessionStream);

isEmptySession = sessionStream == null;
long rawSessionStateSize = isEmptySession ? 0 : rawSessionStream.Length;
long newSessionStateSize = isEmptySession ? 0 : sessionStream.Length;

runtimeState = GetOrCreateInstanceState(sessionStream, sessionId, dataConverter);

if (string.IsNullOrWhiteSpace(runtimeState.StorageKey))
{
TraceHelper.TraceSession(TraceEventType.Information, sessionId,
$"Size of session state is {newSessionStateSize}, compressed {rawSessionStateSize}");
return runtimeState;
}

if (blobStore == null)
{
throw new ArgumentException($"Please provide an implementation of IBlobStore for external storage to load the runtime state.",
nameof(IBlobStore));
}
Stream externalStream = await blobStore.LoadStreamWithKeyAsync(runtimeState.StorageKey);
return await RawStreamToRuntimeState(externalStream, sessionId, blobStore, dataConverter);
}

static OrchestrationRuntimeState GetOrCreateInstanceState(Stream stateStream, string sessionId, DataConverter dataConverter)
{
OrchestrationRuntimeState runtimeState;
if (stateStream == null)
{
TraceHelper.TraceSession(TraceEventType.Information, sessionId,
"No session state exists, creating new session state.");
runtimeState = new OrchestrationRuntimeState();
}
else
{
if (stateStream.Position != 0)
{
throw TraceHelper.TraceExceptionSession(TraceEventType.Error, sessionId,
new ArgumentException("Stream is partially consumed"));
}

string serializedState = null;
using (var reader = new StreamReader(stateStream))
{
serializedState = reader.ReadToEnd();
}

OrchestrationRuntimeState restoredState = dataConverter.Deserialize<OrchestrationRuntimeState>(serializedState);
// Create a new Object with just the events and storage key, we don't want the rest
runtimeState = new OrchestrationRuntimeState(restoredState.Events);
runtimeState.StorageKey = restoredState.StorageKey;
}

return runtimeState;
}
}
}
113 changes: 31 additions & 82 deletions Framework/ServiceBusOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public class ServiceBusOrchestrationService : IOrchestrationService, IOrchestrat
// as every fetched message also creates a tracking message which counts towards this limit.
const int MaxMessageCount = 80;
const int SessionStreamWarningSizeInBytes = 200 * 1024;
const int SessionStreamTerminationThresholdInBytes = 230 * 1024;
const int SessionStreamExternalStorageThresholdInBytes = 230 * 1024;
const int SessionStreamTerminationThresholdInBytes = 20 * 1024 * 1024;
const int StatusPollingIntervalInSeconds = 2;
const int DuplicateDetectionWindowInHours = 4;

Expand Down Expand Up @@ -410,7 +411,7 @@ public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAs
IList<TaskMessage> newTaskMessages = await Task.WhenAll(
newMessages.Select(async message => await ServiceBusUtils.GetObjectFromBrokeredMessageAsync<TaskMessage>(message, blobStore)));

OrchestrationRuntimeState runtimeState = await GetSessionState(session);
OrchestrationRuntimeState runtimeState = await GetSessionState(session, this.InstanceStore as IBlobStore);

long maxSequenceNumber = newMessages
.OrderByDescending(message => message.SequenceNumber)
Expand Down Expand Up @@ -1291,27 +1292,10 @@ string GetFormattedLog(string input)
return input;
}

static async Task<OrchestrationRuntimeState> GetSessionState(MessageSession session)
static async Task<OrchestrationRuntimeState> GetSessionState(MessageSession session, IBlobStore blobStore)
{
long rawSessionStateSize;
long newSessionStateSize;
OrchestrationRuntimeState runtimeState;
bool isEmptySession;

using (Stream rawSessionStream = await session.GetStateAsync())
using (Stream sessionStream = await Utils.GetDecompressedStreamAsync(rawSessionStream))
{
isEmptySession = sessionStream == null;
rawSessionStateSize = isEmptySession ? 0 : rawSessionStream.Length;
newSessionStateSize = isEmptySession ? 0 : sessionStream.Length;

runtimeState = GetOrCreateInstanceState(sessionStream, session.SessionId);
}

TraceHelper.TraceSession(TraceEventType.Information, session.SessionId,
$"Size of session state is {newSessionStateSize}, compressed {rawSessionStateSize}");

return runtimeState;
return await RuntimeStateStreamConverter.RawStreamToRuntimeState(rawSessionStream, session.SessionId, blobStore, DataConverter);
}

async Task<bool> TrySetSessionState(
Expand All @@ -1328,42 +1312,38 @@ async Task<bool> TrySetSessionState(
return true;
}

string serializedState = DataConverter.Serialize(newOrchestrationRuntimeState);
try
{
Stream rawStream = await
RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream(newOrchestrationRuntimeState,
runtimeState, DataConverter,
Settings.TaskOrchestrationDispatcherSettings.CompressOrchestrationState,
SessionStreamTerminationThresholdInBytes,
SessionStreamExternalStorageThresholdInBytes, this.InstanceStore as IBlobStore, session.SessionId);

session.SetState(rawStream);

long originalStreamSize = 0;
using (
Stream compressedState = Utils.WriteStringToStream(
serializedState,
Settings.TaskOrchestrationDispatcherSettings.CompressOrchestrationState,
out originalStreamSize))
}
catch (ArgumentException exception)
{
runtimeState.Size = originalStreamSize;
runtimeState.CompressedSize = compressedState.Length;
if (runtimeState.CompressedSize > SessionStreamTerminationThresholdInBytes)
{
// basic idea is to simply enqueue a terminate message just like how we do it from taskhubclient
// it is possible to have other messages in front of the queue and those will get processed before
// the terminate message gets processed. but that is ok since in the worst case scenario we will
// simply land in this if-block again and end up queuing up another terminate message.
//
// the interesting scenario is when the second time we *dont* land in this if-block because e.g.
// the new messages that we processed caused a new generation to be created. in that case
// it is still ok because the worst case scenario is that we will terminate a newly created generation
// which shouldn't have been created at all in the first place
// basic idea is to simply enqueue a terminate message just like how we do it from taskhubclient
// it is possible to have other messages in front of the queue and those will get processed before
// the terminate message gets processed. but that is ok since in the worst case scenario we will
// simply land in this if-block again and end up queuing up another terminate message.
//
// the interesting scenario is when the second time we *dont* land in this if-block because e.g.
// the new messages that we processed caused a new generation to be created. in that case
// it is still ok because the worst case scenario is that we will terminate a newly created generation
// which shouldn't have been created at all in the first place

isSessionSizeThresholdExceeded = true;
isSessionSizeThresholdExceeded = true;

string reason = $"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {SessionStreamTerminationThresholdInBytes} bytes";
TraceHelper.TraceSession(TraceEventType.Critical, workItem.InstanceId, reason);
string reason = $"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {SessionStreamTerminationThresholdInBytes} bytes";
TraceHelper.TraceSession(TraceEventType.Critical, workItem.InstanceId, reason);

BrokeredMessage forcedTerminateMessage = await CreateForcedTerminateMessageAsync(runtimeState.OrchestrationInstance.InstanceId, reason);
BrokeredMessage forcedTerminateMessage = await CreateForcedTerminateMessageAsync(runtimeState.OrchestrationInstance.InstanceId, reason);

await orchestratorQueueClient.SendAsync(forcedTerminateMessage);
}
else
{
session.SetState(compressedState);
}
await orchestratorQueueClient.SendAsync(forcedTerminateMessage);
}

return !isSessionSizeThresholdExceeded;
Expand Down Expand Up @@ -1454,37 +1434,6 @@ async Task<BrokeredMessage> CreateForcedTerminateMessageAsync(string instanceId,
return message;
}

static OrchestrationRuntimeState GetOrCreateInstanceState(Stream stateStream, string sessionId)
{
OrchestrationRuntimeState runtimeState;
if (stateStream == null)
{
TraceHelper.TraceSession(TraceEventType.Information, sessionId,
"No session state exists, creating new session state.");
runtimeState = new OrchestrationRuntimeState();
}
else
{
if (stateStream.Position != 0)
{
throw TraceHelper.TraceExceptionSession(TraceEventType.Error, sessionId,
new ArgumentException("Stream is partially consumed"));
}

string serializedState = null;
using (var reader = new StreamReader(stateStream))
{
serializedState = reader.ReadToEnd();
}

OrchestrationRuntimeState restoredState = DataConverter.Deserialize<OrchestrationRuntimeState>(serializedState);
// Create a new Object with just the events, we don't want the rest
runtimeState = new OrchestrationRuntimeState(restoredState.Events);
}

return runtimeState;
}

void ThrowIfInstanceStoreNotConfigured()
{
if (InstanceStore == null)
Expand Down
13 changes: 13 additions & 0 deletions Framework/Tracking/AzureTableInstanceStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// limitations under the License.
// ----------------------------------------------------------------------------------

using Microsoft.ServiceBus.Messaging;

namespace DurableTask.Tracking
{
using System;
Expand Down Expand Up @@ -473,6 +475,17 @@ public string BuildMessageStorageKey(OrchestrationInstance orchestrationInstance
messageFireTime);
}

/// <summary>
/// Create a storage key based on message session.
/// This key will be used to save and load the stream in external storage when it is too large.
/// </summary>
/// <param name="sessionId">The message session Id.</param>
/// <returns>A storage key.</returns>
public string BuildSessionStorageKey(string sessionId)
{
return BlobStorageClientHelper.BuildSessionStorageKey(sessionId);
}

/// <summary>
/// Save the stream of the message or seesion using key.
/// </summary>
Expand Down
9 changes: 9 additions & 0 deletions Framework/Tracking/BlobStorageClientHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// limitations under the License.
// ----------------------------------------------------------------------------------

using Microsoft.ServiceBus.Messaging;

namespace DurableTask.Tracking
{
using System;
Expand Down Expand Up @@ -45,6 +47,13 @@ public static string BuildMessageStorageKey(string instanceId, string executionI
id);
}

public static string BuildSessionStorageKey(string sessionId)
{
string id = Guid.NewGuid().ToString("N");
return string.Format("session{0}{2}{1}{3}{4}{5}", ContainerDelimiter, KeyDelimiter,
GetDateStringForContainerName(DateTime.MinValue), sessionId, BlobNameDelimiter, id);
}

// use the message fire time if it is set;
// otherwise, use the current utc time as the date string as part of the container name
static string GetDateStringForContainerName(DateTime messageFireTime)
Expand Down
Loading

0 comments on commit fb1029c

Please sign in to comment.