Skip to content

Commit

Permalink
refactor: rmeove unneeded sessionid from IResultTable interface (#567)
Browse files Browse the repository at this point in the history
  • Loading branch information
aneojgurhem authored Jan 3, 2024
2 parents 59e51f3 + f7db81b commit 9866c2e
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 196 deletions.
115 changes: 55 additions & 60 deletions Adaptors/Memory/src/ResultTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,6 @@ public class ResultTable : IResultTable

private bool isInitialized_;

/// <inheritdoc />
public Task ChangeResultOwnership(string sessionId,
string oldTaskId,
IEnumerable<IResultTable.ChangeResultOwnershipRequest> requests,
CancellationToken cancellationToken)
{
foreach (var request in requests)
{
foreach (var result in results_.Values.ToImmutableList()
.Where(result => result.OwnerTaskId == oldTaskId))
{
results_.TryUpdate(result.ResultId,
result with
{
OwnerTaskId = request.NewTaskId,
},
result);
}
}

return Task.CompletedTask;
}

/// <inheritdoc />
public Task Create(ICollection<Result> results,
CancellationToken cancellationToken = default)
Expand All @@ -79,39 +56,6 @@ public Task Create(ICollection<Result> results,
return Task.CompletedTask;
}

/// <inheritdoc />
public Task AddTaskDependencies(string sessionId,
IDictionary<string, ICollection<string>> dependencies,
CancellationToken cancellationToken = default)
{
foreach (var (resultId, taskIds) in dependencies)
{
if (!results_.TryGetValue(resultId,
out var result))
{
throw new ResultNotFoundException($"Key '{resultId}' not found");
}

result.DependentTasks.AddRange(taskIds);
}

return Task.CompletedTask;
}

/// <inheritdoc />
public Task DeleteResult(string session,
string key,
CancellationToken cancellationToken = default)
{
if (!results_.ContainsKey(key))
{
throw new ResultNotFoundException($"Key '{key}' not found");
}

return Task.FromResult(results_.Remove(key,
out _));
}

/// <inheritdoc />
public Task DeleteResults(string sessionId,
CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -155,8 +99,7 @@ public IAsyncEnumerable<T> GetResults<T>(Expression<Func<Result, bool>> filter,
.Take(pageSize), ordered.Count()));
}

public Task SetTaskOwnership(string sessionId,
ICollection<(string resultId, string taskId)> requests,
public Task SetTaskOwnership(ICollection<(string resultId, string taskId)> requests,
CancellationToken cancellationToken = default)
{
foreach (var (resultId, taskId) in requests)
Expand Down Expand Up @@ -216,8 +159,7 @@ public Task<long> UpdateManyResults(Expression<Func<Result, bool>>
}

/// <inheritdoc />
public Task<Result> UpdateOneResult(string sessionId,
string resultId,
public Task<Result> UpdateOneResult(string resultId,
ICollection<(Expression<Func<Result, object?>> selector, object? newValue)> updates,
CancellationToken cancellationToken = default)
{
Expand All @@ -231,4 +173,57 @@ public Task<Result> UpdateOneResult(string
updates);
return Task.FromResult(result);
}

/// <inheritdoc />
public Task ChangeResultOwnership(string oldTaskId,
IEnumerable<IResultTable.ChangeResultOwnershipRequest> requests,
CancellationToken cancellationToken)
{
foreach (var request in requests)
{
foreach (var result in results_.Values.ToImmutableList()
.Where(result => result.OwnerTaskId == oldTaskId))
{
results_.TryUpdate(result.ResultId,
result with
{
OwnerTaskId = request.NewTaskId,
},
result);
}
}

return Task.CompletedTask;
}

/// <inheritdoc />
public Task AddTaskDependencies(IDictionary<string, ICollection<string>> dependencies,
CancellationToken cancellationToken = default)
{
foreach (var (resultId, taskIds) in dependencies)
{
if (!results_.TryGetValue(resultId,
out var result))
{
throw new ResultNotFoundException($"Key '{resultId}' not found");
}

result.DependentTasks.AddRange(taskIds);
}

return Task.CompletedTask;
}

/// <inheritdoc />
public Task DeleteResult(string key,
CancellationToken cancellationToken = default)
{
if (!results_.ContainsKey(key))
{
throw new ResultNotFoundException($"Key '{key}' not found");
}

return Task.FromResult(results_.Remove(key,
out _));
}
}
49 changes: 14 additions & 35 deletions Adaptors/MongoDB/src/ResultTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,11 @@ public async Task Create(ICollection<Result> results,
}

/// <inheritdoc />
public async Task AddTaskDependencies(string sessionId,
IDictionary<string, ICollection<string>> dependencies,
public async Task AddTaskDependencies(IDictionary<string, ICollection<string>> dependencies,
CancellationToken cancellationToken = default)
{
using var activity = activitySource_.StartActivity($"{nameof(AddTaskDependencies)}");
activity?.SetTag($"{nameof(AddTaskDependencies)}_sessionId",
sessionId);

var resultCollection = resultCollectionProvider_.Get();
using var activity = activitySource_.StartActivity($"{nameof(AddTaskDependencies)}");
var resultCollection = resultCollectionProvider_.Get();

if (!dependencies.Any())
{
Expand All @@ -122,15 +118,12 @@ public async Task AddTaskDependencies(string s
}
}

async Task<Result> IResultTable.GetResult(string sessionId,
string resultId,
async Task<Result> IResultTable.GetResult(string resultId,
CancellationToken cancellationToken)
{
using var activity = activitySource_.StartActivity($"{nameof(IResultTable.GetResult)}");
activity?.SetTag($"{nameof(IResultTable.GetResult)}_sessionId",
sessionId);
var sessionHandle = sessionProvider_.Get();
var resultCollection = resultCollectionProvider_.Get();
using var activity = activitySource_.StartActivity($"{nameof(IResultTable.GetResult)}");
var sessionHandle = sessionProvider_.Get();
var resultCollection = resultCollectionProvider_.Get();
try
{
return await resultCollection.AsQueryable(sessionHandle)
Expand Down Expand Up @@ -182,8 +175,7 @@ async Task<Result> IResultTable.GetResult(string sessionId,
}

/// <inheritdoc />
public async Task SetTaskOwnership(string sessionId,
ICollection<(string resultId, string taskId)> requests,
public async Task SetTaskOwnership(ICollection<(string resultId, string taskId)> requests,
CancellationToken cancellationToken = default)
{
using var activity = activitySource_.StartActivity($"{nameof(SetTaskOwnership)}");
Expand All @@ -208,16 +200,12 @@ public async Task SetTaskOwnership(string
}

/// <inheritdoc />
public async Task ChangeResultOwnership(string sessionId,
string oldTaskId,
public async Task ChangeResultOwnership(string oldTaskId,
IEnumerable<IResultTable.ChangeResultOwnershipRequest> requests,
CancellationToken cancellationToken)
{
using var activity = activitySource_.StartActivity($"{nameof(ChangeResultOwnership)}");
activity?.SetTag($"{nameof(ChangeResultOwnership)}_sessionId",
sessionId);

var resultCollection = resultCollectionProvider_.Get();
using var activity = activitySource_.StartActivity($"{nameof(ChangeResultOwnership)}");
var resultCollection = resultCollectionProvider_.Get();

await resultCollection.BulkWriteAsync(requests.Select(r =>
{
Expand All @@ -228,10 +216,7 @@ await resultCollection.BulkWriteAsync(requests.Select(r =>
Builders<Result>.Filter
.Eq(model
=> model.OwnerTaskId,
oldTaskId),
Builders<Result>.Filter
.Eq(model => model.SessionId,
sessionId)),
oldTaskId)),
Builders<Result>.Update.Set(model => model.OwnerTaskId,
r.NewTaskId));
}),
Expand All @@ -241,13 +226,10 @@ await resultCollection.BulkWriteAsync(requests.Select(r =>


/// <inheritdoc />
public async Task DeleteResult(string session,
string key,
public async Task DeleteResult(string key,
CancellationToken cancellationToken = default)
{
using var activity = activitySource_.StartActivity($"{nameof(DeleteResult)}");
activity?.SetTag($"{nameof(DeleteResult)}_sessionId",
session);
activity?.SetTag($"{nameof(DeleteResult)}_key",
key);
var resultCollection = resultCollectionProvider_.Get();
Expand Down Expand Up @@ -296,14 +278,11 @@ await resultCollection.DeleteManyAsync(model => model.SessionId == sessionId,
}

/// <inheritdoc />
public async Task<Result> UpdateOneResult(string sessionId,
string resultId,
public async Task<Result> UpdateOneResult(string resultId,
ICollection<(Expression<Func<Result, object?>> selector, object? newValue)> updates,
CancellationToken cancellationToken = default)
{
using var activity = activitySource_.StartActivity($"{nameof(UpdateOneResult)}");
activity?.SetTag($"{nameof(DeleteResult)}_sessionId",
sessionId);
activity?.SetTag($"{nameof(DeleteResult)}_resultId",
resultId);
var resultCollection = resultCollectionProvider_.Get();
Expand Down
24 changes: 6 additions & 18 deletions Common/src/Storage/IResultTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,13 @@ public interface IResultTable : IInitializable
/// <summary>
/// Change ownership (in batch) of the results in the given request
/// </summary>
/// <param name="sessionId">Session id of the session using the results</param>
/// <param name="oldTaskId">Task Id of the previous owner</param>
/// <param name="requests">Change ownership requests that will be executed</param>
/// <param name="cancellationToken">Token used to cancel the execution of the method</param>
/// <returns>
/// Task representing the asynchronous execution of the method
/// </returns>
Task ChangeResultOwnership(string sessionId,
string oldTaskId,
Task ChangeResultOwnership(string oldTaskId,
IEnumerable<ChangeResultOwnershipRequest> requests,
CancellationToken cancellationToken);

Expand All @@ -68,27 +66,23 @@ Task Create(ICollection<Result> results,
/// <summary>
/// Add the tasks Ids to the list of reverse dependencies of the given results
/// </summary>
/// <param name="sessionId">Id of the session containing the result</param>
/// <param name="dependencies">Dictionary of the dependant tasks for each result</param>
/// <param name="cancellationToken">Token used to cancel the execution of the method</param>
/// <returns>
/// Task representing the asynchronous execution of the method
/// </returns>
Task AddTaskDependencies(string sessionId,
IDictionary<string, ICollection<string>> dependencies,
Task AddTaskDependencies(IDictionary<string, ICollection<string>> dependencies,
CancellationToken cancellationToken = default);

/// <summary>
/// Delete the results from the database
/// </summary>
/// <param name="session">id of the session containing the result</param>
/// <param name="key">id of the result to be deleted</param>
/// <param name="cancellationToken">Token used to cancel the execution of the method</param>
/// <returns>
/// Task representing the asynchronous execution of the method
/// </returns>
Task DeleteResult(string session,
string key,
Task DeleteResult(string key,
CancellationToken cancellationToken = default);

/// <summary>
Expand Down Expand Up @@ -141,27 +135,23 @@ IAsyncEnumerable<T> GetResults<T>(Expression<Func<Result, bool>> filter,
/// <summary>
/// Set Task that should produce the result
/// </summary>
/// <param name="sessionId"></param>
/// <param name="requests">Results to update with the associated task id</param>
/// <param name="cancellationToken">Token used to cancel the execution of the method</param>
/// <returns>
/// Task representing the asynchronous execution of the method
/// </returns>
Task SetTaskOwnership(string sessionId,
ICollection<(string resultId, string taskId)> requests,
Task SetTaskOwnership(ICollection<(string resultId, string taskId)> requests,
CancellationToken cancellationToken = default);

/// <summary>
/// Get the result from its id
/// </summary>
/// <param name="sessionId">id of the session containing the result</param>
/// <param name="key">id of the result to be retrieved</param>
/// <param name="cancellationToken">Token used to cancel the execution of the method</param>
/// <returns>
/// Result metadata from the database
/// </returns>
public async Task<Result> GetResult(string sessionId,
string key,
public async Task<Result> GetResult(string key,
CancellationToken cancellationToken = default)
{
try
Expand All @@ -182,15 +172,13 @@ public async Task<Result> GetResult(string sessionId,
/// <summary>
/// Update one result with the given new values
/// </summary>
/// <param name="sessionId">Id of the session where the result to be updated is</param>
/// <param name="resultId">Id of the result to be updated</param>
/// <param name="updates">Collection of fields to update and their new value</param>
/// <param name="cancellationToken">Token used to cancel the execution of the method</param>
/// <returns>
/// The result metadata before the update
/// </returns>
Task<Result> UpdateOneResult(string sessionId,
string resultId,
Task<Result> UpdateOneResult(string resultId,
ICollection<(Expression<Func<Result, object?>> selector, object? newValue)> updates,
CancellationToken cancellationToken = default);

Expand Down
Loading

0 comments on commit 9866c2e

Please sign in to comment.