Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test for orchestrator termination #3023

Merged
merged 9 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions test/e2e/Apps/BasicDotNetIsolated/TerminateOrchestration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Net;
using Grpc.Core;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.Durable.Tests.E2E
{
public static class LongRunningOrchestration
{
[Function(nameof(LongRunningOrchestrator))]
public static async Task<List<string>> LongRunningOrchestrator(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
ILogger logger = context.CreateReplaySafeLogger(nameof(HelloCities));
logger.LogInformation("Starting long-running orchestration.");
var outputs = new List<string>();

// Call our fake activity 100,000 times to simulate an orchestration that might run for >= 10,000s (2.7 hours)
for (int i = 0; i < 100000; i++)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I worry that a large fan-out like this may make the test unstable on the low-powered CI machines due to the memory and I/O requirements. Do we need a fan-out at all or could we just use an orchestration that sleeps for a long time or waits for an external event?

Copy link
Contributor Author

@andystaples andystaples Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't a fan-out though, the activities are called sequentially. I agree, it's overkill though since most of the tests run in well under a minute, I can shorten it, the idea behind writing the orchestrator this way was to demonstrate what might cause an orchestration with problematic runtime.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry, I misread the code. OK, I'm less worried about this then.

{
outputs.Add(await context.CallActivityAsync<string>(nameof(SimulatedWorkActivity), 100));
}

return outputs;
}

[Function(nameof(SimulatedWorkActivity))]
public static string SimulatedWorkActivity([ActivityTrigger]int sleepMs, FunctionContext executionContext)
{
// Sleep the provided number of ms to simulate a long-running activity operation
ILogger logger = executionContext.GetLogger("SimulatedWorkActivity");
logger.LogInformation("Sleeping for {sleepMs}ms.", sleepMs);
Thread.Sleep(sleepMs);
return $"Slept for {sleepMs}ms.";
}

[Function("LongOrchestrator_HttpStart")]
public static async Task<HttpResponseData> HttpStart(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
[DurableClient] DurableTaskClient client,
FunctionContext executionContext)
{
ILogger logger = executionContext.GetLogger("LongOrchestrator_HttpStart");

string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
nameof(LongRunningOrchestrator));

logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId);

return await client.CreateCheckStatusResponseAsync(req, instanceId);
}

[Function("TerminateInstance")]
public static async Task<HttpResponseData> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
[DurableClient] DurableTaskClient client,
string instanceId)
{
string reason = "Long-running orchestration was terminated early.";
try
{
await client.TerminateInstanceAsync(instanceId, reason);
return req.CreateResponse(HttpStatusCode.OK);
}
catch (RpcException ex)
{
var response = req.CreateResponse(HttpStatusCode.BadRequest);
response.Headers.Add("Content-Type", "text/plain");
await response.WriteStringAsync(ex.Message);
return response;
}
}
}
}
37 changes: 24 additions & 13 deletions test/e2e/Tests/Helpers/DurableHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,40 @@ public OrchestrationStatusDetails(string statusQueryResponse)
}
}

internal static string ParseStatusQueryGetUri(HttpResponseMessage invocationStartResponse)
internal static async Task<string> ParseStatusQueryGetUriAsync(HttpResponseMessage invocationStartResponse)
{
string? responseString = invocationStartResponse.Content?.ReadAsStringAsync().Result;
string? responseString = await invocationStartResponse.Content.ReadAsStringAsync();
return TokenizeAndGetValueFromKeyAsString(responseString, "StatusQueryGetUri");
}

internal static async Task<string> ParseInstanceIdAsync(HttpResponseMessage invocationStartResponse)
{
string? responseString = await invocationStartResponse.Content.ReadAsStringAsync();
return TokenizeAndGetValueFromKeyAsString(responseString, "Id");
}

internal static async Task<OrchestrationStatusDetails> GetRunningOrchestrationDetailsAsync(string statusQueryGetUri)
{
var statusQueryResponse = await _httpClient.GetAsync(statusQueryGetUri);

string? statusQueryResponseString = await statusQueryResponse.Content.ReadAsStringAsync();

return new OrchestrationStatusDetails(statusQueryResponseString);
}

if (string.IsNullOrEmpty(responseString))
private static string TokenizeAndGetValueFromKeyAsString(string? json, string key)
{
if (string.IsNullOrEmpty(json))
{
return string.Empty;
}
JsonNode? responseJsonNode = JsonNode.Parse(responseString);
JsonNode? responseJsonNode = JsonNode.Parse(json);
if (responseJsonNode == null)
{
return string.Empty;
}

string? statusQueryGetUri = responseJsonNode["StatusQueryGetUri"]?.GetValue<string>();
string? statusQueryGetUri = responseJsonNode[key]?.GetValue<string>();
return statusQueryGetUri ?? string.Empty;
}
internal static OrchestrationStatusDetails GetRunningOrchestrationDetails(string statusQueryGetUri)
{
var statusQueryResponse = _httpClient.GetAsync(statusQueryGetUri);

string? statusQueryResponseString = statusQueryResponse.Result.Content.ReadAsStringAsync().Result;

return new OrchestrationStatusDetails(statusQueryResponseString);
}
}
14 changes: 7 additions & 7 deletions test/e2e/Tests/Tests/HelloCitiesTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ public async Task HttpTriggerTests(string functionName, HttpStatusCode expectedS
string actualMessage = await response.Content.ReadAsStringAsync();

Assert.Equal(expectedStatusCode, response.StatusCode);
string statusQueryGetUri = DurableHelpers.ParseStatusQueryGetUri(response);
string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response);
Thread.Sleep(1000);
var orchestrationDetails = DurableHelpers.GetRunningOrchestrationDetails(statusQueryGetUri);
var orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
Assert.Equal("Completed", orchestrationDetails.RuntimeStatus);
Assert.Contains(partialExpectedOutput, orchestrationDetails.Output);
}
Expand All @@ -61,28 +61,28 @@ public async Task ScheduledStartTests(string functionName, int startDelaySeconds
using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger(functionName, urlQueryString);
string actualMessage = await response.Content.ReadAsStringAsync();

string statusQueryGetUri = DurableHelpers.ParseStatusQueryGetUri(response);
string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response);

Assert.Equal(expectedStatusCode, response.StatusCode);

var orchestrationDetails = DurableHelpers.GetRunningOrchestrationDetails(statusQueryGetUri);
var orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
while (DateTime.UtcNow < scheduledStartTime + TimeSpan.FromSeconds(-1))
{
WriteOutput($"Test scheduled for {scheduledStartTime}, current time {DateTime.Now}");
orchestrationDetails = DurableHelpers.GetRunningOrchestrationDetails(statusQueryGetUri);
orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
Assert.Equal("Pending", orchestrationDetails.RuntimeStatus);
Thread.Sleep(1000);
}

// Give a small amount of time for the orchestration to complete, even if scheduled to run immediately
Thread.Sleep(3000);
WriteOutput($"Test scheduled for {scheduledStartTime}, current time {DateTime.Now}, looking for completed");
var finalOrchestrationDetails = DurableHelpers.GetRunningOrchestrationDetails(statusQueryGetUri);
var finalOrchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
int retryAttempts = 0;
while (finalOrchestrationDetails.RuntimeStatus != "Completed" && retryAttempts < 10)
{
Thread.Sleep(1000);
finalOrchestrationDetails = DurableHelpers.GetRunningOrchestrationDetails(statusQueryGetUri);
finalOrchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
retryAttempts++;
}
Assert.Equal("Completed", finalOrchestrationDetails.RuntimeStatus);
Expand Down
154 changes: 154 additions & 0 deletions test/e2e/Tests/Tests/TerminateOrchestratorTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Net;
using Xunit;
using Xunit.Abstractions;

namespace Microsoft.Azure.Durable.Tests.DotnetIsolatedE2E;

[Collection(Constants.FunctionAppCollectionName)]
public class TerminateOrchestratorTests
{
private readonly FunctionAppFixture _fixture;
private readonly ITestOutputHelper _output;

public TerminateOrchestratorTests(FunctionAppFixture fixture, ITestOutputHelper testOutputHelper)
{
_fixture = fixture;
_fixture.TestLogs.UseTestLogger(testOutputHelper);
_output = testOutputHelper;
}


[Fact]
public async Task TerminateRunningOrchestration_ShouldSucceed()
{
using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger("LongOrchestrator_HttpStart", "");

Assert.Equal(HttpStatusCode.Accepted, response.StatusCode);
string instanceId = await DurableHelpers.ParseInstanceIdAsync(response);
string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response);

Thread.Sleep(1000);

var orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
Assert.Equal("Running", orchestrationDetails.RuntimeStatus);

using HttpResponseMessage terminateResponse = await HttpHelpers.InvokeHttpTrigger("TerminateInstance", $"?instanceId={instanceId}");
await AssertTerminateRequestSucceedsAsync(terminateResponse);

Thread.Sleep(1000);

orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
Assert.Equal("Terminated", orchestrationDetails.RuntimeStatus);
}


[Fact(Skip = "Will enable when https://github.com/Azure/azure-functions-durable-extension/issues/3025 is fixed")]
public async Task TerminateScheduledOrchestration_ShouldSucceed()
{
DateTime scheduledStartTime = DateTime.UtcNow + TimeSpan.FromMinutes(1);
using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger("HelloCities_HttpStart_Scheduled", $"?scheduledStartTime={scheduledStartTime.ToString("o")}");

Assert.Equal(HttpStatusCode.Accepted, response.StatusCode);
string instanceId = await DurableHelpers.ParseInstanceIdAsync(response);
string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response);

Thread.Sleep(1000);

var orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
Assert.Equal("Pending", orchestrationDetails.RuntimeStatus);

using HttpResponseMessage terminateResponse = await HttpHelpers.InvokeHttpTrigger("TerminateInstance", $"?instanceId={instanceId}");
await AssertTerminateRequestSucceedsAsync(terminateResponse);

Thread.Sleep(1000);

orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
Assert.Equal("Terminated", orchestrationDetails.RuntimeStatus);
}


[Fact]
public async Task TerminateTerminatedOrchestration_ShouldFail()
{
using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger("LongOrchestrator_HttpStart", "");

Assert.Equal(HttpStatusCode.Accepted, response.StatusCode);
string instanceId = await DurableHelpers.ParseInstanceIdAsync(response);
string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response);

Thread.Sleep(1000);

var orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
Assert.Equal("Running", orchestrationDetails.RuntimeStatus);

using HttpResponseMessage terminateResponse = await HttpHelpers.InvokeHttpTrigger("TerminateInstance", $"?instanceId={instanceId}");
await AssertTerminateRequestSucceedsAsync(terminateResponse);

Thread.Sleep(1000);
using HttpResponseMessage terminateAgainResponse = await HttpHelpers.InvokeHttpTrigger("TerminateInstance", $"?instanceId={instanceId}");
await AssertTerminateRequestFailsAsync(terminateAgainResponse);

// Give some time for Core Tools to write logs out
Thread.Sleep(500);

Assert.Contains(_fixture.TestLogs.CoreToolsLogs, x => x.Contains("Cannot terminate orchestration instance in the Terminated state.") &&
x.Contains(instanceId));

orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
Assert.Equal("Terminated", orchestrationDetails.RuntimeStatus);
}


[Fact]
public async Task TerminateCompletedOrchestration_ShouldFail()
{
using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger("HelloCities_HttpStart", "");

Assert.Equal(HttpStatusCode.Accepted, response.StatusCode);
string instanceId = await DurableHelpers.ParseInstanceIdAsync(response);
string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response);

Thread.Sleep(1000);

var orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri);
Assert.Equal("Completed", orchestrationDetails.RuntimeStatus);

using HttpResponseMessage terminateResponse = await HttpHelpers.InvokeHttpTrigger("TerminateInstance", $"?instanceId={instanceId}");
await AssertTerminateRequestFailsAsync(terminateResponse);

// Give some time for Core Tools to write logs out
Thread.Sleep(500);

Assert.Contains(_fixture.TestLogs.CoreToolsLogs, x => x.Contains("Cannot terminate orchestration instance in the Completed state.") &&
x.Contains(instanceId));
}

[Fact]
public async Task TerminateNonExistantOrchestration_ShouldFail()
{
using HttpResponseMessage terminateResponse = await HttpHelpers.InvokeHttpTrigger("TerminateInstance", $"?instanceId={Guid.NewGuid().ToString()}");
await AssertTerminateRequestFailsAsync(terminateResponse);
}

private static async Task AssertTerminateRequestFailsAsync(HttpResponseMessage terminateResponse)
{
Assert.Equal(HttpStatusCode.BadRequest, terminateResponse.StatusCode);

string? terminateResponseMessage = await terminateResponse.Content.ReadAsStringAsync();
Assert.NotNull(terminateResponseMessage);
// Unclear error message - see https://github.com/Azure/azure-functions-durable-extension/issues/3027, will update this code when that bug is fixed
Assert.Equal("Status(StatusCode=\"Unknown\", Detail=\"Exception was thrown by handler.\")", terminateResponseMessage);
andystaples marked this conversation as resolved.
Show resolved Hide resolved
}

private static async Task AssertTerminateRequestSucceedsAsync(HttpResponseMessage terminateResponse)
{
Assert.Equal(HttpStatusCode.OK, terminateResponse.StatusCode);

string? terminateResponseMessage = await terminateResponse.Content.ReadAsStringAsync();
Assert.NotNull(terminateResponseMessage);
Assert.Empty(terminateResponseMessage);
}
}
Loading