diff --git a/UpgradeLog.htm b/UpgradeLog.htm
new file mode 100644
index 000000000..9ead0de06
--- /dev/null
+++ b/UpgradeLog.htm
@@ -0,0 +1,268 @@
+
+
+
+ Migration Report
+
+ Migration Report -
Overview
| Project | Path | Errors | Warnings | Messages |
---|
| TestApplication | test\TestFabricApplication\TestApplication\TestApplication.sfproj | 1 | 0 | 0 |
Solution and projects
TestApplication
| test\TestFabricApplication\TestApplication\TestApplication.sfproj:
+ The application which this project type is based on was not found. Please try this link for further information: a07b5eb6-e848-4116-a8d0-a826331d98c6 |
\ No newline at end of file
diff --git a/src/DurableTask.Core/TaskActivityDispatcher.cs b/src/DurableTask.Core/TaskActivityDispatcher.cs
index 1c22c307f..49c1ffb99 100644
--- a/src/DurableTask.Core/TaskActivityDispatcher.cs
+++ b/src/DurableTask.Core/TaskActivityDispatcher.cs
@@ -54,7 +54,8 @@ internal TaskActivityDispatcher(
"TaskActivityDispatcher",
item => item.Id,
this.OnFetchWorkItemAsync,
- this.OnProcessWorkItemAsync)
+ this.OnProcessWorkItemAsync,
+ this.RenewUntil)
{
AbortWorkItem = orchestrationService.AbandonTaskActivityWorkItemAsync,
GetDelayInSecondsAfterOnFetchException = orchestrationService.GetDelayInSecondsAfterOnFetchException,
@@ -94,8 +95,6 @@ Task OnFetchWorkItemAsync(TimeSpan receiveTimeout, Cancell
async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem)
{
- Task? renewTask = null;
- using var renewCancellationTokenSource = new CancellationTokenSource();
TaskMessage taskMessage = workItem.TaskMessage;
OrchestrationInstance orchestrationInstance = taskMessage.OrchestrationInstance;
@@ -144,14 +143,6 @@ async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem)
this.logHelper.TaskActivityStarting(orchestrationInstance, scheduledEvent);
TaskActivity? taskActivity = this.objectManager.GetObject(scheduledEvent.Name, scheduledEvent.Version);
- if (workItem.LockedUntilUtc < DateTime.MaxValue)
- {
- // start a task to run RenewUntil
- renewTask = Task.Factory.StartNew(
- () => this.RenewUntil(workItem, renewCancellationTokenSource.Token),
- renewCancellationTokenSource.Token);
- }
-
var dispatchContext = new DispatchMiddlewareContext();
dispatchContext.SetProperty(taskMessage.OrchestrationInstance);
dispatchContext.SetProperty(taskActivity);
@@ -271,23 +262,10 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
finally
{
diagnosticActivity?.Stop(); // Ensure the activity is stopped here to prevent it from leaking out.
- if (renewTask != null)
- {
- renewCancellationTokenSource.Cancel();
- try
- {
- // wait the renewTask finish
- await renewTask;
- }
- catch (OperationCanceledException)
- {
- // ignore
- }
- }
}
}
- async Task RenewUntil(TaskActivityWorkItem workItem, CancellationToken cancellationToken)
+ internal async Task RenewUntil(TaskActivityWorkItem workItem, CancellationToken cancellationToken)
{
try
{
diff --git a/src/DurableTask.Core/TaskActivityWorkItem.cs b/src/DurableTask.Core/TaskActivityWorkItem.cs
index b73d55d53..2b50ffd1b 100644
--- a/src/DurableTask.Core/TaskActivityWorkItem.cs
+++ b/src/DurableTask.Core/TaskActivityWorkItem.cs
@@ -19,18 +19,13 @@ namespace DurableTask.Core
///
/// An active instance / work item of a task activity
///
- public class TaskActivityWorkItem
+ public class TaskActivityWorkItem : WorkItemBase
{
///
/// The Id of the work work item, likely related to the task message
///
public string Id;
- ///
- /// The datetime this work item is locked until
- ///
- public DateTime LockedUntilUtc;
-
///
/// The task message associated with this work item
///
diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs
index aad78675d..2af66afe3 100644
--- a/src/DurableTask.Core/TaskEntityDispatcher.cs
+++ b/src/DurableTask.Core/TaskEntityDispatcher.cs
@@ -57,12 +57,13 @@ internal TaskEntityDispatcher(
this.errorPropagationMode = errorPropagationMode;
this.entityOrchestrationService = (orchestrationService as IEntityOrchestrationService)!;
this.entityBackendProperties = entityOrchestrationService.EntityBackendProperties;
-
+
this.dispatcher = new WorkItemDispatcher(
"TaskEntityDispatcher",
item => item == null ? string.Empty : item.InstanceId,
this.OnFetchWorkItemAsync,
- this.OnProcessWorkItemSessionAsync)
+ this.OnProcessWorkItemSessionAsync,
+ this.RenewWorkItem)
{
GetDelayInSecondsAfterOnFetchException = orchestrationService.GetDelayInSecondsAfterOnFetchException,
GetDelayInSecondsAfterOnProcessException = orchestrationService.GetDelayInSecondsAfterOnProcessException,
@@ -203,16 +204,6 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
OrchestrationRuntimeState runtimeState = workItem.OrchestrationRuntimeState;
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
- Task renewTask = null;
- using var renewCancellationTokenSource = new CancellationTokenSource();
- if (workItem.LockedUntilUtc < DateTime.MaxValue)
- {
- // start a task to run RenewUntil
- renewTask = Task.Factory.StartNew(
- () => TaskOrchestrationDispatcher.RenewUntil(workItem, this.orchestrationService, this.logHelper, nameof(TaskEntityDispatcher), renewCancellationTokenSource.Token),
- renewCancellationTokenSource.Token);
- }
-
WorkItemEffects effects = new WorkItemEffects()
{
ActivityMessages = new List(),
@@ -223,163 +214,141 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
RuntimeState = runtimeState,
};
- try
+ // Assumes that: if the batch contains a new "ExecutionStarted" event, it is the first message in the batch.
+ if (!TaskOrchestrationDispatcher.ReconcileMessagesWithState(workItem, nameof(TaskEntityDispatcher), this.errorPropagationMode, this.logHelper))
+ {
+ // TODO : mark an orchestration as faulted if there is data corruption
+ this.logHelper.DroppingOrchestrationWorkItem(workItem, "Received work-item for an invalid orchestration");
+ }
+ else
{
- // Assumes that: if the batch contains a new "ExecutionStarted" event, it is the first message in the batch.
- if (!TaskOrchestrationDispatcher.ReconcileMessagesWithState(workItem, nameof(TaskEntityDispatcher), this.errorPropagationMode, this.logHelper))
- {
- // TODO : mark an orchestration as faulted if there is data corruption
- this.logHelper.DroppingOrchestrationWorkItem(workItem, "Received work-item for an invalid orchestration");
- }
- else
- {
- // we start with processing all the requests and figuring out which ones to execute now
- // results can depend on whether the entity is locked, what the maximum batch size is,
- // and whether the messages arrived out of order
+ // we start with processing all the requests and figuring out which ones to execute now
+ // results can depend on whether the entity is locked, what the maximum batch size is,
+ // and whether the messages arrived out of order
- this.DetermineWork(workItem.OrchestrationRuntimeState,
- out SchedulerState schedulerState,
- out Work workToDoNow);
+ this.DetermineWork(workItem.OrchestrationRuntimeState,
+ out SchedulerState schedulerState,
+ out Work workToDoNow);
- if (workToDoNow.OperationCount > 0)
- {
- // execute the user-defined operations on this entity, via the middleware
- var result = await this.ExecuteViaMiddlewareAsync(workToDoNow, runtimeState.OrchestrationInstance, schedulerState.EntityState);
- var operationResults = result.Results!;
+ if (workToDoNow.OperationCount > 0)
+ {
+ // execute the user-defined operations on this entity, via the middleware
+ var result = await this.ExecuteViaMiddlewareAsync(workToDoNow, runtimeState.OrchestrationInstance, schedulerState.EntityState);
+ var operationResults = result.Results!;
- // if we encountered an error, record it as the result of the operations
- // so that callers are notified that the operation did not succeed.
- if (result.FailureDetails != null)
+ // if we encountered an error, record it as the result of the operations
+ // so that callers are notified that the operation did not succeed.
+ if (result.FailureDetails != null)
+ {
+ OperationResult errorResult = new OperationResult()
{
- OperationResult errorResult = new OperationResult()
- {
- // for older SDKs only
- Result = result.FailureDetails.ErrorMessage,
- ErrorMessage = "entity dispatch failed",
+ // for older SDKs only
+ Result = result.FailureDetails.ErrorMessage,
+ ErrorMessage = "entity dispatch failed",
- // for newer SDKs only
- FailureDetails = result.FailureDetails,
- };
+ // for newer SDKs only
+ FailureDetails = result.FailureDetails,
+ };
- for (int i = operationResults.Count; i < workToDoNow.OperationCount; i++)
- {
- operationResults.Add(errorResult);
- }
- }
-
- // go through all results
- // for each operation that is not a signal, send a result message back to the calling orchestrator
- for (int i = 0; i < result.Results!.Count; i++)
- {
- var req = workToDoNow.Operations[i];
- if (!req.IsSignal)
- {
- this.SendResultMessage(effects, req, result.Results[i]);
- }
- }
-
- if (result.Results.Count < workToDoNow.OperationCount)
+ for (int i = operationResults.Count; i < workToDoNow.OperationCount; i++)
{
- // some requests were not processed (e.g. due to shutdown or timeout)
- // in this case we just defer the work so it can be retried
- var deferred = workToDoNow.RemoveDeferredWork(result.Results.Count);
- schedulerState.PutBack(deferred);
- workToDoNow.ToBeContinued(schedulerState);
+ operationResults.Add(errorResult);
}
+ }
- // update the entity state based on the result
- schedulerState.EntityState = result.EntityState;
-
- // perform the actions
- foreach (var action in result.Actions!)
+ // go through all results
+ // for each operation that is not a signal, send a result message back to the calling orchestrator
+ for (int i = 0; i < result.Results!.Count; i++)
+ {
+ var req = workToDoNow.Operations[i];
+ if (!req.IsSignal)
{
- switch (action)
- {
- case (SendSignalOperationAction sendSignalAction):
- this.SendSignalMessage(effects, schedulerState, sendSignalAction);
- break;
- case (StartNewOrchestrationOperationAction startAction):
- this.ProcessSendStartMessage(effects, runtimeState, startAction);
- break;
- }
+ this.SendResultMessage(effects, req, result.Results[i]);
}
}
- // process the lock request, if any
- if (workToDoNow.LockRequest != null)
+ if (result.Results.Count < workToDoNow.OperationCount)
{
- this.ProcessLockRequest(effects, schedulerState, workToDoNow.LockRequest);
+ // some requests were not processed (e.g. due to shutdown or timeout)
+ // in this case we just defer the work so it can be retried
+ var deferred = workToDoNow.RemoveDeferredWork(result.Results.Count);
+ schedulerState.PutBack(deferred);
+ workToDoNow.ToBeContinued(schedulerState);
}
- if (workToDoNow.ToBeRescheduled != null)
+ // update the entity state based on the result
+ schedulerState.EntityState = result.EntityState;
+
+ // perform the actions
+ foreach (var action in result.Actions!)
{
- foreach (var request in workToDoNow.ToBeRescheduled)
+ switch (action)
{
- // Reschedule all signals that were received before their time
- this.SendScheduledSelfMessage(effects, request);
+ case (SendSignalOperationAction sendSignalAction):
+ this.SendSignalMessage(effects, schedulerState, sendSignalAction);
+ break;
+ case (StartNewOrchestrationOperationAction startAction):
+ this.ProcessSendStartMessage(effects, runtimeState, startAction);
+ break;
}
}
+ }
+
+ // process the lock request, if any
+ if (workToDoNow.LockRequest != null)
+ {
+ this.ProcessLockRequest(effects, schedulerState, workToDoNow.LockRequest);
+ }
- if (workToDoNow.SuspendAndContinue)
+ if (workToDoNow.ToBeRescheduled != null)
+ {
+ foreach (var request in workToDoNow.ToBeRescheduled)
{
- this.SendContinueSelfMessage(effects);
+ // Reschedule all signals that were received before their time
+ this.SendScheduledSelfMessage(effects, request);
}
+ }
- // this batch is complete. Since this is an entity, we now
- // (always) start a new execution, as in continue-as-new
-
- var serializedSchedulerState = this.SerializeSchedulerStateForNextExecution(schedulerState);
- var nextExecutionStartedEvent = new ExecutionStartedEvent(-1, serializedSchedulerState)
- {
- OrchestrationInstance = new OrchestrationInstance
- {
- InstanceId = workItem.InstanceId,
- ExecutionId = Guid.NewGuid().ToString("N")
- },
- Tags = runtimeState.Tags,
- ParentInstance = runtimeState.ParentInstance,
- Name = runtimeState.Name,
- Version = runtimeState.Version
- };
- var entityStatus = new EntityStatus()
- {
- EntityExists = schedulerState.EntityExists,
- BacklogQueueSize = schedulerState.Queue?.Count ?? 0,
- LockedBy = schedulerState.LockedBy,
- };
- var serializedEntityStatus = JsonConvert.SerializeObject(entityStatus, Serializer.InternalSerializerSettings);
-
- // create the new runtime state for the next execution
- runtimeState = new OrchestrationRuntimeState();
- runtimeState.Status = serializedEntityStatus;
- runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
- runtimeState.AddEvent(nextExecutionStartedEvent);
- runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
+ if (workToDoNow.SuspendAndContinue)
+ {
+ this.SendContinueSelfMessage(effects);
}
- }
- finally
- {
- if (renewTask != null)
+
+ // this batch is complete. Since this is an entity, we now
+ // (always) start a new execution, as in continue-as-new
+
+ var serializedSchedulerState = this.SerializeSchedulerStateForNextExecution(schedulerState);
+ var nextExecutionStartedEvent = new ExecutionStartedEvent(-1, serializedSchedulerState)
{
- try
- {
- renewCancellationTokenSource.Cancel();
- await renewTask;
- }
- catch (ObjectDisposedException)
- {
- // ignore
- }
- catch (OperationCanceledException)
+ OrchestrationInstance = new OrchestrationInstance
{
- // ignore
- }
- }
+ InstanceId = workItem.InstanceId,
+ ExecutionId = Guid.NewGuid().ToString("N")
+ },
+ Tags = runtimeState.Tags,
+ ParentInstance = runtimeState.ParentInstance,
+ Name = runtimeState.Name,
+ Version = runtimeState.Version
+ };
+ var entityStatus = new EntityStatus()
+ {
+ EntityExists = schedulerState.EntityExists,
+ BacklogQueueSize = schedulerState.Queue?.Count ?? 0,
+ LockedBy = schedulerState.LockedBy,
+ };
+ var serializedEntityStatus = JsonConvert.SerializeObject(entityStatus, Serializer.InternalSerializerSettings);
+
+ // create the new runtime state for the next execution
+ runtimeState = new OrchestrationRuntimeState();
+ runtimeState.Status = serializedEntityStatus;
+ runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
+ runtimeState.AddEvent(nextExecutionStartedEvent);
+ runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
}
OrchestrationState instanceState = (runtimeState.ExecutionStartedEvent != null) ?
- instanceState = Utils.BuildOrchestrationState(runtimeState) : null;
+ instanceState = Utils.BuildOrchestrationState(runtimeState) : null;
if (workItem.RestoreOriginalRuntimeStateDuringCompletion)
{
@@ -742,7 +711,7 @@ void SendLockResponseMessage(WorkItemEffects effects, OrchestrationInstance targ
var message = new ResponseMessage()
{
// content is ignored by receiver but helps with tracing
- Result = ResponseMessage.LockAcquisitionCompletion,
+ Result = ResponseMessage.LockAcquisitionCompletion,
};
this.ProcessSendEventMessage(effects, target, EntityMessageEventNames.ResponseMessageEventName(requestId), message);
}
@@ -871,7 +840,7 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
}
var result = await taskEntity.ExecuteOperationBatchAsync(request);
-
+
dispatchContext.SetProperty(result);
});
@@ -881,5 +850,15 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
return result;
}
+
+ ///
+ /// Renews the work item from the queue.
+ ///
+ ///
+ ///
+ internal Task RenewWorkItem(TaskOrchestrationWorkItem workItem, CancellationToken cancellationToken)
+ {
+ return TaskOrchestrationDispatcher.RenewUntil(workItem, this.orchestrationService, this.logHelper, nameof(TaskEntityDispatcher), cancellationToken);
+ }
}
}
diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs
index 61864a1a5..10b7555b4 100644
--- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs
+++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs
@@ -68,7 +68,8 @@ internal TaskOrchestrationDispatcher(
"TaskOrchestrationDispatcher",
item => item == null ? string.Empty : item.InstanceId,
this.OnFetchWorkItemAsync,
- this.OnProcessWorkItemSessionAsync)
+ this.OnProcessWorkItemSessionAsync,
+ this.RenewWorkItem)
{
GetDelayInSecondsAfterOnFetchException = orchestrationService.GetDelayInSecondsAfterOnFetchException,
GetDelayInSecondsAfterOnProcessException = orchestrationService.GetDelayInSecondsAfterOnProcessException,
@@ -155,7 +156,7 @@ void EnsureExecutionStartedIsFirst(IList batch)
{
// Keep track of orchestrator generation changes, maybe update target position
string executionId = message.OrchestrationInstance.ExecutionId;
- if(previousExecutionId != executionId)
+ if (previousExecutionId != executionId)
{
// We want to re-position the ExecutionStarted event after the "right-most"
// event with a non-null executionID that came before it.
@@ -217,7 +218,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
CorrelationTraceClient.Propagate(
() =>
- {
+ {
// Check if it is extended session.
// TODO: Remove this code - it looks incorrect and dangerous
isExtendedSession = this.concurrentSessionLock.Acquire();
@@ -305,7 +306,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
var isCompleted = false;
var continuedAsNew = false;
var isInterrupted = false;
-
+
// correlation
CorrelationTraceClient.Propagate(() => CorrelationTraceContext.Current = workItem.TraceContext);
@@ -330,273 +331,240 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
Activity? traceActivity = TraceHelper.StartTraceActivityForOrchestrationExecution(startEvent);
OrchestrationState? instanceState = null;
-
- Task? renewTask = null;
- using var renewCancellationTokenSource = new CancellationTokenSource();
- if (workItem.LockedUntilUtc < DateTime.MaxValue)
+ // Assumes that: if the batch contains a new "ExecutionStarted" event, it is the first message in the batch.
+ if (!ReconcileMessagesWithState(workItem, nameof(TaskOrchestrationDispatcher), this.errorPropagationMode, logHelper))
{
- // start a task to run RenewUntil
- renewTask = Task.Factory.StartNew(
- () => RenewUntil(workItem, this.orchestrationService, this.logHelper, nameof(TaskOrchestrationDispatcher), renewCancellationTokenSource.Token),
- renewCancellationTokenSource.Token);
+ // TODO : mark an orchestration as faulted if there is data corruption
+ this.logHelper.DroppingOrchestrationWorkItem(workItem, "Received work-item for an invalid orchestration");
+ TraceHelper.TraceSession(
+ TraceEventType.Error,
+ "TaskOrchestrationDispatcher-DeletedOrchestration",
+ runtimeState.OrchestrationInstance?.InstanceId!,
+ "Received work-item for an invalid orchestration");
+ isCompleted = true;
+ traceActivity?.Dispose();
}
-
- try
+ else
{
- // Assumes that: if the batch contains a new "ExecutionStarted" event, it is the first message in the batch.
- if (!ReconcileMessagesWithState(workItem, nameof(TaskOrchestrationDispatcher), this.errorPropagationMode, logHelper))
+ do
{
- // TODO : mark an orchestration as faulted if there is data corruption
- this.logHelper.DroppingOrchestrationWorkItem(workItem, "Received work-item for an invalid orchestration");
- TraceHelper.TraceSession(
- TraceEventType.Error,
- "TaskOrchestrationDispatcher-DeletedOrchestration",
- runtimeState.OrchestrationInstance?.InstanceId!,
- "Received work-item for an invalid orchestration");
- isCompleted = true;
- traceActivity?.Dispose();
- }
- else
- {
- do
- {
- continuedAsNew = false;
- continuedAsNewMessage = null;
+ continuedAsNew = false;
+ continuedAsNewMessage = null;
- this.logHelper.OrchestrationExecuting(runtimeState.OrchestrationInstance!, runtimeState.Name);
- TraceHelper.TraceInstance(
- TraceEventType.Verbose,
- "TaskOrchestrationDispatcher-ExecuteUserOrchestration-Begin",
- runtimeState.OrchestrationInstance!,
- "Executing user orchestration: {0}",
- JsonDataConverter.Default.Serialize(runtimeState.GetOrchestrationRuntimeStateDump(), true));
+ this.logHelper.OrchestrationExecuting(runtimeState.OrchestrationInstance!, runtimeState.Name);
+ TraceHelper.TraceInstance(
+ TraceEventType.Verbose,
+ "TaskOrchestrationDispatcher-ExecuteUserOrchestration-Begin",
+ runtimeState.OrchestrationInstance!,
+ "Executing user orchestration: {0}",
+ JsonDataConverter.Default.Serialize(runtimeState.GetOrchestrationRuntimeStateDump(), true));
- if (workItem.Cursor == null)
- {
- workItem.Cursor = await this.ExecuteOrchestrationAsync(runtimeState, workItem);
- }
- else
- {
- await this.ResumeOrchestrationAsync(workItem);
- }
+ if (workItem.Cursor == null)
+ {
+ workItem.Cursor = await this.ExecuteOrchestrationAsync(runtimeState, workItem);
+ }
+ else
+ {
+ await this.ResumeOrchestrationAsync(workItem);
+ }
- IReadOnlyList decisions = workItem.Cursor.LatestDecisions.ToList();
+ IReadOnlyList decisions = workItem.Cursor.LatestDecisions.ToList();
- this.logHelper.OrchestrationExecuted(
- runtimeState.OrchestrationInstance!,
- runtimeState.Name,
- decisions);
+ this.logHelper.OrchestrationExecuted(
+ runtimeState.OrchestrationInstance!,
+ runtimeState.Name,
+ decisions);
+ TraceHelper.TraceInstance(
+ TraceEventType.Information,
+ "TaskOrchestrationDispatcher-ExecuteUserOrchestration-End",
+ runtimeState.OrchestrationInstance!,
+ "Executed user orchestration. Received {0} orchestrator actions: {1}",
+ decisions.Count,
+ string.Join(", ", decisions.Select(d => d.Id + ":" + d.OrchestratorActionType)));
+
+ // TODO: Exception handling for invalid decisions, which is increasingly likely
+ // when custom middleware is involved (e.g. out-of-process scenarios).
+ foreach (OrchestratorAction decision in decisions)
+ {
TraceHelper.TraceInstance(
TraceEventType.Information,
- "TaskOrchestrationDispatcher-ExecuteUserOrchestration-End",
+ "TaskOrchestrationDispatcher-ProcessOrchestratorAction",
runtimeState.OrchestrationInstance!,
- "Executed user orchestration. Received {0} orchestrator actions: {1}",
- decisions.Count,
- string.Join(", ", decisions.Select(d => d.Id + ":" + d.OrchestratorActionType)));
-
- // TODO: Exception handling for invalid decisions, which is increasingly likely
- // when custom middleware is involved (e.g. out-of-process scenarios).
- foreach (OrchestratorAction decision in decisions)
+ "Processing orchestrator action of type {0}",
+ decision.OrchestratorActionType);
+ switch (decision.OrchestratorActionType)
{
- TraceHelper.TraceInstance(
- TraceEventType.Information,
- "TaskOrchestrationDispatcher-ProcessOrchestratorAction",
- runtimeState.OrchestrationInstance!,
- "Processing orchestrator action of type {0}",
- decision.OrchestratorActionType);
- switch (decision.OrchestratorActionType)
- {
- case OrchestratorActionType.ScheduleOrchestrator:
- var scheduleTaskAction = (ScheduleTaskOrchestratorAction)decision;
- var message = this.ProcessScheduleTaskDecision(
- scheduleTaskAction,
+ case OrchestratorActionType.ScheduleOrchestrator:
+ var scheduleTaskAction = (ScheduleTaskOrchestratorAction)decision;
+ var message = this.ProcessScheduleTaskDecision(
+ scheduleTaskAction,
+ runtimeState,
+ this.IncludeParameters,
+ traceActivity);
+ messagesToSend.Add(message);
+ break;
+ case OrchestratorActionType.CreateTimer:
+ var timerOrchestratorAction = (CreateTimerOrchestratorAction)decision;
+ timerMessages.Add(this.ProcessCreateTimerDecision(
+ timerOrchestratorAction,
+ runtimeState,
+ isInternal: false));
+ break;
+ case OrchestratorActionType.CreateSubOrchestration:
+ var createSubOrchestrationAction = (CreateSubOrchestrationAction)decision;
+ orchestratorMessages.Add(
+ this.ProcessCreateSubOrchestrationInstanceDecision(
+ createSubOrchestrationAction,
runtimeState,
this.IncludeParameters,
- traceActivity);
- messagesToSend.Add(message);
- break;
- case OrchestratorActionType.CreateTimer:
- var timerOrchestratorAction = (CreateTimerOrchestratorAction)decision;
- timerMessages.Add(this.ProcessCreateTimerDecision(
- timerOrchestratorAction,
- runtimeState,
- isInternal: false));
- break;
- case OrchestratorActionType.CreateSubOrchestration:
- var createSubOrchestrationAction = (CreateSubOrchestrationAction)decision;
- orchestratorMessages.Add(
- this.ProcessCreateSubOrchestrationInstanceDecision(
- createSubOrchestrationAction,
- runtimeState,
- this.IncludeParameters,
- traceActivity));
- break;
- case OrchestratorActionType.SendEvent:
- var sendEventAction = (SendEventOrchestratorAction)decision;
- orchestratorMessages.Add(
- this.ProcessSendEventDecision(sendEventAction, runtimeState));
- break;
- case OrchestratorActionType.OrchestrationComplete:
- OrchestrationCompleteOrchestratorAction completeDecision = (OrchestrationCompleteOrchestratorAction)decision;
- TaskMessage? workflowInstanceCompletedMessage =
- this.ProcessWorkflowCompletedTaskDecision(completeDecision, runtimeState, this.IncludeDetails, out continuedAsNew);
- if (workflowInstanceCompletedMessage != null)
+ traceActivity));
+ break;
+ case OrchestratorActionType.SendEvent:
+ var sendEventAction = (SendEventOrchestratorAction)decision;
+ orchestratorMessages.Add(
+ this.ProcessSendEventDecision(sendEventAction, runtimeState));
+ break;
+ case OrchestratorActionType.OrchestrationComplete:
+ OrchestrationCompleteOrchestratorAction completeDecision = (OrchestrationCompleteOrchestratorAction)decision;
+ TaskMessage? workflowInstanceCompletedMessage =
+ this.ProcessWorkflowCompletedTaskDecision(completeDecision, runtimeState, this.IncludeDetails, out continuedAsNew);
+ if (workflowInstanceCompletedMessage != null)
+ {
+ // Send complete message to parent workflow or to itself to start a new execution
+ // Store the event so we can rebuild the state
+ carryOverEvents = null;
+ if (continuedAsNew)
{
- // Send complete message to parent workflow or to itself to start a new execution
- // Store the event so we can rebuild the state
- carryOverEvents = null;
- if (continuedAsNew)
+ continuedAsNewMessage = workflowInstanceCompletedMessage;
+ continueAsNewExecutionStarted = workflowInstanceCompletedMessage.Event as ExecutionStartedEvent;
+ if (completeDecision.CarryoverEvents.Any())
{
- continuedAsNewMessage = workflowInstanceCompletedMessage;
- continueAsNewExecutionStarted = workflowInstanceCompletedMessage.Event as ExecutionStartedEvent;
- if (completeDecision.CarryoverEvents.Any())
- {
- carryOverEvents = completeDecision.CarryoverEvents.ToList();
- completeDecision.CarryoverEvents.Clear();
- }
- }
- else
- {
- orchestratorMessages.Add(workflowInstanceCompletedMessage);
+ carryOverEvents = completeDecision.CarryoverEvents.ToList();
+ completeDecision.CarryoverEvents.Clear();
}
}
+ else
+ {
+ orchestratorMessages.Add(workflowInstanceCompletedMessage);
+ }
+ }
- isCompleted = !continuedAsNew;
- break;
- default:
- throw TraceHelper.TraceExceptionInstance(
- TraceEventType.Error,
- "TaskOrchestrationDispatcher-UnsupportedDecisionType",
- runtimeState.OrchestrationInstance!,
- new NotSupportedException($"Decision type '{decision.OrchestratorActionType}' not supported"));
- }
+ isCompleted = !continuedAsNew;
+ break;
+ default:
+ throw TraceHelper.TraceExceptionInstance(
+ TraceEventType.Error,
+ "TaskOrchestrationDispatcher-UnsupportedDecisionType",
+ runtimeState.OrchestrationInstance!,
+ new NotSupportedException($"Decision type '{decision.OrchestratorActionType}' not supported"));
+ }
- // Underlying orchestration service provider may have a limit of messages per call, to avoid the situation
- // we keep on asking the provider if message count is ok and stop processing new decisions if not.
- //
- // We also put in a fake timer to force next orchestration task for remaining messages
- int totalMessages = messagesToSend.Count + orchestratorMessages.Count + timerMessages.Count;
- if (this.orchestrationService.IsMaxMessageCountExceeded(totalMessages, runtimeState))
+ // Underlying orchestration service provider may have a limit of messages per call, to avoid the situation
+ // we keep on asking the provider if message count is ok and stop processing new decisions if not.
+ //
+ // We also put in a fake timer to force next orchestration task for remaining messages
+ int totalMessages = messagesToSend.Count + orchestratorMessages.Count + timerMessages.Count;
+ if (this.orchestrationService.IsMaxMessageCountExceeded(totalMessages, runtimeState))
+ {
+ TraceHelper.TraceInstance(
+ TraceEventType.Information,
+ "TaskOrchestrationDispatcher-MaxMessageCountReached",
+ runtimeState.OrchestrationInstance!,
+ "MaxMessageCount reached. Adding timer to process remaining events in next attempt.");
+
+ if (isCompleted || continuedAsNew)
{
TraceHelper.TraceInstance(
TraceEventType.Information,
- "TaskOrchestrationDispatcher-MaxMessageCountReached",
+ "TaskOrchestrationDispatcher-OrchestrationAlreadyCompleted",
runtimeState.OrchestrationInstance!,
- "MaxMessageCount reached. Adding timer to process remaining events in next attempt.");
-
- if (isCompleted || continuedAsNew)
- {
- TraceHelper.TraceInstance(
- TraceEventType.Information,
- "TaskOrchestrationDispatcher-OrchestrationAlreadyCompleted",
- runtimeState.OrchestrationInstance!,
- "Orchestration already completed. Skip adding timer for splitting messages.");
- break;
- }
-
- var dummyTimer = new CreateTimerOrchestratorAction
- {
- Id = FrameworkConstants.FakeTimerIdToSplitDecision,
- FireAt = DateTime.UtcNow
- };
-
- timerMessages.Add(this.ProcessCreateTimerDecision(
- dummyTimer,
- runtimeState,
- isInternal: true));
- isInterrupted = true;
+ "Orchestration already completed. Skip adding timer for splitting messages.");
break;
}
+
+ var dummyTimer = new CreateTimerOrchestratorAction
+ {
+ Id = FrameworkConstants.FakeTimerIdToSplitDecision,
+ FireAt = DateTime.UtcNow
+ };
+
+ timerMessages.Add(this.ProcessCreateTimerDecision(
+ dummyTimer,
+ runtimeState,
+ isInternal: true));
+ isInterrupted = true;
+ break;
}
+ }
- // correlation
- CorrelationTraceClient.Propagate(() =>
- {
- if (runtimeState.ExecutionStartedEvent != null)
- runtimeState.ExecutionStartedEvent.Correlation = CorrelationTraceContext.Current.SerializableTraceContext;
- });
+ // correlation
+ CorrelationTraceClient.Propagate(() =>
+ {
+ if (runtimeState.ExecutionStartedEvent != null)
+ runtimeState.ExecutionStartedEvent.Correlation = CorrelationTraceContext.Current.SerializableTraceContext;
+ });
- // finish up processing of the work item
- if (!continuedAsNew && runtimeState.Events.Last().EventType != EventType.OrchestratorCompleted)
- {
- runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
- }
+ // finish up processing of the work item
+ if (!continuedAsNew && runtimeState.Events.Last().EventType != EventType.OrchestratorCompleted)
+ {
+ runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
+ }
- if (isCompleted)
+ if (isCompleted)
+ {
+ TraceHelper.TraceSession(TraceEventType.Information, "TaskOrchestrationDispatcher-DeletingSessionState", workItem.InstanceId, "Deleting session state");
+ if (runtimeState.ExecutionStartedEvent != null)
{
- TraceHelper.TraceSession(TraceEventType.Information, "TaskOrchestrationDispatcher-DeletingSessionState", workItem.InstanceId, "Deleting session state");
- if (runtimeState.ExecutionStartedEvent != null)
- {
- instanceState = Utils.BuildOrchestrationState(runtimeState);
- }
+ instanceState = Utils.BuildOrchestrationState(runtimeState);
}
- else
+ }
+ else
+ {
+ if (continuedAsNew)
{
- if (continuedAsNew)
- {
- TraceHelper.TraceSession(
- TraceEventType.Information,
- "TaskOrchestrationDispatcher-UpdatingStateForContinuation",
- workItem.InstanceId,
- "Updating state for continuation");
+ TraceHelper.TraceSession(
+ TraceEventType.Information,
+ "TaskOrchestrationDispatcher-UpdatingStateForContinuation",
+ workItem.InstanceId,
+ "Updating state for continuation");
- // correlation
- CorrelationTraceClient.Propagate(() =>
- {
- continueAsNewExecutionStarted!.Correlation = CorrelationTraceContext.Current.SerializableTraceContext;
- });
+ // correlation
+ CorrelationTraceClient.Propagate(() =>
+ {
+ continueAsNewExecutionStarted!.Correlation = CorrelationTraceContext.Current.SerializableTraceContext;
+ });
- // Copy the distributed trace context, if any
- continueAsNewExecutionStarted!.SetParentTraceContext(runtimeState.ExecutionStartedEvent);
+ // Copy the distributed trace context, if any
+ continueAsNewExecutionStarted!.SetParentTraceContext(runtimeState.ExecutionStartedEvent);
- runtimeState = new OrchestrationRuntimeState();
- runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
- runtimeState.AddEvent(continueAsNewExecutionStarted!);
- runtimeState.Status = workItem.OrchestrationRuntimeState.Status ?? carryOverStatus;
- carryOverStatus = workItem.OrchestrationRuntimeState.Status;
+ runtimeState = new OrchestrationRuntimeState();
+ runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
+ runtimeState.AddEvent(continueAsNewExecutionStarted!);
+ runtimeState.Status = workItem.OrchestrationRuntimeState.Status ?? carryOverStatus;
+ carryOverStatus = workItem.OrchestrationRuntimeState.Status;
- if (carryOverEvents != null)
+ if (carryOverEvents != null)
+ {
+ foreach (var historyEvent in carryOverEvents)
{
- foreach (var historyEvent in carryOverEvents)
- {
- runtimeState.AddEvent(historyEvent);
- }
+ runtimeState.AddEvent(historyEvent);
}
-
- runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
- workItem.OrchestrationRuntimeState = runtimeState;
-
- workItem.Cursor = null;
}
- instanceState = Utils.BuildOrchestrationState(runtimeState);
+ runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
+ workItem.OrchestrationRuntimeState = runtimeState;
+
+ workItem.Cursor = null;
}
- } while (continuedAsNew);
- }
- }
- finally
- {
- if (renewTask != null)
- {
- try
- {
- renewCancellationTokenSource.Cancel();
- await renewTask;
+ instanceState = Utils.BuildOrchestrationState(runtimeState);
}
- catch (ObjectDisposedException)
- {
- // ignore
- }
- catch (OperationCanceledException)
- {
- // ignore
- }
- }
+ } while (continuedAsNew);
}
+
if (workItem.RestoreOriginalRuntimeStateDuringCompletion)
{
// some backends expect the original runtime state object
@@ -618,7 +586,7 @@ await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync(
continuedAsNew ? null : timerMessages,
continuedAsNewMessage,
instanceState);
-
+
if (workItem.RestoreOriginalRuntimeStateDuringCompletion)
{
workItem.OrchestrationRuntimeState = runtimeState;
@@ -627,6 +595,16 @@ await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync(
return isCompleted || continuedAsNew || isInterrupted;
}
+ ///
+ /// Renews the work item from the queue.
+ ///
+ ///
+ ///
+ internal Task RenewWorkItem(TaskOrchestrationWorkItem workItem, CancellationToken cancellationToken)
+ {
+ return RenewUntil(workItem, this.orchestrationService, this.logHelper, nameof(TaskOrchestrationDispatcher), cancellationToken);
+ }
+
static OrchestrationExecutionContext GetOrchestrationExecutionContext(OrchestrationRuntimeState runtimeState)
{
return new OrchestrationExecutionContext { OrchestrationTags = runtimeState.Tags ?? new Dictionary(capacity: 0) };
@@ -635,6 +613,7 @@ static OrchestrationExecutionContext GetOrchestrationExecutionContext(Orchestrat
static TimeSpan MinRenewalInterval = TimeSpan.FromSeconds(5); // prevents excessive retries if clocks are off
static TimeSpan MaxRenewalInterval = TimeSpan.FromSeconds(30);
+
internal static async Task RenewUntil(TaskOrchestrationWorkItem workItem, IOrchestrationService orchestrationService, LogHelper logHelper, string dispatcher, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
@@ -1143,11 +1122,11 @@ TaskMessage ProcessSendEventDecision(
{
var historyEvent = new EventSentEvent(sendEventAction.Id)
{
- InstanceId = sendEventAction.Instance?.InstanceId,
- Name = sendEventAction.EventName,
- Input = sendEventAction.EventData
+ InstanceId = sendEventAction.Instance?.InstanceId,
+ Name = sendEventAction.EventName,
+ Input = sendEventAction.EventData
};
-
+
runtimeState.AddEvent(historyEvent);
EventRaisedEvent eventRaisedEvent = new EventRaisedEvent(-1, sendEventAction.EventData)
@@ -1169,7 +1148,7 @@ TaskMessage ProcessSendEventDecision(
Event = eventRaisedEvent
};
}
-
+
internal class NonBlockingCountdownLock
{
int available;
diff --git a/src/DurableTask.Core/TaskOrchestrationWorkItem.cs b/src/DurableTask.Core/TaskOrchestrationWorkItem.cs
index 6e5c11344..ffae5159c 100644
--- a/src/DurableTask.Core/TaskOrchestrationWorkItem.cs
+++ b/src/DurableTask.Core/TaskOrchestrationWorkItem.cs
@@ -20,7 +20,7 @@ namespace DurableTask.Core
///
/// An active instance / work item of an orchestration
///
- public class TaskOrchestrationWorkItem
+ public class TaskOrchestrationWorkItem : WorkItemBase
{
///
/// The instance id of this orchestration
@@ -32,11 +32,6 @@ public class TaskOrchestrationWorkItem
///
public OrchestrationRuntimeState OrchestrationRuntimeState;
- ///
- /// The datetime this orchestration work item is locked until
- ///
- public DateTime LockedUntilUtc;
-
///
/// The list of new task messages associated with this work item instance
///
@@ -48,11 +43,6 @@ public class TaskOrchestrationWorkItem
///
public IOrchestrationSession Session;
- ///
- /// The trace context used for correlation.
- ///
- public TraceContextBase TraceContext;
-
///
/// The flag of extendedSession.
///
diff --git a/src/DurableTask.Core/TrackingWorkItem.cs b/src/DurableTask.Core/TrackingWorkItem.cs
index 52982d0a8..bf9e1272a 100644
--- a/src/DurableTask.Core/TrackingWorkItem.cs
+++ b/src/DurableTask.Core/TrackingWorkItem.cs
@@ -19,18 +19,13 @@ namespace DurableTask.Core
///
/// An active tracking work item
///
- public class TrackingWorkItem
+ public class TrackingWorkItem : WorkItemBase
{
///
/// The instance id of this tracking work item
///
public string InstanceId;
- ///
- /// The datetime this work item is locked until
- ///
- public DateTime LockedUntilUtc;
-
///
/// The list of new messages to process tracking for
///
diff --git a/src/DurableTask.Core/WorkItemBase.cs b/src/DurableTask.Core/WorkItemBase.cs
new file mode 100644
index 000000000..93775c20e
--- /dev/null
+++ b/src/DurableTask.Core/WorkItemBase.cs
@@ -0,0 +1,20 @@
+using System;
+
+namespace DurableTask.Core
+{
+ ///
+ /// Base class for Work Items.
+ ///
+ public abstract class WorkItemBase
+ {
+ ///
+ /// The datetime this work item is locked until
+ ///
+ public DateTime LockedUntilUtc;
+
+ ///
+ /// The trace context used for correlation.
+ ///
+ public TraceContextBase TraceContext;
+ }
+}
diff --git a/src/DurableTask.Core/WorkItemDispatcher.cs b/src/DurableTask.Core/WorkItemDispatcher.cs
index 3829578d8..97cab60ab 100644
--- a/src/DurableTask.Core/WorkItemDispatcher.cs
+++ b/src/DurableTask.Core/WorkItemDispatcher.cs
@@ -27,6 +27,7 @@ namespace DurableTask.Core
///
/// The typed Object to dispatch
public class WorkItemDispatcher : IDisposable
+ where T: WorkItemBase
{
const int DefaultMaxConcurrentWorkItems = 20;
const int DefaultDispatcherCount = 1;
@@ -64,7 +65,8 @@ public class WorkItemDispatcher : IDisposable
Func> FetchWorkItem { get; }
Func ProcessWorkItem { get; }
-
+ Func RenewWorkItem { get; }
+
///
/// Method to execute for safely releasing a work item
///
@@ -92,20 +94,23 @@ public class WorkItemDispatcher : IDisposable
/// Creates a new Work Item Dispatcher with given name and identifier method
///
/// Name identifying this dispatcher for logging and diagnostics
- ///
- ///
- ///
+ /// Identifier for the WorkItem
+ /// Action to fetch the work item.
+ /// Action to process the work item.
+ /// Action to renew the work item to maintain locking.
public WorkItemDispatcher(
- string name,
+ string name,
Func workItemIdentifier,
Func> fetchWorkItem,
- Func processWorkItem)
+ Func processWorkItem,
+ Func renewWorkItem)
{
this.name = name;
this.id = Guid.NewGuid().ToString("N");
this.workItemIdentifier = workItemIdentifier ?? throw new ArgumentNullException(nameof(workItemIdentifier));
this.FetchWorkItem = fetchWorkItem ?? throw new ArgumentNullException(nameof(fetchWorkItem));
this.ProcessWorkItem = processWorkItem ?? throw new ArgumentNullException(nameof(processWorkItem));
+ this.RenewWorkItem = renewWorkItem ?? throw new ArgumentNullException(nameof(renewWorkItem));
}
///
@@ -141,9 +146,9 @@ public async Task StartAsync()
this.LogHelper.DispatcherStarting(context);
// We just want this to Run we intentionally don't wait
- #pragma warning disable 4014
+#pragma warning disable 4014
Task.Run(() => this.DispatchAsync(context));
- #pragma warning restore 4014
+#pragma warning restore 4014
}
}
finally
@@ -238,7 +243,7 @@ async Task DispatchAsync(WorkItemDispatcherContext context)
TraceEventType.Warning,
"WorkItemDispatcherDispatch-MaxOperations",
this.GetFormattedLog(dispatcherId, $"Max concurrent operations ({this.concurrentWorkItemCount}) are already in progress. Still waiting for next accept."));
-
+
logThrottle = false;
}
@@ -249,18 +254,28 @@ async Task DispatchAsync(WorkItemDispatcherContext context)
var delaySecs = 0;
T workItem = default(T);
+ Task renewTask =Task.CompletedTask;
+ using var renewCancellationTokenSource = new CancellationTokenSource();
+
try
{
Interlocked.Increment(ref this.activeFetchers);
this.LogHelper.FetchWorkItemStarting(context, DefaultReceiveTimeout, this.concurrentWorkItemCount, this.MaxConcurrentWorkItems);
TraceHelper.Trace(
- TraceEventType.Verbose,
+ TraceEventType.Verbose,
"WorkItemDispatcherDispatch-StartFetch",
this.GetFormattedLog(dispatcherId, $"Starting fetch with timeout of {DefaultReceiveTimeout} ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
Stopwatch timer = Stopwatch.StartNew();
workItem = await this.FetchWorkItem(DefaultReceiveTimeout, this.shutdownCancellationTokenSource.Token);
+ if (workItem.LockedUntilUtc < DateTime.MaxValue)
+ {
+ // start a task to run Renewal of Work Items.
+ renewTask = Task.Factory.StartNew(
+ () => this.RenewWorkItem(workItem, renewCancellationTokenSource.Token));
+ }
+
if (!IsNull(workItem))
{
string workItemId = this.workItemIdentifier(workItem);
@@ -273,7 +288,7 @@ async Task DispatchAsync(WorkItemDispatcherContext context)
}
TraceHelper.Trace(
- TraceEventType.Verbose,
+ TraceEventType.Verbose,
"WorkItemDispatcherDispatch-EndFetch",
this.GetFormattedLog(dispatcherId, $"After fetch ({timer.ElapsedMilliseconds} ms) ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
}
@@ -294,7 +309,7 @@ async Task DispatchAsync(WorkItemDispatcherContext context)
if (!this.isStarted)
{
TraceHelper.Trace(
- TraceEventType.Information,
+ TraceEventType.Information,
"WorkItemDispatcherDispatch-HarmlessException",
this.GetFormattedLog(dispatcherId, $"Harmless exception while fetching workItem after Stop(): {exception.Message}"));
}
@@ -303,8 +318,8 @@ async Task DispatchAsync(WorkItemDispatcherContext context)
this.LogHelper.FetchWorkItemFailure(context, exception);
// TODO : dump full node context here
TraceHelper.TraceException(
- TraceEventType.Warning,
- "WorkItemDispatcherDispatch-Exception",
+ TraceEventType.Warning,
+ "WorkItemDispatcherDispatch-Exception",
exception,
this.GetFormattedLog(dispatcherId, $"Exception while fetching workItem: {exception.Message}"));
delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
@@ -329,9 +344,30 @@ async Task DispatchAsync(WorkItemDispatcherContext context)
{
Interlocked.Increment(ref this.concurrentWorkItemCount);
// We just want this to Run we intentionally don't wait
- #pragma warning disable 4014
- Task.Run(() => this.ProcessWorkItemAsync(context, workItem));
- #pragma warning restore 4014
+#pragma warning disable 4014
+ Task.Run(async () => {
+ try
+ {
+ this.ProcessWorkItemAsync(context, workItem);
+ }
+ finally
+ {
+ try
+ {
+ renewCancellationTokenSource.Cancel();
+ await renewTask;
+ }
+ catch (ObjectDisposedException)
+ {
+ // ignore
+ }
+ catch (OperationCanceledException)
+ {
+ // ignore
+ }
+ }
+ });
+#pragma warning restore 4014
scheduledWorkItem = true;
}
@@ -356,7 +392,7 @@ async Task DispatchAsync(WorkItemDispatcherContext context)
async Task ProcessWorkItemAsync(WorkItemDispatcherContext context, object workItemObj)
{
- var workItem = (T) workItemObj;
+ var workItem = (T)workItemObj;
var abortWorkItem = true;
string workItemId = string.Empty;
@@ -366,7 +402,7 @@ async Task ProcessWorkItemAsync(WorkItemDispatcherContext context, object workIt
this.LogHelper.ProcessWorkItemStarting(context, workItemId);
TraceHelper.Trace(
- TraceEventType.Information,
+ TraceEventType.Information,
"WorkItemDispatcherProcess-Begin",
this.GetFormattedLog(context.DispatcherId, $"Starting to process workItem {workItemId}"));
@@ -376,7 +412,7 @@ async Task ProcessWorkItemAsync(WorkItemDispatcherContext context, object workIt
this.LogHelper.ProcessWorkItemCompleted(context, workItemId);
TraceHelper.Trace(
- TraceEventType.Information,
+ TraceEventType.Information,
"WorkItemDispatcherProcess-End",
this.GetFormattedLog(context.DispatcherId, $"Finished processing workItem {workItemId}"));
@@ -390,12 +426,12 @@ async Task ProcessWorkItemAsync(WorkItemDispatcherContext context, object workIt
$"Backing off for {BackOffIntervalOnInvalidOperationSecs} seconds",
exception);
TraceHelper.TraceException(
- TraceEventType.Error,
- "WorkItemDispatcherProcess-TypeMissingException",
+ TraceEventType.Error,
+ "WorkItemDispatcherProcess-TypeMissingException",
exception,
this.GetFormattedLog(context.DispatcherId, $"Exception while processing workItem {workItemId}"));
TraceHelper.Trace(
- TraceEventType.Error,
+ TraceEventType.Error,
"WorkItemDispatcherProcess-TypeMissingBackingOff",
"Backing off after invalid operation by " + BackOffIntervalOnInvalidOperationSecs);
@@ -405,8 +441,8 @@ async Task ProcessWorkItemAsync(WorkItemDispatcherContext context, object workIt
catch (Exception exception) when (!Utils.IsFatal(exception))
{
TraceHelper.TraceException(
- TraceEventType.Error,
- "WorkItemDispatcherProcess-Exception",
+ TraceEventType.Error,
+ "WorkItemDispatcherProcess-Exception",
exception,
this.GetFormattedLog(context.DispatcherId, $"Exception while processing workItem {workItemId}"));
@@ -419,7 +455,7 @@ async Task ProcessWorkItemAsync(WorkItemDispatcherContext context, object workIt
$"Backing off for {delayInSecs} seconds until {CountDownToZeroDelay} successful operations",
exception);
TraceHelper.Trace(
- TraceEventType.Error,
+ TraceEventType.Error,
"WorkItemDispatcherProcess-BackingOff",
"Backing off after exception by at least " + delayInSecs + " until " + CountDownToZeroDelay +
" successful operations");
diff --git a/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs b/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs
index 3d49bb749..54445826b 100644
--- a/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs
+++ b/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs
@@ -221,7 +221,8 @@ public ServiceBusOrchestrationService(
"TrackingDispatcher",
item => item == null ? string.Empty : item.InstanceId,
FetchTrackingWorkItemAsync,
- ProcessTrackingWorkItemAsync)
+ ProcessTrackingWorkItemAsync,
+ (T,token) => Task.CompletedTask)
{
GetDelayInSecondsAfterOnFetchException = GetDelayInSecondsAfterOnFetchException,
GetDelayInSecondsAfterOnProcessException = GetDelayInSecondsAfterOnProcessException,