Skip to content

Commit

Permalink
[Asset-Sync] Control Parallel Git Invocation (Azure#4112)
Browse files Browse the repository at this point in the history
* add a new class TaskQueue that can be used to control the parallellism of git commands within a single git directory
  • Loading branch information
scbedd authored Sep 13, 2022
1 parent ce50cab commit 5d48ac5
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 52 deletions.
123 changes: 71 additions & 52 deletions tools/test-proxy/Azure.Sdk.Tools.TestProxy/Store/GitProcessHandler.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Azure.Sdk.Tools.TestProxy.Common;
using Azure.Sdk.Tools.TestProxy.Common.Exceptions;

Expand Down Expand Up @@ -36,6 +39,11 @@ class GitMinVersion
public static string minVersionString = $"{Major}.{Minor}.{Patch}";
}

/// <summary>
/// This dictionary is used to ensure that each git directory will only ever have a SINGLE git command running it at a time.
/// </summary>
private ConcurrentDictionary<string, TaskQueue> AssetTasks = new ConcurrentDictionary<string, TaskQueue>();

/// <summary>
/// Create a ProcessStartInfo that's exclusively used for execution of git commands
/// </summary>
Expand Down Expand Up @@ -106,43 +114,46 @@ public virtual CommandResult Run(string arguments, string workingDirectory)
Arguments = arguments
};

try
{
DebugLogger.LogInformation($"git {arguments}");
var process = Process.Start(processStartInfo);
string stdOut = process.StandardOutput.ReadToEnd();
string stdErr = process.StandardError.ReadToEnd();
process.WaitForExit();
var queue = AssetTasks.GetOrAdd(workingDirectory, new TaskQueue());

int returnCode = process.ExitCode;
queue.Enqueue(() =>
{
try
{
DebugLogger.LogInformation($"git {arguments}");
var process = Process.Start(processStartInfo);
string stdOut = process.StandardOutput.ReadToEnd();
string stdErr = process.StandardError.ReadToEnd();
process.WaitForExit();

DebugLogger.LogDebug($"StdOut: {stdOut}");
DebugLogger.LogDebug($"StdErr: {stdErr}");
DebugLogger.LogDebug($"ExitCode: {process.ExitCode}");
int returnCode = process.ExitCode;

DebugLogger.LogDebug($"StdOut: {stdOut}");
DebugLogger.LogDebug($"StdErr: {stdErr}");
DebugLogger.LogDebug($"ExitCode: {process.ExitCode}");

result.ExitCode = process.ExitCode;
result.StdErr = stdErr;
result.StdOut = stdOut;

result.ExitCode = process.ExitCode;
result.StdErr = stdErr;
result.StdOut = stdOut;

if (result.ExitCode == 0){
return result;
if (result.ExitCode != 0)
{
throw new GitProcessException(result);
}
}
else
catch (Exception e)
{
DebugLogger.LogDebug(e.Message);

result.ExitCode = -1;
result.CommandException = e;

throw new GitProcessException(result);
}
}
catch (Exception e)
{
DebugLogger.LogDebug(e.Message);

result.ExitCode = -1;
result.CommandException = e;
});

throw new GitProcessException(result);
}
return result;
}

/// <summary>
Expand All @@ -156,39 +167,47 @@ public virtual bool TryRun(string arguments, GitAssetsConfiguration config, out
{
ProcessStartInfo processStartInfo = CreateGitProcessInfo(config.AssetsRepoLocation);
processStartInfo.Arguments = arguments;
var commandResult = new CommandResult();

var queue = AssetTasks.GetOrAdd(config.AssetsRepoLocation, new TaskQueue());

try
queue.Enqueue(() =>
{
DebugLogger.LogInformation($"git {arguments}");
var process = Process.Start(processStartInfo);
string stdOut = process.StandardOutput.ReadToEnd();
string stdErr = process.StandardError.ReadToEnd();
process.WaitForExit();
try
{
DebugLogger.LogInformation($"git {arguments}");
var process = Process.Start(processStartInfo);
string stdOut = process.StandardOutput.ReadToEnd();
string stdErr = process.StandardError.ReadToEnd();
process.WaitForExit();

int returnCode = process.ExitCode;
int returnCode = process.ExitCode;

DebugLogger.LogDebug($"StdOut: {stdOut}");
DebugLogger.LogDebug($"StdErr: {stdErr}");
DebugLogger.LogDebug($"ExitCode: {process.ExitCode}");
DebugLogger.LogDebug($"StdOut: {stdOut}");
DebugLogger.LogDebug($"StdErr: {stdErr}");
DebugLogger.LogDebug($"ExitCode: {process.ExitCode}");

result = new CommandResult()
commandResult = new CommandResult()
{
ExitCode = process.ExitCode,
StdErr = stdErr,
StdOut = stdOut,
Arguments = arguments
};
}
catch (Exception e)
{
ExitCode = process.ExitCode,
StdErr = stdErr,
StdOut = stdOut,
Arguments = arguments
};
}
catch (Exception e)
{
DebugLogger.LogDebug(e.Message);
DebugLogger.LogDebug(e.Message);

result = new CommandResult()
{
ExitCode = -1,
CommandException = e
};
}
commandResult = new CommandResult()
{
ExitCode = -1,
CommandException = e
};
}
});

result = commandResult;

if (result.ExitCode != 0)
{
Expand Down
76 changes: 76 additions & 0 deletions tools/test-proxy/Azure.Sdk.Tools.TestProxy/Store/TaskQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using System.Threading.Tasks;
using System.Threading;
using System;

namespace Azure.Sdk.Tools.TestProxy.Store
{
/// <summary>
/// This class is used to control access to a directory. Within the GitProcessHandler, a queue is used per targeted git directory. This ensures
/// that multiple Async requests hitting the asset store functionality will NEVER be able to stomp on each other.
/// </summary>
public class TaskQueue
{
private SemaphoreSlim semaphore;

public TaskQueue()
{
semaphore = new SemaphoreSlim(1);
}

/// <summary>
/// Used to await the running of a single block of code. Returns a value of type T.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="incomingTask"></param>
/// <returns></returns>
public async Task<T> EnqueueAsync<T>(Func<Task<T>> incomingTask)
{
await semaphore.WaitAsync();
try
{
return await incomingTask();
}
finally
{
semaphore.Release();
}
}

/// <summary>
/// Used to await the running of a single block of code. No incoming arguments, no return types.
/// </summary>
/// <param name="incomingTask"></param>
/// <returns></returns>
public async Task EnqueueAsync(Func<Task> incomingTask)
{
await semaphore.WaitAsync();

try
{
await incomingTask();
}
finally
{
semaphore.Release();
}
}

/// <summary>
/// Used to invoke a block of code. No incoming arguments, no output arguments.
/// </summary>
/// <param name="incomingTask"></param>
public void Enqueue(Action incomingTask)
{
semaphore.Wait();

try
{
incomingTask();
}
finally
{
semaphore.Release();
}
}
}
}

0 comments on commit 5d48ac5

Please sign in to comment.