Skip to content

Commit

Permalink
Refactor cancel-tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
vplauzon committed May 12, 2021
1 parent cacd644 commit 84ac96d
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 118 deletions.
74 changes: 25 additions & 49 deletions code/DeltaKustoAdxIntegrationTest/AdxIntegrationTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ namespace DeltaKustoAdxIntegrationTest
{
public abstract class AdxIntegrationTestBase : IntegrationTestBase
{
private static readonly TimeSpan TIME_OUT = TimeSpan.FromSeconds(90);

private readonly Uri _clusterUri;
private readonly string _currentDb;
private readonly string _targetDb;
Expand Down Expand Up @@ -84,15 +82,11 @@ protected AdxIntegrationTestBase()

protected async Task TestAdxToFile(string statesFolderPath, string outputFolder)
{
var cancellationToken = new CancellationTokenSource(TIME_OUT);
var ct = cancellationToken.Token;

await LoopThroughStateFilesAsync(
statesFolderPath,
ct,
async (fromFile, toFile) =>
{
await PrepareDbAsync(fromFile, true, ct);
await PrepareDbAsync(fromFile, true);

var outputPath = outputFolder
+ Path.GetFileNameWithoutExtension(fromFile)
Expand All @@ -104,15 +98,14 @@ await LoopThroughStateFilesAsync(
.Append(("jobs.main.action.filePath", outputPath));
var parameters = await RunParametersAsync(
"adx-to-file-params.json",
ct,
overrides);
var outputCommands = await LoadScriptAsync("", outputPath);
var targetCommands = CommandBase.FromScript(
await File.ReadAllTextAsync(toFile));

await ApplyCommandsAsync(outputCommands, true, ct);
await ApplyCommandsAsync(outputCommands, true);

var finalCommands = await FetchDbCommandsAsync(true, ct);
var finalCommands = await FetchDbCommandsAsync(true);
var targetModel = DatabaseModel.FromCommands(targetCommands);
var finalModel = DatabaseModel.FromCommands(finalCommands);
var finalScript = string.Join(";\n\n", finalCommands.Select(c => c.ToScript()));
Expand All @@ -126,15 +119,11 @@ await LoopThroughStateFilesAsync(

protected async Task TestFileToAdx(string statesFolderPath, string outputFolder)
{
var cancellationToken = new CancellationTokenSource(TIME_OUT);
var ct = cancellationToken.Token;

await LoopThroughStateFilesAsync(
statesFolderPath,
ct,
async (fromFile, toFile) =>
{
await PrepareDbAsync(toFile, false, ct);
await PrepareDbAsync(toFile, false);

var outputPath = outputFolder
+ Path.GetFileNameWithoutExtension(fromFile)
Expand All @@ -146,7 +135,6 @@ await LoopThroughStateFilesAsync(
.Append(("jobs.main.action.filePath", outputPath));
var parameters = await RunParametersAsync(
"file-to-adx-params.json",
ct,
overrides);
var outputCommands = await LoadScriptAsync("", outputPath);
var currentCommands = CommandBase.FromScript(
Expand All @@ -156,10 +144,9 @@ await LoopThroughStateFilesAsync(

await ApplyCommandsAsync(
currentCommands.Concat(outputCommands),
true,
ct);
true);

var finalCommands = await FetchDbCommandsAsync(true, ct);
var finalCommands = await FetchDbCommandsAsync(true);
var targetModel = DatabaseModel.FromCommands(targetCommands);
var finalModel = DatabaseModel.FromCommands(finalCommands);
var finalScript = string.Join(";\n\n", finalCommands.Select(c => c.ToScript()));
Expand All @@ -173,17 +160,13 @@ await ApplyCommandsAsync(

protected async Task TestAdxToAdx(string statesFolderPath, string outputFolder)
{
var cancellationToken = new CancellationTokenSource(TIME_OUT);
var ct = cancellationToken.Token;

await LoopThroughStateFilesAsync(
statesFolderPath,
ct,
async (fromFile, toFile) =>
{
await Task.WhenAll(
PrepareDbAsync(fromFile, true, ct),
PrepareDbAsync(toFile, false, ct));
PrepareDbAsync(fromFile, true),
PrepareDbAsync(toFile, false));

var outputPath = outputFolder
+ Path.GetFileNameWithoutExtension(fromFile)
Expand All @@ -195,11 +178,10 @@ await Task.WhenAll(
.Append(("jobs.main.action.filePath", outputPath));
var parameters = await RunParametersAsync(
"adx-to-adx-params.json",
ct,
overrides);
var targetCommands = CommandBase.FromScript(
await File.ReadAllTextAsync(toFile));
var finalCommands = await FetchDbCommandsAsync(false, ct);
var finalCommands = await FetchDbCommandsAsync(false);
var targetModel = DatabaseModel.FromCommands(targetCommands);
var finalModel = DatabaseModel.FromCommands(finalCommands);
var finalScript = string.Join(";\n\n", finalCommands.Select(c => c.ToScript()));
Expand All @@ -213,7 +195,6 @@ await Task.WhenAll(

private async Task LoopThroughStateFilesAsync(
string folderPath,
CancellationToken ct,
Func<string, string, Task> loopFunction)
{
var stateFiles = Directory.GetFiles(folderPath);
Expand All @@ -225,34 +206,31 @@ private async Task LoopThroughStateFilesAsync(
foreach (var toFile in stateFiles)
{
Console.WriteLine($"Current loop: ({fromFile}, {toFile})");
await CleanDatabasesAsync(ct);
await CleanDatabasesAsync();
await loopFunction(fromFile, toFile);
}
}
}

private async Task ApplyCommandsAsync(
IEnumerable<CommandBase> commands,
bool isCurrent,
CancellationToken ct)
bool isCurrent)
{
var gateway = CreateKustoManagementGateway(isCurrent);

// Apply commands to the db
await gateway.ExecuteCommandsAsync(commands, ct);
await gateway.ExecuteCommandsAsync(commands);
}

private async Task<IImmutableList<CommandBase>> FetchDbCommandsAsync(
bool isCurrent,
CancellationToken ct)
private async Task<IImmutableList<CommandBase>> FetchDbCommandsAsync(bool isCurrent)
{
var gateway = CreateKustoManagementGateway(isCurrent);
var dbProvider = (IDatabaseProvider)new KustoDatabaseProvider(
new ConsoleTracer(false),
gateway);
var emptyProvider = (IDatabaseProvider)new EmptyDatabaseProvider();
var finalDb = await dbProvider.RetrieveDatabaseAsync(ct);
var emptyDb = await emptyProvider.RetrieveDatabaseAsync(ct);
var finalDb = await dbProvider.RetrieveDatabaseAsync();
var emptyDb = await emptyProvider.RetrieveDatabaseAsync();
// Use the delta from an empty db to get
var finalCommands = emptyDb.ComputeDelta(finalDb);

Expand All @@ -261,7 +239,6 @@ private async Task<IImmutableList<CommandBase>> FetchDbCommandsAsync(

protected override Task<MainParameterization> RunParametersAsync(
string parameterFilePath,
CancellationToken ct,
IEnumerable<(string path, string value)>? overrides = null)
{
var adjustedOverrides = overrides != null
Expand All @@ -273,20 +250,19 @@ protected override Task<MainParameterization> RunParametersAsync(
.Append(("tokenProvider.login.clientId", _servicePrincipalId))
.Append(("tokenProvider.login.secret", _servicePrincipalSecret));

return base.RunParametersAsync(parameterFilePath, ct, adjustedOverrides);
return base.RunParametersAsync(parameterFilePath, adjustedOverrides);
}

protected async Task CleanDatabasesAsync(CancellationToken ct)
protected async Task CleanDatabasesAsync()
{
await Task.WhenAll(
CleanDbAsync(true, ct),
CleanDbAsync(false, ct));
CleanDbAsync(true),
CleanDbAsync(false));
}

protected async Task PrepareDbAsync(
string scriptPath,
bool isCurrent,
CancellationToken ct)
bool isCurrent)
{
var script = await File.ReadAllTextAsync(scriptPath);

Expand All @@ -295,7 +271,7 @@ protected async Task PrepareDbAsync(
var commands = CommandBase.FromScript(script);
var gateway = CreateKustoManagementGateway(isCurrent);

await gateway.ExecuteCommandsAsync(commands, ct);
await gateway.ExecuteCommandsAsync(commands);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -333,18 +309,18 @@ private ITokenProvider CreateTokenProvider()
return tokenProvider!;
}

private async Task CleanDbAsync(bool isCurrent, CancellationToken ct)
private async Task CleanDbAsync(bool isCurrent)
{
var emptyDbProvider = (IDatabaseProvider)new EmptyDatabaseProvider();
var kustoGateway = CreateKustoManagementGateway(isCurrent);
var dbProvider = (IDatabaseProvider)new KustoDatabaseProvider(
new ConsoleTracer(false),
kustoGateway);
var emptyDb = await emptyDbProvider.RetrieveDatabaseAsync(ct);
var db = await dbProvider.RetrieveDatabaseAsync(ct);
var emptyDb = await emptyDbProvider.RetrieveDatabaseAsync();
var db = await dbProvider.RetrieveDatabaseAsync();
var currentDeltaCommands = db.ComputeDelta(emptyDb);

await kustoGateway.ExecuteCommandsAsync(currentDeltaCommands, ct);
await kustoGateway.ExecuteCommandsAsync(currentDeltaCommands);
}
}
}
17 changes: 6 additions & 11 deletions code/DeltaKustoAdxIntegrationTest/FailIfDataLossTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ public class FailIfDataLossTest : AdxIntegrationTestBase
public async Task TestFailIfDropsNoDrop()
{
var toFile = "FailIfDataLoss/target.kql";
var ct = CreateCancellationToken();

await CleanDatabasesAsync(ct);
await PrepareDbAsync(toFile, false, ct);
await CleanDatabasesAsync();
await PrepareDbAsync(toFile, false);

await RunParametersAsync("FailIfDataLoss/no-fail.json", ct, TargetDbOverrides);
await RunParametersAsync("FailIfDataLoss/no-fail.json", TargetDbOverrides);

// We just test that this doesn't fail
}
Expand All @@ -32,10 +31,9 @@ public async Task TestFailIfDropsNoDrop()
public async Task TestFailIfDrops()
{
var toFile = "FailIfDataLoss/target.kql";
var ct = CreateCancellationToken();

await CleanDatabasesAsync(ct);
await PrepareDbAsync(toFile, false, ct);
await CleanDatabasesAsync();
await PrepareDbAsync(toFile, false);

var overrides = TargetDbOverrides
.Append(("failIfDrops", "true"));
Expand All @@ -44,7 +42,7 @@ public async Task TestFailIfDrops()
{
// The "Main" will return non-zero which will throw an exception
var parameters =
await RunParametersAsync("FailIfDataLoss/no-fail.json", ct, overrides);
await RunParametersAsync("FailIfDataLoss/no-fail.json", overrides);

Assert.True(parameters.FailIfDataLoss);
Assert.False(true, "Should have thrown by now");
Expand All @@ -53,8 +51,5 @@ public async Task TestFailIfDrops()
{
}
}

private CancellationToken CreateCancellationToken() =>
new CancellationTokenSource(TimeSpan.FromSeconds(8)).Token;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class FunctionEmptyCurrentTest : IntegrationTestBase
public async Task EmptyDelta()
{
var paramPath = "Functions/EmptyCurrent/EmptyDelta/empty-delta-params.yaml";
var parameters = await RunParametersAsync(paramPath, CreateCancellationToken());
var parameters = await RunParametersAsync(paramPath);
var outputPath = parameters.Jobs!.First().Value.Action!.FilePath!;
var commands = await LoadScriptAsync(paramPath, outputPath);

Expand All @@ -23,7 +23,7 @@ public async Task EmptyDelta()
public async Task OneFunctionDelta()
{
var paramPath = "Functions/EmptyCurrent/OneFunctionDelta/delta-params.yaml";
var parameters = await RunParametersAsync(paramPath, CreateCancellationToken());
var parameters = await RunParametersAsync(paramPath);
var inputPath = parameters.Jobs!.First().Value.Target!.Scripts!.First().FilePath!;
var outputPath = parameters.Jobs!.First().Value.Action!.FilePath!;
var inputCommands = await LoadScriptAsync(paramPath, inputPath);
Expand All @@ -36,16 +36,13 @@ public async Task OneFunctionDelta()
public async Task TwoFunctionsDelta()
{
var paramPath = "Functions/EmptyCurrent/TwoFunctionsDelta/delta-params.yaml";
var parameters = await RunParametersAsync(paramPath, CreateCancellationToken());
var parameters = await RunParametersAsync(paramPath);
var inputPath = parameters.Jobs!.First().Value.Target!.Scripts!.First().FilePath!;
var outputPath = parameters.Jobs!.First().Value.Action!.FilePath!;
var inputCommands = await LoadScriptAsync(paramPath, inputPath);
var outputCommands = await LoadScriptAsync(paramPath, outputPath);

Assert.True(inputCommands.SequenceEqual(outputCommands));
}

private CancellationToken CreateCancellationToken() =>
new CancellationTokenSource(TimeSpan.FromSeconds(2)).Token;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class FunctionWithCurrentTest : IntegrationTestBase
public async Task EmptyBoth()
{
var paramPath = "Functions/WithCurrent/EmptyBoth/delta-params.json";
var parameters = await RunParametersAsync(paramPath, CreateCancellationToken());
var parameters = await RunParametersAsync(paramPath);
var outputPath = parameters.Jobs!.First().Value.Action!.FilePath!;
var commands = await LoadScriptAsync(paramPath, outputPath);

Expand All @@ -25,7 +25,7 @@ public async Task EmptyBoth()
public async Task Same()
{
var paramPath = "Functions/WithCurrent/Same/delta-params.json";
var parameters = await RunParametersAsync(paramPath, CreateCancellationToken());
var parameters = await RunParametersAsync(paramPath);
var outputPath = parameters.Jobs!.First().Value.Action!.FilePath!;
var commands = await LoadScriptAsync(paramPath, outputPath);

Expand All @@ -36,7 +36,7 @@ public async Task Same()
public async Task TargetMore()
{
var paramPath = "Functions/WithCurrent/TargetMore/delta-params.json";
var parameters = await RunParametersAsync(paramPath, CreateCancellationToken());
var parameters = await RunParametersAsync(paramPath);
var outputPath = parameters.Jobs!.First().Value.Action!.FilePath!;
var commands = await LoadScriptAsync(paramPath, outputPath);

Expand All @@ -51,7 +51,7 @@ public async Task TargetMore()
public async Task TargetLess()
{
var paramPath = "Functions/WithCurrent/TargetLess/delta-params.json";
var parameters = await RunParametersAsync(paramPath, CreateCancellationToken());
var parameters = await RunParametersAsync(paramPath);
var outputRootFolder = parameters.Jobs!.First().Value.Action!.FolderPath!;
var outputPath = Path.Combine(outputRootFolder, "functions/drop/YourFunction.kql");
var commands = await LoadScriptAsync(paramPath, outputPath);
Expand All @@ -62,8 +62,5 @@ public async Task TargetLess()

Assert.Equal("YourFunction", function.FunctionName.Name);
}

private CancellationToken CreateCancellationToken() =>
new CancellationTokenSource(TimeSpan.FromSeconds(2)).Token;
}
}
Loading

0 comments on commit 84ac96d

Please sign in to comment.