Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/microsoft/delta-kusto into …
Browse files Browse the repository at this point in the history
…main
  • Loading branch information
vplauzon committed May 12, 2021
2 parents 53880f7 + 8efe658 commit e2678c8
Show file tree
Hide file tree
Showing 50 changed files with 308 additions and 334 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/continuous-build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ jobs:
- name: File-base integration tests (in-proc)
run: |
dotnet test code/DeltaKustoFileIntegrationTest --configuration Release \
--no-build --verbosity normal --blame-hang-timeout 3s
--no-build --verbosity normal --blame-hang-timeout 3s \
-p:ParallelizeTestCollections=false
78 changes: 27 additions & 51 deletions code/DeltaKustoAdxIntegrationTest/AdxIntegrationTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ namespace DeltaKustoAdxIntegrationTest
{
public abstract class AdxIntegrationTestBase : IntegrationTestBase
{
private static readonly TimeSpan TIME_OUT = TimeSpan.FromSeconds(90);

private readonly Uri _clusterUri;
private readonly string _currentDb;
private readonly string _targetDb;
Expand Down Expand Up @@ -84,15 +82,11 @@ protected AdxIntegrationTestBase()

protected async Task TestAdxToFile(string statesFolderPath, string outputFolder)
{
var cancellationToken = new CancellationTokenSource(TIME_OUT);
var ct = cancellationToken.Token;

await LoopThroughStateFilesAsync(
statesFolderPath,
ct,
async (fromFile, toFile) =>
{
await PrepareDbAsync(fromFile, true, ct);
await PrepareDbAsync(fromFile, true);

var outputPath = outputFolder
+ Path.GetFileNameWithoutExtension(fromFile)
Expand All @@ -104,15 +98,14 @@ await LoopThroughStateFilesAsync(
.Append(("jobs.main.action.filePath", outputPath));
var parameters = await RunParametersAsync(
"adx-to-file-params.json",
ct,
overrides);
var outputCommands = await LoadScriptAsync(outputPath);
var outputCommands = await LoadScriptAsync("", outputPath);
var targetCommands = CommandBase.FromScript(
await File.ReadAllTextAsync(toFile));

await ApplyCommandsAsync(outputCommands, true, ct);
await ApplyCommandsAsync(outputCommands, true);

var finalCommands = await FetchDbCommandsAsync(true, ct);
var finalCommands = await FetchDbCommandsAsync(true);
var targetModel = DatabaseModel.FromCommands(targetCommands);
var finalModel = DatabaseModel.FromCommands(finalCommands);
var finalScript = string.Join(";\n\n", finalCommands.Select(c => c.ToScript()));
Expand All @@ -126,15 +119,11 @@ await LoopThroughStateFilesAsync(

protected async Task TestFileToAdx(string statesFolderPath, string outputFolder)
{
var cancellationToken = new CancellationTokenSource(TIME_OUT);
var ct = cancellationToken.Token;

await LoopThroughStateFilesAsync(
statesFolderPath,
ct,
async (fromFile, toFile) =>
{
await PrepareDbAsync(toFile, false, ct);
await PrepareDbAsync(toFile, false);

var outputPath = outputFolder
+ Path.GetFileNameWithoutExtension(fromFile)
Expand All @@ -146,20 +135,18 @@ await LoopThroughStateFilesAsync(
.Append(("jobs.main.action.filePath", outputPath));
var parameters = await RunParametersAsync(
"file-to-adx-params.json",
ct,
overrides);
var outputCommands = await LoadScriptAsync(outputPath);
var outputCommands = await LoadScriptAsync("", outputPath);
var currentCommands = CommandBase.FromScript(
await File.ReadAllTextAsync(fromFile));
var targetCommands = CommandBase.FromScript(
await File.ReadAllTextAsync(toFile));

await ApplyCommandsAsync(
currentCommands.Concat(outputCommands),
true,
ct);
true);

var finalCommands = await FetchDbCommandsAsync(true, ct);
var finalCommands = await FetchDbCommandsAsync(true);
var targetModel = DatabaseModel.FromCommands(targetCommands);
var finalModel = DatabaseModel.FromCommands(finalCommands);
var finalScript = string.Join(";\n\n", finalCommands.Select(c => c.ToScript()));
Expand All @@ -173,17 +160,13 @@ await ApplyCommandsAsync(

protected async Task TestAdxToAdx(string statesFolderPath, string outputFolder)
{
var cancellationToken = new CancellationTokenSource(TIME_OUT);
var ct = cancellationToken.Token;

await LoopThroughStateFilesAsync(
statesFolderPath,
ct,
async (fromFile, toFile) =>
{
await Task.WhenAll(
PrepareDbAsync(fromFile, true, ct),
PrepareDbAsync(toFile, false, ct));
PrepareDbAsync(fromFile, true),
PrepareDbAsync(toFile, false));

var outputPath = outputFolder
+ Path.GetFileNameWithoutExtension(fromFile)
Expand All @@ -195,11 +178,10 @@ await Task.WhenAll(
.Append(("jobs.main.action.filePath", outputPath));
var parameters = await RunParametersAsync(
"adx-to-adx-params.json",
ct,
overrides);
var targetCommands = CommandBase.FromScript(
await File.ReadAllTextAsync(toFile));
var finalCommands = await FetchDbCommandsAsync(false, ct);
var finalCommands = await FetchDbCommandsAsync(false);
var targetModel = DatabaseModel.FromCommands(targetCommands);
var finalModel = DatabaseModel.FromCommands(finalCommands);
var finalScript = string.Join(";\n\n", finalCommands.Select(c => c.ToScript()));
Expand All @@ -213,7 +195,6 @@ await Task.WhenAll(

private async Task LoopThroughStateFilesAsync(
string folderPath,
CancellationToken ct,
Func<string, string, Task> loopFunction)
{
var stateFiles = Directory.GetFiles(folderPath);
Expand All @@ -225,34 +206,31 @@ private async Task LoopThroughStateFilesAsync(
foreach (var toFile in stateFiles)
{
Console.WriteLine($"Current loop: ({fromFile}, {toFile})");
await CleanDatabasesAsync(ct);
await CleanDatabasesAsync();
await loopFunction(fromFile, toFile);
}
}
}

private async Task ApplyCommandsAsync(
IEnumerable<CommandBase> commands,
bool isCurrent,
CancellationToken ct)
bool isCurrent)
{
var gateway = CreateKustoManagementGateway(isCurrent);

// Apply commands to the db
await gateway.ExecuteCommandsAsync(commands, ct);
await gateway.ExecuteCommandsAsync(commands);
}

private async Task<IImmutableList<CommandBase>> FetchDbCommandsAsync(
bool isCurrent,
CancellationToken ct)
private async Task<IImmutableList<CommandBase>> FetchDbCommandsAsync(bool isCurrent)
{
var gateway = CreateKustoManagementGateway(isCurrent);
var dbProvider = (IDatabaseProvider)new KustoDatabaseProvider(
new ConsoleTracer(false),
gateway);
var emptyProvider = (IDatabaseProvider)new EmptyDatabaseProvider();
var finalDb = await dbProvider.RetrieveDatabaseAsync(ct);
var emptyDb = await emptyProvider.RetrieveDatabaseAsync(ct);
var finalDb = await dbProvider.RetrieveDatabaseAsync();
var emptyDb = await emptyProvider.RetrieveDatabaseAsync();
// Use the delta from an empty db to get
var finalCommands = emptyDb.ComputeDelta(finalDb);

Expand All @@ -261,7 +239,6 @@ private async Task<IImmutableList<CommandBase>> FetchDbCommandsAsync(

protected override Task<MainParameterization> RunParametersAsync(
string parameterFilePath,
CancellationToken ct,
IEnumerable<(string path, string value)>? overrides = null)
{
var adjustedOverrides = overrides != null
Expand All @@ -273,20 +250,19 @@ protected override Task<MainParameterization> RunParametersAsync(
.Append(("tokenProvider.login.clientId", _servicePrincipalId))
.Append(("tokenProvider.login.secret", _servicePrincipalSecret));

return base.RunParametersAsync(parameterFilePath, ct, adjustedOverrides);
return base.RunParametersAsync(parameterFilePath, adjustedOverrides);
}

protected async Task CleanDatabasesAsync(CancellationToken ct)
protected async Task CleanDatabasesAsync()
{
await Task.WhenAll(
CleanDbAsync(true, ct),
CleanDbAsync(false, ct));
CleanDbAsync(true),
CleanDbAsync(false));
}

protected async Task PrepareDbAsync(
string scriptPath,
bool isCurrent,
CancellationToken ct)
bool isCurrent)
{
var script = await File.ReadAllTextAsync(scriptPath);

Expand All @@ -295,7 +271,7 @@ protected async Task PrepareDbAsync(
var commands = CommandBase.FromScript(script);
var gateway = CreateKustoManagementGateway(isCurrent);

await gateway.ExecuteCommandsAsync(commands, ct);
await gateway.ExecuteCommandsAsync(commands);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -333,18 +309,18 @@ private ITokenProvider CreateTokenProvider()
return tokenProvider!;
}

private async Task CleanDbAsync(bool isCurrent, CancellationToken ct)
private async Task CleanDbAsync(bool isCurrent)
{
var emptyDbProvider = (IDatabaseProvider)new EmptyDatabaseProvider();
var kustoGateway = CreateKustoManagementGateway(isCurrent);
var dbProvider = (IDatabaseProvider)new KustoDatabaseProvider(
new ConsoleTracer(false),
kustoGateway);
var emptyDb = await emptyDbProvider.RetrieveDatabaseAsync(ct);
var db = await dbProvider.RetrieveDatabaseAsync(ct);
var emptyDb = await emptyDbProvider.RetrieveDatabaseAsync();
var db = await dbProvider.RetrieveDatabaseAsync();
var currentDeltaCommands = db.ComputeDelta(emptyDb);

await kustoGateway.ExecuteCommandsAsync(currentDeltaCommands, ct);
await kustoGateway.ExecuteCommandsAsync(currentDeltaCommands);
}
}
}
4 changes: 2 additions & 2 deletions code/DeltaKustoAdxIntegrationTest/FailIfDataLoss/no-fail.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"current": {
"scripts": [
{
"filePath": "FailIfDataLoss/source.kql"
"filePath": "source.kql"
}
]
},
Expand All @@ -23,7 +23,7 @@
}
},
"action": {
"filePath": "outputs/fail-if-data-loss/no-fail.kql"
"filePath": "outputs/no-fail.kql"
}
}
}
Expand Down
17 changes: 6 additions & 11 deletions code/DeltaKustoAdxIntegrationTest/FailIfDataLossTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ public class FailIfDataLossTest : AdxIntegrationTestBase
public async Task TestFailIfDropsNoDrop()
{
var toFile = "FailIfDataLoss/target.kql";
var ct = CreateCancellationToken();

await CleanDatabasesAsync(ct);
await PrepareDbAsync(toFile, false, ct);
await CleanDatabasesAsync();
await PrepareDbAsync(toFile, false);

await RunParametersAsync("FailIfDataLoss/no-fail.json", ct, TargetDbOverrides);
await RunParametersAsync("FailIfDataLoss/no-fail.json", TargetDbOverrides);

// We just test that this doesn't fail
}
Expand All @@ -32,10 +31,9 @@ public async Task TestFailIfDropsNoDrop()
public async Task TestFailIfDrops()
{
var toFile = "FailIfDataLoss/target.kql";
var ct = CreateCancellationToken();

await CleanDatabasesAsync(ct);
await PrepareDbAsync(toFile, false, ct);
await CleanDatabasesAsync();
await PrepareDbAsync(toFile, false);

var overrides = TargetDbOverrides
.Append(("failIfDrops", "true"));
Expand All @@ -44,7 +42,7 @@ public async Task TestFailIfDrops()
{
// The "Main" will return non-zero which will throw an exception
var parameters =
await RunParametersAsync("FailIfDataLoss/no-fail.json", ct, overrides);
await RunParametersAsync("FailIfDataLoss/no-fail.json", overrides);

Assert.True(parameters.FailIfDataLoss);
Assert.False(true, "Should have thrown by now");
Expand All @@ -53,8 +51,5 @@ public async Task TestFailIfDrops()
{
}
}

private CancellationToken CreateCancellationToken() =>
new CancellationTokenSource(TimeSpan.FromSeconds(8)).Token;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
main:
target:
scripts:
- filePath: Functions/EmptyCurrent/EmptyDelta/target.kql
- filePath: target.kql
action:
filePath: outputs/functions/empty-current/empty-delta.kql
filePath: outputs/empty-delta.kql
Original file line number Diff line number Diff line change
Expand Up @@ -11,44 +11,38 @@ public class FunctionEmptyCurrentTest : IntegrationTestBase
[Fact]
public async Task EmptyDelta()
{
var parameters = await RunParametersAsync(
"Functions/EmptyCurrent/EmptyDelta/empty-delta-params.yaml",
CreateCancellationToken());
var paramPath = "Functions/EmptyCurrent/EmptyDelta/empty-delta-params.yaml";
var parameters = await RunParametersAsync(paramPath);
var outputPath = parameters.Jobs!.First().Value.Action!.FilePath!;
var commands = await LoadScriptAsync(outputPath);
var commands = await LoadScriptAsync(paramPath, outputPath);

Assert.Empty(commands);
}

[Fact]
public async Task OneFunctionDelta()
{
var parameters = await RunParametersAsync(
"Functions/EmptyCurrent/OneFunctionDelta/delta-params.yaml",
CreateCancellationToken());
var paramPath = "Functions/EmptyCurrent/OneFunctionDelta/delta-params.yaml";
var parameters = await RunParametersAsync(paramPath);
var inputPath = parameters.Jobs!.First().Value.Target!.Scripts!.First().FilePath!;
var outputPath = parameters.Jobs!.First().Value.Action!.FilePath!;
var inputCommands = await LoadScriptAsync(inputPath);
var outputCommands = await LoadScriptAsync(outputPath);
var inputCommands = await LoadScriptAsync(paramPath, inputPath);
var outputCommands = await LoadScriptAsync(paramPath, outputPath);

Assert.True(inputCommands.SequenceEqual(outputCommands));
}

[Fact]
public async Task TwoFunctionsDelta()
{
var parameters = await RunParametersAsync(
"Functions/EmptyCurrent/TwoFunctionsDelta/delta-params.yaml",
CreateCancellationToken());
var paramPath = "Functions/EmptyCurrent/TwoFunctionsDelta/delta-params.yaml";
var parameters = await RunParametersAsync(paramPath);
var inputPath = parameters.Jobs!.First().Value.Target!.Scripts!.First().FilePath!;
var outputPath = parameters.Jobs!.First().Value.Action!.FilePath!;
var inputCommands = await LoadScriptAsync(inputPath);
var outputCommands = await LoadScriptAsync(outputPath);
var inputCommands = await LoadScriptAsync(paramPath, inputPath);
var outputCommands = await LoadScriptAsync(paramPath, outputPath);

Assert.True(inputCommands.SequenceEqual(outputCommands));
}

private CancellationToken CreateCancellationToken() =>
new CancellationTokenSource(TimeSpan.FromSeconds(2)).Token;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
main:
target:
scripts:
- filePath: Functions/EmptyCurrent/OneFunctionDelta/target.kql
- filePath: target.kql
action:
filePath: outputs/functions/empty-current/one-function-delta.kql
filePath: outputs/one-function-delta.kql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
main:
target:
scripts:
- filePath: Functions/EmptyCurrent/TwoFunctionsDelta/target.kql
- filePath: target.kql
action:
filePath: outputs/functions/empty-current/two-functions-delta.kql
filePath: outputs/two-functions-delta.kql
Loading

0 comments on commit e2678c8

Please sign in to comment.