Skip to content

Commit

Permalink
[main] Fix race in KernelScheduler (#3232)
Browse files Browse the repository at this point in the history
* Use KernelCommandResult.Events in place of Kernel.Events.

* Fix race.

* Enable more complete logging for commands and events in tests.

* Add a test.
  • Loading branch information
shyamnamboodiripad authored Oct 5, 2023
1 parent 6cdb9c8 commit f2763eb
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 18 deletions.
31 changes: 27 additions & 4 deletions src/Microsoft.DotNet.Interactive.Tests/DirectiveTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.DotNet.Interactive.Commands;
using Microsoft.DotNet.Interactive.CSharp;
using Microsoft.DotNet.Interactive.Events;
using Microsoft.DotNet.Interactive.Formatting;
Expand Down Expand Up @@ -216,9 +215,33 @@ public void Directives_with_duplicate_aliases_are_not_allowed()
[Fact]
public async Task OnComplete_can_be_used_to_act_on_completion_of_commands()
{
var onCompleteWasCalled = false;
using var kernel = new FakeKernel();

using var events = kernel.KernelEvents.ToSubscribedList();
kernel.AddDirective(new Command("#!wrap")
{
Handler = CommandHandler.Create((InvocationContext ctx) =>
{
var c = ctx.GetService<KernelInvocationContext>();

c.OnComplete(context =>
{
onCompleteWasCalled = true;
});

return Task.CompletedTask;
})
});

await kernel.SubmitCodeAsync("#!wrap");

onCompleteWasCalled.Should().BeTrue();
}

[Fact]
public async Task OnComplete_can_be_used_to_publish_events_before_context_is_completed()
{
using var kernel = new FakeKernel();

kernel.AddDirective(new Command("#!wrap")
{
Expand All @@ -236,9 +259,9 @@ public async Task OnComplete_can_be_used_to_act_on_completion_of_commands()
})
});

await kernel.SubmitCodeAsync("#!wrap");
var result = await kernel.SubmitCodeAsync("#!wrap");

events
result.Events
.OfType<DisplayedValueProduced>()
.Select(e => e.Value)
.Should()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
</ItemGroup>

<ItemGroup>
<Compile Include="..\dotnet-interactive\Pocket\Format.CustomizeLogString.cs" Link="Pocket\Format.CustomizeLogString.cs" />
<Compile Include="..\dotnet-interactive\Utility\KernelDiagnostics.cs" Link="Utility\KernelDiagnostics.cs" />
<Compile Include="..\Microsoft.DotNet.Interactive.CSharpProject\%28Recipes%29\AsyncLazy{T}.cs" Link="Utility\AsyncLazy{T}.cs" />
<Compile Include="..\Microsoft.DotNet.Interactive.Jupyter\(Recipes)\BareObjectConverter.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
</ItemGroup>

<ItemGroup>
<Compile Include="..\dotnet-interactive\Pocket\Format.CustomizeLogString.cs" Link="Pocket\Format.CustomizeLogString.cs" />
<Compile Include="..\Microsoft.DotNet.Interactive.Tests\Utility\ConnectHost.cs" Link="Utility\ConnectHost.cs" />
<Compile Include="..\Microsoft.DotNet.Interactive.Tests\Utility\FactSkipLinux.cs" Link="Utility\FactSkipLinux.cs" />
<Compile Include="..\Microsoft.DotNet.Interactive.Tests\Utility\FactSkipNetFramework.cs" Link="Utility\FactSkipNetFramework.cs" />
Expand Down
43 changes: 29 additions & 14 deletions src/Microsoft.DotNet.Interactive/KernelScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,11 @@ public Task<TResult> RunAsync(

internal async Task IdleAsync()
{
if (_currentlyRunningTopLevelOperation is {} currentlyRunning &&
!currentlyRunning.TaskCompletionSource.Task.IsCompleted)
if (_currentlyRunningTopLevelOperation is { } currentlyRunning && !currentlyRunning.IsCompleted)
{
await currentlyRunning.TaskCompletionSource.Task;
}

_childOperationsBarrier.SignalAndWait();
}

Expand All @@ -111,7 +110,7 @@ private void ScheduledOperationRunLoop(object _)
_currentlyRunningTopLevelOperation = operation;

var executionContext = operation.ExecutionContext;

try
{
ExecutionContext.Run(
Expand All @@ -125,10 +124,6 @@ private void ScheduledOperationRunLoop(object _)
{
Log.Error("while executing {operation}", e, operation);
}
finally
{
_currentlyRunningTopLevelOperation = default;
}
}
}

Expand All @@ -144,15 +139,15 @@ private void Run(ScheduledOperation operation)
.ExecuteAsync()
.ContinueWith(t =>
{
if (!operation.TaskCompletionSource.Task.IsCompleted)
if (!operation.IsCompleted)
{
if (t.GetIsCompletedSuccessfully())
{
operation.TaskCompletionSource.TrySetResult(t.Result);
CompleteWithResult(t.Result);
}
else if (t.Exception is { })
{
operation.TaskCompletionSource.SetException(t.Exception);
CompleteWithException(t.Exception);
}
}
});
Expand All @@ -167,13 +162,31 @@ private void Run(ScheduledOperation operation)
}
catch (Exception exception)
{
if (!operation.TaskCompletionSource.Task.IsCompleted)
if (!operation.IsCompleted)
{
operation.TaskCompletionSource.SetException(exception);
CompleteWithException(exception);
}
}
finally

void CompleteWithResult(TResult result)
{
ResetCurrentlyRunning();
operation.TaskCompletionSource.TrySetResult(result);
}

void CompleteWithException(Exception exception)
{
ResetCurrentlyRunning();
operation.TaskCompletionSource.SetException(exception);
}

void ResetCurrentlyRunning()
{
if (ReferenceEquals(_currentlyRunningTopLevelOperation, _currentlyRunningOperation))
{
_currentlyRunningTopLevelOperation = null;
}

_currentlyRunningOperation = null;
}
}
Expand Down Expand Up @@ -344,6 +357,8 @@ public ScheduledOperation(

public T Value { get; }

public bool IsCompleted => TaskCompletionSource.Task.IsCompleted;

public bool IsDeferred { get; }

public bool IsChildOperation { get; }
Expand Down

0 comments on commit f2763eb

Please sign in to comment.