Skip to content

Commit

Permalink
(#90) Fixed threading issue in PushAsync (#91)
Browse files Browse the repository at this point in the history
  • Loading branch information
adrianhall authored Aug 30, 2024
1 parent 4892839 commit 92a2936
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,12 @@ internal async Task<Page<object>> GetPageAsync(HttpClient client, Uri requestUri
object? result = await JsonSerializer.DeserializeAsync(response.ContentStream, pageType, context.JsonSerializerOptions, cancellationToken).ConfigureAwait(false)
?? throw new DatasyncPullException("JSON result is null") { ServiceResponse = response };

return new Page<object>()
Page<object> page = new Page<object>()
{
Items = (IEnumerable<object>)itemsPropInfo.GetValue(result)!,
NextLink = (string?)nextLinkPropInfo.GetValue(result)
};
return page;
}
catch (JsonException ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ namespace CommunityToolkit.Datasync.Client.Offline.OperationsQueue;
/// </summary>
internal class OperationsQueueManager : IOperationsQueueManager
{
/// <summary>
/// A lock object for locking against concurrent changes to the queue.
/// </summary>
private readonly object pushlock = new();

/// <summary>
/// The map of valid entities that can be synchronized to the service.
/// </summary>
Expand Down Expand Up @@ -296,10 +301,14 @@ internal async Task<PushResult> PushAsync(IEnumerable<Type> entityTypes, PushOpt

if (!response.IsSuccessful)
{
operation.LastAttempt = DateTimeOffset.UtcNow;
operation.HttpStatusCode = response.StatusCode;
operation.State = OperationState.Failed;
_ = this._context.Update(operation);
lock (this.pushlock)
{
operation.LastAttempt = DateTimeOffset.UtcNow;
operation.HttpStatusCode = response.StatusCode;
operation.State = OperationState.Failed;
_ = this._context.Update(operation);
}

return response;
}

Expand All @@ -311,7 +320,11 @@ internal async Task<PushResult> PushAsync(IEnumerable<Type> entityTypes, PushOpt
ReplaceDatabaseValue(oldValue, newValue);
}

_ = this._context.DatasyncOperationsQueue.Remove(operation);
lock (this.pushlock)
{
_ = this._context.DatasyncOperationsQueue.Remove(operation);
}

return null;
}

Expand All @@ -327,8 +340,11 @@ internal void ReplaceDatabaseValue(object? oldValue, object? newValue)
throw new DatasyncException("Internal Datasync Error: invalid values for replacement.");
}

EntityEntry tracker = this._context.Entry(oldValue);
tracker.CurrentValues.SetValues(newValue);
lock (this.pushlock)
{
EntityEntry tracker = this._context.Entry(oldValue);
tracker.CurrentValues.SetValues(newValue);
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ internal static bool IsValid(IMovie movie)
&& movie.Duration >= 60 && movie.Duration <= 360;
}

internal void ResetInMemoryMovies()
{
using IServiceScope scope = Services.CreateScope();
InMemoryRepository<InMemoryMovie> repository = scope.ServiceProvider.GetRequiredService<IRepository<InMemoryMovie>>() as InMemoryRepository<InMemoryMovie>;
List<InMemoryMovie> sourceData = TestCommon.TestData.Movies.OfType<InMemoryMovie>();
repository.Clear();
sourceData.ForEach(movie => repository.StoreEntity(movie));
}

internal void RunWithRepository<TEntity>(Action<InMemoryRepository<TEntity>> action) where TEntity : InMemoryTableData
{
using IServiceScope scope = Services.CreateScope();
Expand Down
22 changes: 15 additions & 7 deletions tests/CommunityToolkit.Datasync.Client.Test/Helpers/ServiceTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@ public abstract class ServiceTest(ServiceApplicationFactory factory)

protected DateTimeOffset StartTime { get; } = DateTimeOffset.UtcNow;

internal IntegrationDbContext GetOfflineContext(bool useRealFile = false)
internal IntegrationDbContext GetOfflineContext(bool useRealFile = false, bool enableLogging = false)
{
string filename = null;
string connectionString = "Data Source=:memory:";
if (useRealFile)
{
filename = Path.GetTempFileName();
SqliteConnectionStringBuilder builder = new();
builder.DataSource = filename;
builder.Mode = SqliteOpenMode.ReadWriteCreate;
SqliteConnectionStringBuilder builder = new()
{
DataSource = filename,
Mode = SqliteOpenMode.ReadWriteCreate
};
connectionString = builder.ConnectionString;
}

Expand All @@ -38,9 +40,12 @@ internal IntegrationDbContext GetOfflineContext(bool useRealFile = false)

DbContextOptionsBuilder<IntegrationDbContext> optionsBuilder = new();
optionsBuilder.UseSqlite(connection);
optionsBuilder.LogTo(Console.WriteLine);
optionsBuilder.EnableSensitiveDataLogging();
optionsBuilder.EnableDetailedErrors();
if (enableLogging)
{
optionsBuilder.LogTo(Console.WriteLine);
optionsBuilder.EnableSensitiveDataLogging();
optionsBuilder.EnableDetailedErrors();
}

IntegrationDbContext context = new(optionsBuilder.Options)
{
Expand Down Expand Up @@ -71,6 +76,9 @@ internal InMemoryMovie GetRandomMovie()
internal TEntity GetServerEntityById<TEntity>(string id) where TEntity : InMemoryTableData
=> factory.GetServerEntityById<TEntity>(id);

internal void ResetInMemoryMovies()
=> factory.ResetInMemoryMovies();

protected void SeedKitchenSinkWithCountryData()
{
factory.RunWithRepository<InMemoryKitchenSink>(repository =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public void Dispose()
[Fact]
public async Task PushAsync_Complex_Situation()
{
ResetInMemoryMovies();

PullResult initialPullResults = await this.context.Movies.PullAsync();
initialPullResults.IsSuccessful.Should().BeTrue();
initialPullResults.Additions.Should().Be(248);
Expand Down Expand Up @@ -109,4 +111,76 @@ public async Task PushAsync_Complex_Situation()
// The service always replaces additions and replacements - updating the last updatedAt.
pullResults.Replacements.Should().Be(moviesToReplace.Count + 1);
}

[Fact]
public async Task PushAsync_Multithreaded()
{
ResetInMemoryMovies();

PullResult initialPullResults = await this.context.Movies.PullAsync();
initialPullResults.IsSuccessful.Should().BeTrue();
initialPullResults.Additions.Should().Be(248);
initialPullResults.Deletions.Should().Be(0);
initialPullResults.Replacements.Should().Be(0);

// Let's add some new movies
ClientMovie blackPanther = new(TestCommon.TestData.Movies.BlackPanther) { Id = Guid.NewGuid().ToString("N") };
this.context.Movies.Add(blackPanther);
await this.context.SaveChangesAsync();

// And remove any movie that matches some criteria
List<ClientMovie> moviesToDelete = await this.context.Movies.Where(x => x.Duration > 180).ToListAsync();
this.context.Movies.RemoveRange(moviesToDelete);
await this.context.SaveChangesAsync();

// Then replace all the Unrated movies with a rating of NC17
List<ClientMovie> moviesToReplace = await this.context.Movies.Where(x => x.Rating == MovieRating.Unrated).ToListAsync();
moviesToReplace.ForEach(r =>
{
r.Rating = MovieRating.NC17;
r.Title = r.Title.PadLeft('-');
this.context.Movies.Update(r);
});
await this.context.SaveChangesAsync();

// Check the queue.
List<DatasyncOperation> operations = await this.context.DatasyncOperationsQueue.ToListAsync();
operations.Count.Should().Be(1 + moviesToDelete.Count + moviesToReplace.Count);
operations.Count(x => x.Kind is OperationKind.Add).Should().Be(1);
operations.Count(x => x.Kind is OperationKind.Delete).Should().Be(moviesToDelete.Count);
operations.Count(x => x.Kind is OperationKind.Replace).Should().Be(moviesToReplace.Count);

// Now push the results and check what we did
PushResult pushResults = await this.context.Movies.PushAsync(new PushOptions { ParallelOperations = 8 });

// This little snippet of code is to aid debugging if this test fails
if (!pushResults.IsSuccessful)
{
foreach (KeyValuePair<string, ServiceResponse> failedRequest in pushResults.FailedRequests)
{
string id = failedRequest.Key;
ServiceResponse response = failedRequest.Value;
string jsonContent = string.Empty;
if (response.HasContent)
{
using StreamReader reader = new(response.ContentStream);
jsonContent = reader.ReadToEnd();
}

Console.WriteLine($"FAILED REQUEST FOR ID: {id}: {response.StatusCode}\n{jsonContent}");
}
}

pushResults.IsSuccessful.Should().BeTrue();
pushResults.CompletedOperations.Should().Be(1 + moviesToDelete.Count + moviesToReplace.Count);
this.context.DatasyncOperationsQueue.Should().BeEmpty();

// Now use PullAsync() again - these should all be pulled down again
PullResult pullResults = await this.context.PullAsync();
pullResults.IsSuccessful.Should().BeTrue();
pullResults.Additions.Should().Be(0);
pullResults.Deletions.Should().Be(0);
// The service always replaces additions and replacements - updating the last updatedAt.
pullResults.Replacements.Should().Be(moviesToReplace.Count + 1);
}
}

0 comments on commit 92a2936

Please sign in to comment.