Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Proposal] Adding control-queue monitor with dedicated orchestrator to each control-queue. #1058

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
304 changes: 304 additions & 0 deletions Test/DurableTask.AzureStorage.Tests/ControlQueueHelperTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.AzureStorage.Tests
{
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlTypes;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.AzureStorage.ControlQueueHeartbeat;
using DurableTask.Core;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;

[TestClass]
public class ControlQueueHelperTests
{
private IControlQueueHelper controlQueueHelper;
private AzureStorageOrchestrationService azureStorageOrchestrationService;
private AzureStorageOrchestrationServiceSettings settings;
private int partitionCount = 4;
private Dictionary<string, int> controlQueueNumberToNameMap;
private CancellationTokenSource cancellationTokenSource;

[TestInitialize]
public void Initialize()
{
cancellationTokenSource = new CancellationTokenSource();

settings = new AzureStorageOrchestrationServiceSettings()
{
StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(),
TaskHubName = TestHelpers.GetTestTaskHubName(),
PartitionCount = partitionCount,
ControlQueueHearbeatOrchestrationInterval = TimeSpan.FromSeconds(5),
ControlQueueOrchHeartbeatDetectionInterval = TimeSpan.FromSeconds(5),
ControlQueueOrchHeartbeatLatencyThreshold = TimeSpan.FromSeconds(5),
};

azureStorageOrchestrationService = new AzureStorageOrchestrationService(settings);
controlQueueHelper = azureStorageOrchestrationService;

controlQueueNumberToNameMap = new Dictionary<string, int>();

for (int i = 0; i < partitionCount; i++)
{
var controlQueueName = AzureStorageOrchestrationService.GetControlQueueName(settings.TaskHubName, i);
controlQueueNumberToNameMap[controlQueueName] = i;
}
}

[TestMethod]
public async Task StartControlQueueHeartbeatMonitorAsync()
{
var detectionCount = new Dictionary<string, int>();

var taskHubClient = new TaskHubClient(azureStorageOrchestrationService);
var taskHubWorker = new Core.TaskHubWorker(azureStorageOrchestrationService);
var controlQueueToInstanceInfo = azureStorageOrchestrationService.GetControlQueueToInstanceIdInfo();

// Creating dictionary to calculate stuck issue detection count for each control-queue.
foreach (var controlQueueName in controlQueueToInstanceInfo.Keys)
{
detectionCount[controlQueueName] = 0;
}

var cancellationTokenSrc = new CancellationTokenSource();

await controlQueueHelper.StartControlQueueHeartbeatMonitorAsync(
taskHubClient,
taskHubWorker,
async (x, y, z, cancelationToken) => { await Task.CompletedTask; },
async (workerId, ownerId, isOwner, controlQueueName, instanceid, orchDetectionInfo, cancellationToken) =>
{
detectionCount[controlQueueName]++;
await Task.CompletedTask;
},
cancellationTokenSrc.Token);

// Scheduling of all orchestrator should happen.
foreach (var instanceId in controlQueueToInstanceInfo.Values)
{
var orchIsntance = await taskHubClient.GetOrchestrationStateAsync(instanceId);
Assert.IsNotNull(orchIsntance);
}

// Orchestrator registration completed.
var objectCreator = new NameValueObjectCreator<TaskOrchestration>(
ControlQueueHeartbeatTaskOrchestrator.OrchestrationName,
ControlQueueHeartbeatTaskOrchestrator.OrchestrationVersion,
typeof(ControlQueueHeartbeatTaskOrchestrator));

Assert.ThrowsException<InvalidOperationException>(() => { taskHubWorker.AddTaskOrchestrations(objectCreator); });

await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatLatencyThreshold);

var detectionCountDuplicate = new Dictionary<string, int>();

// Should trigger delegate for control-queue stuck.
foreach (var controlQueueName in controlQueueToInstanceInfo.Keys)
{
detectionCountDuplicate[controlQueueName] = detectionCount[controlQueueName];
Assert.IsTrue(detectionCount[controlQueueName] > 0);
}


await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatLatencyThreshold);
cancellationTokenSrc.Cancel();

// Give it some time to cancel the ongoing operations.
await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval);

// Should trigger delegate for control-queue stuck.
foreach (var controlQueueName in controlQueueToInstanceInfo.Keys)
{
Assert.IsFalse(detectionCountDuplicate[controlQueueName] == detectionCount[controlQueueName]);
detectionCountDuplicate[controlQueueName] = detectionCount[controlQueueName];
}

await Task.Delay(this.settings.ControlQueueOrchHeartbeatDetectionInterval + this.settings.ControlQueueOrchHeartbeatLatencyThreshold);

// Should trigger delegate for control-queue stuck.
foreach (var controlQueueName in controlQueueToInstanceInfo.Keys)
{
Assert.IsTrue(detectionCountDuplicate[controlQueueName] == detectionCount[controlQueueName]);
}
}

[TestMethod]
public async Task ScheduleControlQueueHeartbeatOrchestrations()
{
var utcBefore = DateTime.UtcNow;

var taskHubClient = new TaskHubClient(azureStorageOrchestrationService);
await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient, cancellationTokenSource.Token, true);

var controlQueueToInstanceInfo = azureStorageOrchestrationService.GetControlQueueToInstanceIdInfo();

var utcNow = DateTime.UtcNow;

foreach (var instanceId in controlQueueToInstanceInfo.Values)
{
var orchIsntance = await taskHubClient.GetOrchestrationStateAsync(instanceId);

Assert.IsTrue(orchIsntance.CreatedTime >= utcBefore && orchIsntance.CreatedTime <= utcNow);
}

await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient, cancellationTokenSource.Token, false);

foreach (var instanceId in controlQueueToInstanceInfo.Values)
{
var orchIsntance = await taskHubClient.GetOrchestrationStateAsync(instanceId);

Assert.IsTrue(orchIsntance.CreatedTime >= utcBefore && orchIsntance.CreatedTime <= utcNow);
}
}

[TestMethod]
public void ScheduleControlQueueHeartbeatOrchestrations_InvalidInput()
{
var settingsMod = new AzureStorageOrchestrationServiceSettings()
{
StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(),
TaskHubName = TestHelpers.GetTestTaskHubName(),
PartitionCount = partitionCount + 1,
};

var azureStorageOrchestrationServiceMod = new AzureStorageOrchestrationService(settingsMod);

var taskHubClient = new TaskHubClient(azureStorageOrchestrationServiceMod);

Assert.ThrowsExceptionAsync<ArgumentNullException>(async () =>
{
await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(null, cancellationTokenSource.Token);
});

IOrchestrationServiceClient orchestrationService = new Mock<IOrchestrationServiceClient>().Object;
var taskHubClientDiff = new TaskHubClient(orchestrationService);

Assert.ThrowsExceptionAsync<InvalidOperationException>(async () =>
{
await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClientDiff, cancellationTokenSource.Token);
});

Assert.ThrowsExceptionAsync<InvalidOperationException>(async () =>
{
await controlQueueHelper.ScheduleControlQueueHeartbeatOrchestrationsAsync(taskHubClient, cancellationTokenSource.Token);
});
}

[TestMethod]
public void RegisterControlQueueHeartbeatOrchestration()
{
var taskHubWorker = new Core.TaskHubWorker(azureStorageOrchestrationService);
controlQueueHelper.RegisterControlQueueHeartbeatOrchestration(taskHubWorker, async (x, y, z, cancellationToken) => { await Task.CompletedTask; });

var objectCreator = new NameValueObjectCreator<TaskOrchestration>(
ControlQueueHeartbeatTaskOrchestrator.OrchestrationName,
ControlQueueHeartbeatTaskOrchestrator.OrchestrationVersion,
typeof(ControlQueueHeartbeatTaskOrchestrator));

Assert.ThrowsException<InvalidOperationException>(() => { taskHubWorker.AddTaskOrchestrations(objectCreator); });
}

[TestMethod]
public void RegisterControlQueueHeartbeatOrchestration_InvalidInput()
{
var settingsMod = new AzureStorageOrchestrationServiceSettings()
{
StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(),
TaskHubName = TestHelpers.GetTestTaskHubName(),
PartitionCount = partitionCount + 1,
};

var azureStorageOrchestrationServiceMod = new AzureStorageOrchestrationService(settingsMod);

var taskHubWorker = new TaskHubWorker(azureStorageOrchestrationServiceMod);

Assert.ThrowsException<ArgumentNullException>(() =>
{
controlQueueHelper.RegisterControlQueueHeartbeatOrchestration(null, async (x, y, z, cancellationToken) => { await Task.CompletedTask; });
});

IOrchestrationService orchestrationService = new Mock<IOrchestrationService>().Object;
var taskHubWorkerDiff = new TaskHubWorker(orchestrationService);

Assert.ThrowsException<InvalidOperationException>(() =>
{
controlQueueHelper.RegisterControlQueueHeartbeatOrchestration(taskHubWorkerDiff, async (x, y, z, cancellationToken) => { await Task.CompletedTask; });
});

Assert.ThrowsException<InvalidOperationException>(() =>
{
controlQueueHelper.RegisterControlQueueHeartbeatOrchestration(taskHubWorker, async (x, y, z, cancellationToken) => { await Task.CompletedTask; });
});
}

[TestMethod]
[DataRow(new int[] { 0, 1, 2, 3 })]
[DataRow(new int[] { 2, 3 })]
[DataRow(new int[] { 1, 3 })]
[DataRow(new int[] { 0, 1 })]
[DataRow(new int[] { 0, 2 })]
[DataRow(new int[] { 0, 3 })]
[DataRow(new int[] { 0 })]
[DataRow(new int[] { 1 })]
[DataRow(new int[] { 3 })]
public async Task GetControlQueueInstanceId(int[] controlQueueNumbers)
{
Dictionary<int, List<string>> controlQueueNumberToInstanceIds = new Dictionary<int, List<string>>();

var controlQueueNumbersHashSet = new HashSet<int>();

foreach (var cQN in controlQueueNumbers)
{
controlQueueNumbersHashSet.Add(cQN);
controlQueueNumberToInstanceIds[cQN] = new List<string>();
}


for (int i = 0; i < 100; i++)
{
var instanceId = controlQueueHelper.GetControlQueueInstanceId(controlQueueNumbersHashSet, $"prefix{Guid.NewGuid()}_");

var controlQueue = await azureStorageOrchestrationService.GetControlQueueAsync(instanceId);
var controlQueueNumber = controlQueueNumberToNameMap[controlQueue.Name];

controlQueueNumberToInstanceIds[controlQueueNumber].Add(instanceId);

Assert.IsTrue(controlQueueNumbers.Any(x => x == controlQueueNumber));
}

foreach (var cQN in controlQueueNumbers)
{
Assert.IsTrue(controlQueueNumberToInstanceIds[cQN].Count > 0);
}
}

[TestMethod]
public void GetControlQueueInstanceId_InvalidInput()
{
Assert.ThrowsException<ArgumentNullException>(() => { controlQueueHelper.GetControlQueueInstanceId(null); });
Assert.ThrowsException<ArgumentException>(() => { controlQueueHelper.GetControlQueueInstanceId(new HashSet<int>()); });
Assert.ThrowsException<ArgumentException>(() => { controlQueueHelper.GetControlQueueInstanceId(new HashSet<int>() { -4 }); });
Assert.ThrowsException<ArgumentException>(() => { controlQueueHelper.GetControlQueueInstanceId(new HashSet<int>() { partitionCount }); });
Assert.ThrowsException<ArgumentException>(() => { controlQueueHelper.GetControlQueueInstanceId(new HashSet<int>() { partitionCount + 4 }); });
}

}
}
Loading