Skip to content

Commit

Permalink
Merge pull request #153 from microsoft/vpl/batch-tests
Browse files Browse the repository at this point in the history
Vpl/batch tests
  • Loading branch information
vplauzon authored Oct 25, 2023
2 parents ed068ff + 98b3551 commit 266f5f4
Show file tree
Hide file tree
Showing 13 changed files with 435 additions and 248 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/exec-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ jobs:
echo "deltaKustoTenantId=${{ secrets.test_tenant_id }}" >> $GITHUB_ENV
echo "deltaKustoSpId=${{ secrets.deploy_sp_id }}" >> $GITHUB_ENV
echo "deltaKustoSpSecret=${{ secrets.deploy_sp_secret }}" >> $GITHUB_ENV
echo "maxDbs=150" >> $GITHUB_ENV
# See https://docs.microsoft.com/en-us/dotnet/core/tools/dotnet-test
- name: File-based integration tests (out-of-proc)
shell: bash
Expand All @@ -121,6 +122,7 @@ jobs:
mac:
needs:
- clusterSetup
- linux

# This can run in parallel with the other job as they target different databases
runs-on: macos-latest
Expand Down Expand Up @@ -186,6 +188,7 @@ jobs:
echo "deltaKustoTenantId=${{ secrets.test_tenant_id }}" >> $GITHUB_ENV
echo "deltaKustoSpId=${{ secrets.deploy_sp_id }}" >> $GITHUB_ENV
echo "deltaKustoSpSecret=${{ secrets.deploy_sp_secret }}" >> $GITHUB_ENV
echo "maxDbs=150" >> $GITHUB_ENV
# See https://docs.microsoft.com/en-us/dotnet/core/tools/dotnet-test
- name: File-based integration tests (out-of-proc)
shell: bash
Expand All @@ -202,6 +205,7 @@ jobs:
windows:
needs:
- clusterSetup
- mac

runs-on: windows-latest

Expand Down Expand Up @@ -266,6 +270,7 @@ jobs:
echo "deltaKustoTenantId=${{ secrets.test_tenant_id }}" >> $GITHUB_ENV
echo "deltaKustoSpId=${{ secrets.deploy_sp_id }}" >> $GITHUB_ENV
echo "deltaKustoSpSecret=${{ secrets.deploy_sp_secret }}" >> $GITHUB_ENV
echo "maxDbs=150" >> $GITHUB_ENV
# See https://docs.microsoft.com/en-us/dotnet/core/tools/dotnet-test
- name: File-based integration tests (out-of-proc)
shell: bash
Expand Down
95 changes: 54 additions & 41 deletions code/DeltaKustoAdxIntegrationTest/AdxDbTestHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using DeltaKustoLib.CommandModel;
using DeltaKustoLib.KustoModel;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
Expand All @@ -18,19 +17,22 @@ public class AdxDbTestHelper : IDisposable
{
private readonly string _dbPrefix;
private readonly Func<string, IKustoManagementGateway> _kustoManagementGatewayFactory;
private readonly ConcurrentQueue<Task<string>> _preparingDbs =
new();
private volatile int _dbCount = 0;
private readonly int _maxDbs;
private readonly List<string> _availableDbNames;
private readonly object _queueLock = new();
private volatile TaskCompletionSource _newDbEvent = new TaskCompletionSource();

public static AdxDbTestHelper Instance { get; } = CreateSingleton();

#region Constructors
private static AdxDbTestHelper CreateSingleton()
{
var dbPrefix = Environment.GetEnvironmentVariable("deltaKustoDbPrefix");
var clusterUri = Environment.GetEnvironmentVariable("deltaKustoClusterUri");
var tenantId = Environment.GetEnvironmentVariable("deltaKustoTenantId");
var servicePrincipalId = Environment.GetEnvironmentVariable("deltaKustoSpId");
var servicePrincipalSecret = Environment.GetEnvironmentVariable("deltaKustoSpSecret");
var maxDbsText = Environment.GetEnvironmentVariable("maxDbs");

if (string.IsNullOrWhiteSpace(tenantId))
{
Expand All @@ -52,7 +54,12 @@ private static AdxDbTestHelper CreateSingleton()
{
throw new ArgumentNullException(nameof(dbPrefix));
}
if (string.IsNullOrWhiteSpace(maxDbsText) || !int.TryParse(maxDbsText, out _))
{
throw new ArgumentNullException(nameof(maxDbsText));
}

var maxDbs = int.Parse(maxDbsText);
var tracer = new ConsoleTracer(false);
var httpClientFactory = new SimpleHttpClientFactory(tracer);
var tokenParameterization = new TokenProviderParameterization
Expand All @@ -70,62 +77,54 @@ private static AdxDbTestHelper CreateSingleton()
null);
var helper = new AdxDbTestHelper(
dbPrefix,
db => kustoGatewayFactory.CreateGateway(new Uri(clusterUri), db));
db => kustoGatewayFactory.CreateGateway(new Uri(clusterUri), db),
maxDbs);

return helper;
}

private AdxDbTestHelper(
string dbPrefix,
Func<string, IKustoManagementGateway> kustoManagementGatewayFactory)
Func<string, IKustoManagementGateway> kustoManagementGatewayFactory,
int maxDbs)
{
_dbPrefix = dbPrefix;
_kustoManagementGatewayFactory = kustoManagementGatewayFactory;
_maxDbs = maxDbs;
_availableDbNames = Enumerable.Range(1, _maxDbs)
.Select(i => DbNumberToDbName(i))
.ToList();
}
#endregion

public async Task<DbNameHolder> GetCleanDbAsync()
public async Task<IImmutableList<string>> GetDbsAsync(int dbCount)
{
if (_preparingDbs.TryDequeue(out Task<string>? preparingDbTask))
while (true)
{
var dbName = await preparingDbTask;

return new DbNameHolder(dbName, () => ReleaseDb(dbName));
}
else
{ // No more database available in queue, let's take the next one
var dbNumber = Interlocked.Increment(ref _dbCount);
var dbName = DbNumberToDbName(dbNumber);

await CleanDbAsync(dbName);

return new DbNameHolder(dbName, () => ReleaseDb(dbName));
}
}
var newDbEvent = _newDbEvent;

void IDisposable.Dispose()
{
// First clean up the queue
Task.WhenAll(_preparingDbs.ToArray()).Wait();
}

private void ReleaseDb(string dbName)
{
if (!dbName.StartsWith(_dbPrefix))
{
throw new ArgumentException("Wrong prefix", nameof(dbName));
}
lock (_queueLock)
{
if (_availableDbNames.Count >= dbCount)
{
var dbNames = _availableDbNames.Take(dbCount).ToImmutableArray();

async Task<string> prepereDbAsync()
{
await CleanDbAsync(dbName);
_availableDbNames.RemoveRange(0, dbCount);

return dbName;
return dbNames;
}
}
// Not enough db available: let's wait for some!
await newDbEvent.Task;
// Change the event for a fresh one
Interlocked.CompareExchange(
ref _newDbEvent,
new TaskCompletionSource(),
newDbEvent);
}

_preparingDbs.Enqueue(prepereDbAsync());
}

private async Task CleanDbAsync(string dbName)
public async Task CleanDbAsync(string dbName)
{
var kustoGateway = _kustoManagementGatewayFactory(dbName);
var currentCommands = await kustoGateway.ReverseEngineerDatabaseAsync();
Expand All @@ -137,6 +136,20 @@ private async Task CleanDbAsync(string dbName)
await kustoGateway.ExecuteCommandsAsync(cleanCommands);
}

public void ReleaseDbs(IEnumerable<string> dbNames)
{
lock (_queueLock)
{
_availableDbNames.AddRange(dbNames);
}
// Pop event for waiting threads
_newDbEvent.TrySetResult();
}

void IDisposable.Dispose()
{
}

private string DbNumberToDbName(int c)
{
return $"{_dbPrefix}{c:D8}";
Expand Down
Loading

0 comments on commit 266f5f4

Please sign in to comment.