Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
exception handling for code in scheduled work


remove blank lines
  • Loading branch information
colombod committed Feb 23, 2021
1 parent 87ff15f commit ab93a75
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 188 deletions.
306 changes: 136 additions & 170 deletions src/Microsoft.DotNet.Interactive.Tests/KernelCommandSchedulerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading.Tasks;

using FluentAssertions;

using Microsoft.DotNet.Interactive.Tests.Utility;

using Pocket;
Expand Down Expand Up @@ -75,21 +76,21 @@ public async Task scheduled_work_does_not_execute_in_parallel()
var concurrencyCounter = 0;
var maxObservedParallelism = 0;
var tasks = new Task[3];

for (var i = 0; i < 3; i++)
{
var task = scheduler.Schedule(i, async _ =>
var task = scheduler.Schedule(i, async _ =>
{
Interlocked.Increment(ref concurrencyCounter);

await Task.Delay(100);
maxObservedParallelism = Math.Max(concurrencyCounter, maxObservedParallelism);

Interlocked.Decrement(ref concurrencyCounter);
} );
});
tasks[i] = task;
}

await Task.WhenAll(tasks);

maxObservedParallelism.Should().Be(1);
Expand All @@ -107,7 +108,7 @@ void PerformWork(int v)

using var scheduler = new KernelScheduler<int, int>();
scheduler.RegisterDeferredOperationSource(
(v,_) => Enumerable.Repeat(v * 10, v), PerformWork);
(v, _) => Enumerable.Repeat(v * 10, v), PerformWork);

for (var i = 1; i <= 3; i++)
{
Expand Down Expand Up @@ -144,6 +145,132 @@ void PerformWork(int v)
executionList.Should().BeEquivalentTo(1);
}

[Fact]
public async Task cancelled_work_prevents_any_scheduled_work_from_executing()
{
var executionList = new List<int>();
using var scheduler = new KernelScheduler<int, int>();
var barrier = new Barrier(2);

async Task PerformWork(int v)
{
barrier.SignalAndWait();
await Task.Delay(3000);
executionList.Add(v);
}

var scheduledWork = new List<Task>
{
scheduler.Schedule(1, PerformWork),
scheduler.Schedule(2, executionList.Add),
scheduler.Schedule(3, executionList.Add)
};

barrier.SignalAndWait();
scheduler.Cancel();
try
{
await Task.WhenAll(scheduledWork);
}
catch (TaskCanceledException)
{

}

executionList.Should().BeEmpty();
}

[Fact]
public void cancelling_work_throws_exception()
{
var executionList = new List<int>();
using var scheduler = new KernelScheduler<int, int>();
var barrier = new Barrier(2);

async Task PerformWork(int v)
{
barrier.SignalAndWait();
await Task.Delay(3000);
executionList.Add(v);
}

var scheduledWork = new List<Task>
{
scheduler.Schedule(1, PerformWork),
scheduler.Schedule(2, executionList.Add),
scheduler.Schedule(3, executionList.Add)
};

barrier.SignalAndWait();
scheduler.Cancel();
var operation = new Action( () => Task.WhenAll(scheduledWork).Wait(5000));

operation.Should().Throw<TaskCanceledException>();
}

[Fact]
public async Task exception_in_scheduled_work_halts_execution()
{
var executionList = new List<int>();
using var scheduler = new KernelScheduler<int, int>();
var barrier = new Barrier(2);

void PerformWork(int v)
{
barrier.SignalAndWait();
throw new InvalidOperationException("test exception");
}

var scheduledWork = new List<Task>
{
scheduler.Schedule(1, PerformWork),
scheduler.Schedule(2, executionList.Add),
scheduler.Schedule(3, executionList.Add)
};

barrier.SignalAndWait();
try
{
await Task.WhenAll(scheduledWork);
}
catch(InvalidOperationException)
{

}

executionList.Should().BeEmpty();
}

[Fact]
public void exception_in_scheduled_work_is_propagated()
{
var executionList = new List<int>();
using var scheduler = new KernelScheduler<int, int>();
var barrier = new Barrier(2);

void PerformWork(int v)
{
barrier.SignalAndWait();
throw new InvalidOperationException("test exception");
}

var scheduledWork = new List<Task>
{
scheduler.Schedule(1, PerformWork),
scheduler.Schedule(2, executionList.Add),
scheduler.Schedule(3, executionList.Add)
};

barrier.SignalAndWait();
var operation = new Action(() => Task.WhenAll(scheduledWork).Wait(5000));

operation.Should().Throw<InvalidOperationException>()
.Which
.Message
.Should()
.Be("test exception");
}

[Fact]
public async Task awaiting_for_work_to_complete_does_not_wait_for_subsequent_work()
{
Expand All @@ -162,9 +289,9 @@ async Task PerformWorkAsync(int v)

_ = scheduler.Schedule(3, PerformWorkAsync);

executionList.Should().BeEquivalentSequenceTo( 1, 2);
executionList.Should().BeEquivalentSequenceTo(1, 2);



}

[Fact]
Expand All @@ -186,168 +313,7 @@ void PerformWork(int v)
await scheduler.Schedule(i, PerformWork, $"scope{i}");
}

executionList.Should().BeEquivalentSequenceTo( 1, 20, 20, 2,3);
executionList.Should().BeEquivalentSequenceTo(1, 20, 20, 2, 3);
}

//[Fact]
//public async Task command_execute_on_kernel_specified_at_scheduling_time()
//{
// var commandsHandledOnKernel1 = new List<KernelCommand>();
// var commandsHandledOnKernel2 = new List<KernelCommand>();

// var scheduler = new KernelCommandScheduler();

// var kernel1 = new FakeKernel("kernel1")
// {
// Handle = (command, _) =>
// {
// commandsHandledOnKernel1.Add(command);
// return Task.CompletedTask;
// }
// };
// var kernel2 = new FakeKernel("kernel2")
// {
// Handle = (command, _) =>
// {
// commandsHandledOnKernel2.Add(command);
// return Task.CompletedTask;
// }
// };

// var command1 = new SubmitCode("for kernel 1", kernel1.Name);
// var command2 = new SubmitCode("for kernel 2", kernel2.Name);

// await scheduler.Schedule(command1);
// await scheduler.Schedule(command2);

// commandsHandledOnKernel1.Should().ContainSingle().Which.Should().Be(command1);
// commandsHandledOnKernel2.Should().ContainSingle().Which.Should().Be(command2);
//}

//[Fact]
//public async Task scheduling_a_command_will_defer_deferred_commands_scheduled_on_same_kernel()
//{
// var commandsHandledOnKernel1 = new List<KernelCommand>();

// var scheduler = new KernelCommandScheduler();

// var kernel1 = new FakeKernel("kernel1")
// {
// Handle = (command, _) =>
// {
// commandsHandledOnKernel1.Add(command);
// return Task.CompletedTask;
// }
// };
// var kernel2 = new FakeKernel("kernel2")
// {
// Handle = (_, _) => Task.CompletedTask
// };

// var deferredCommand1 = new SubmitCode("deferred for kernel 1", kernel1.Name);
// var deferredCommand2 = new SubmitCode("deferred for kernel 2", kernel2.Name);
// var deferredCommand3 = new SubmitCode("deferred for kernel 1", kernel1.Name);
// var command1 = new SubmitCode("for kernel 1", kernel1.Name);

// scheduler.DeferCommand(deferredCommand1);
// scheduler.DeferCommand(deferredCommand2);
// scheduler.DeferCommand(deferredCommand3);
// await scheduler.Schedule(command1);

// commandsHandledOnKernel1.Should().NotContain(deferredCommand2);
// commandsHandledOnKernel1.Should().BeEquivalentSequenceTo(deferredCommand1, deferredCommand3, command1);
//}

//[Fact]
//public async Task deferred_command_not_executed_are_still_in_deferred_queue()
//{
// var commandsHandledOnKernel1 = new List<KernelCommand>();
// var commandsHandledOnKernel2 = new List<KernelCommand>();

// var scheduler = new KernelCommandScheduler();

// var kernel1 = new FakeKernel("kernel1")
// {
// Handle = (command, _) =>
// {
// commandsHandledOnKernel1.Add(command);
// return Task.CompletedTask;
// }
// };
// var kernel2 = new FakeKernel("kernel2")
// {
// Handle = (command, _) =>
// {
// commandsHandledOnKernel2.Add(command);
// return Task.CompletedTask;
// }
// };

// var deferredCommand1 = new SubmitCode("deferred for kernel 1");
// var deferredCommand2 = new SubmitCode("deferred for kernel 2");
// var deferredCommand3 = new SubmitCode("deferred for kernel 1");
// var command1 = new SubmitCode("for kernel 1");
// var command2 = new SubmitCode("for kernel 2");

// scheduler.DeferCommand(deferredCommand1, kernel1);
// scheduler.DeferCommand(deferredCommand2, kernel2);
// scheduler.DeferCommand(deferredCommand3, kernel1);
// await scheduler.Schedule(command1, kernel1);

// commandsHandledOnKernel2.Should().BeEmpty();
// commandsHandledOnKernel1.Should().NotContain(deferredCommand2);
// commandsHandledOnKernel1.Should().BeEquivalentSequenceTo(deferredCommand1, deferredCommand3, command1);
// await scheduler.Schedule(command2, kernel2);
// commandsHandledOnKernel2.Should().BeEquivalentSequenceTo(deferredCommand2, command2);
//}

//[Fact]
//public async Task deferred_command_on_parent_kernel_are_executed_when_scheduling_command_on_child_kernel()
//{
// var commandHandledList = new List<(KernelCommand command, Kernel kernel)>();

// var scheduler = new KernelCommandScheduler();

// var childKernel = new FakeKernel("kernel1")
// {
// Handle = (command, context) => command.InvokeAsync(context)
// };
// var parentKernel = new CompositeKernel
// {
// childKernel
// };

// parentKernel.DefaultKernelName = childKernel.Name;

// var deferredCommand1 = new TestCommand((command, context) =>
// {
// commandHandledList.Add((command, context.HandlingKernel));
// return Task.CompletedTask;
// }, childKernel.Name);
// var deferredCommand2 = new TestCommand((command, context) =>
// {
// commandHandledList.Add((command, context.HandlingKernel));
// return Task.CompletedTask;
// }, parentKernel.Name);
// var deferredCommand3 = new TestCommand((command, context) =>
// {
// commandHandledList.Add((command, context.HandlingKernel));
// return Task.CompletedTask;
// }, childKernel.Name);
// var command1 = new TestCommand((command, context) =>
// {
// commandHandledList.Add((command, context.HandlingKernel));
// return Task.CompletedTask;
// }, childKernel.Name);

// scheduler.DeferCommand(deferredCommand1, childKernel);
// scheduler.DeferCommand(deferredCommand2, parentKernel);
// scheduler.DeferCommand(deferredCommand3, childKernel);
// await scheduler.Schedule(command1, childKernel);

// commandHandledList.Select(e => e.command).Should().BeEquivalentSequenceTo(deferredCommand1, deferredCommand2, deferredCommand3, command1);

// commandHandledList.Select(e => e.kernel).Should().BeEquivalentSequenceTo(childKernel, parentKernel, childKernel, childKernel);
//}
}
}
}
Loading

0 comments on commit ab93a75

Please sign in to comment.