Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(#90) Fixed threading issue in PushAsync #91

Merged
merged 1 commit into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,12 @@ internal async Task<Page<object>> GetPageAsync(HttpClient client, Uri requestUri
object? result = await JsonSerializer.DeserializeAsync(response.ContentStream, pageType, context.JsonSerializerOptions, cancellationToken).ConfigureAwait(false)
?? throw new DatasyncPullException("JSON result is null") { ServiceResponse = response };

return new Page<object>()
Page<object> page = new Page<object>()
{
Items = (IEnumerable<object>)itemsPropInfo.GetValue(result)!,
NextLink = (string?)nextLinkPropInfo.GetValue(result)
};
return page;
}
catch (JsonException ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ namespace CommunityToolkit.Datasync.Client.Offline.OperationsQueue;
/// </summary>
internal class OperationsQueueManager : IOperationsQueueManager
{
/// <summary>
/// A lock object for locking against concurrent changes to the queue.
/// </summary>
private readonly object pushlock = new();

/// <summary>
/// The map of valid entities that can be synchronized to the service.
/// </summary>
Expand Down Expand Up @@ -296,10 +301,14 @@ internal async Task<PushResult> PushAsync(IEnumerable<Type> entityTypes, PushOpt

if (!response.IsSuccessful)
{
operation.LastAttempt = DateTimeOffset.UtcNow;
operation.HttpStatusCode = response.StatusCode;
operation.State = OperationState.Failed;
_ = this._context.Update(operation);
lock (this.pushlock)
{
operation.LastAttempt = DateTimeOffset.UtcNow;
operation.HttpStatusCode = response.StatusCode;
operation.State = OperationState.Failed;
_ = this._context.Update(operation);
}

return response;
}

Expand All @@ -311,7 +320,11 @@ internal async Task<PushResult> PushAsync(IEnumerable<Type> entityTypes, PushOpt
ReplaceDatabaseValue(oldValue, newValue);
}

_ = this._context.DatasyncOperationsQueue.Remove(operation);
lock (this.pushlock)
{
_ = this._context.DatasyncOperationsQueue.Remove(operation);
}

return null;
}

Expand All @@ -327,8 +340,11 @@ internal void ReplaceDatabaseValue(object? oldValue, object? newValue)
throw new DatasyncException("Internal Datasync Error: invalid values for replacement.");
}

EntityEntry tracker = this._context.Entry(oldValue);
tracker.CurrentValues.SetValues(newValue);
lock (this.pushlock)
{
EntityEntry tracker = this._context.Entry(oldValue);
tracker.CurrentValues.SetValues(newValue);
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ internal static bool IsValid(IMovie movie)
&& movie.Duration >= 60 && movie.Duration <= 360;
}

internal void ResetInMemoryMovies()
{
using IServiceScope scope = Services.CreateScope();
InMemoryRepository<InMemoryMovie> repository = scope.ServiceProvider.GetRequiredService<IRepository<InMemoryMovie>>() as InMemoryRepository<InMemoryMovie>;
List<InMemoryMovie> sourceData = TestCommon.TestData.Movies.OfType<InMemoryMovie>();
repository.Clear();
sourceData.ForEach(movie => repository.StoreEntity(movie));
}

internal void RunWithRepository<TEntity>(Action<InMemoryRepository<TEntity>> action) where TEntity : InMemoryTableData
{
using IServiceScope scope = Services.CreateScope();
Expand Down
22 changes: 15 additions & 7 deletions tests/CommunityToolkit.Datasync.Client.Test/Helpers/ServiceTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@ public abstract class ServiceTest(ServiceApplicationFactory factory)

protected DateTimeOffset StartTime { get; } = DateTimeOffset.UtcNow;

internal IntegrationDbContext GetOfflineContext(bool useRealFile = false)
internal IntegrationDbContext GetOfflineContext(bool useRealFile = false, bool enableLogging = false)
{
string filename = null;
string connectionString = "Data Source=:memory:";
if (useRealFile)
{
filename = Path.GetTempFileName();
SqliteConnectionStringBuilder builder = new();
builder.DataSource = filename;
builder.Mode = SqliteOpenMode.ReadWriteCreate;
SqliteConnectionStringBuilder builder = new()
{
DataSource = filename,
Mode = SqliteOpenMode.ReadWriteCreate
};
connectionString = builder.ConnectionString;
}

Expand All @@ -38,9 +40,12 @@ internal IntegrationDbContext GetOfflineContext(bool useRealFile = false)

DbContextOptionsBuilder<IntegrationDbContext> optionsBuilder = new();
optionsBuilder.UseSqlite(connection);
optionsBuilder.LogTo(Console.WriteLine);
optionsBuilder.EnableSensitiveDataLogging();
optionsBuilder.EnableDetailedErrors();
if (enableLogging)
{
optionsBuilder.LogTo(Console.WriteLine);
optionsBuilder.EnableSensitiveDataLogging();
optionsBuilder.EnableDetailedErrors();
}

IntegrationDbContext context = new(optionsBuilder.Options)
{
Expand Down Expand Up @@ -71,6 +76,9 @@ internal InMemoryMovie GetRandomMovie()
internal TEntity GetServerEntityById<TEntity>(string id) where TEntity : InMemoryTableData
=> factory.GetServerEntityById<TEntity>(id);

internal void ResetInMemoryMovies()
=> factory.ResetInMemoryMovies();

protected void SeedKitchenSinkWithCountryData()
{
factory.RunWithRepository<InMemoryKitchenSink>(repository =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public void Dispose()
[Fact]
public async Task PushAsync_Complex_Situation()
{
ResetInMemoryMovies();

PullResult initialPullResults = await this.context.Movies.PullAsync();
initialPullResults.IsSuccessful.Should().BeTrue();
initialPullResults.Additions.Should().Be(248);
Expand Down Expand Up @@ -109,4 +111,76 @@ public async Task PushAsync_Complex_Situation()
// The service always replaces additions and replacements - updating the last updatedAt.
pullResults.Replacements.Should().Be(moviesToReplace.Count + 1);
}

[Fact]
public async Task PushAsync_Multithreaded()
{
ResetInMemoryMovies();

PullResult initialPullResults = await this.context.Movies.PullAsync();
initialPullResults.IsSuccessful.Should().BeTrue();
initialPullResults.Additions.Should().Be(248);
initialPullResults.Deletions.Should().Be(0);
initialPullResults.Replacements.Should().Be(0);

// Let's add some new movies
ClientMovie blackPanther = new(TestCommon.TestData.Movies.BlackPanther) { Id = Guid.NewGuid().ToString("N") };
this.context.Movies.Add(blackPanther);
await this.context.SaveChangesAsync();

// And remove any movie that matches some criteria
List<ClientMovie> moviesToDelete = await this.context.Movies.Where(x => x.Duration > 180).ToListAsync();
this.context.Movies.RemoveRange(moviesToDelete);
await this.context.SaveChangesAsync();

// Then replace all the Unrated movies with a rating of NC17
List<ClientMovie> moviesToReplace = await this.context.Movies.Where(x => x.Rating == MovieRating.Unrated).ToListAsync();
moviesToReplace.ForEach(r =>
{
r.Rating = MovieRating.NC17;
r.Title = r.Title.PadLeft('-');
this.context.Movies.Update(r);
});
await this.context.SaveChangesAsync();

// Check the queue.
List<DatasyncOperation> operations = await this.context.DatasyncOperationsQueue.ToListAsync();
operations.Count.Should().Be(1 + moviesToDelete.Count + moviesToReplace.Count);
operations.Count(x => x.Kind is OperationKind.Add).Should().Be(1);
operations.Count(x => x.Kind is OperationKind.Delete).Should().Be(moviesToDelete.Count);
operations.Count(x => x.Kind is OperationKind.Replace).Should().Be(moviesToReplace.Count);

// Now push the results and check what we did
PushResult pushResults = await this.context.Movies.PushAsync(new PushOptions { ParallelOperations = 8 });

// This little snippet of code is to aid debugging if this test fails
if (!pushResults.IsSuccessful)
{
foreach (KeyValuePair<string, ServiceResponse> failedRequest in pushResults.FailedRequests)
{
string id = failedRequest.Key;
ServiceResponse response = failedRequest.Value;
string jsonContent = string.Empty;
if (response.HasContent)
{
using StreamReader reader = new(response.ContentStream);
jsonContent = reader.ReadToEnd();
}

Console.WriteLine($"FAILED REQUEST FOR ID: {id}: {response.StatusCode}\n{jsonContent}");
}
}

pushResults.IsSuccessful.Should().BeTrue();
pushResults.CompletedOperations.Should().Be(1 + moviesToDelete.Count + moviesToReplace.Count);
this.context.DatasyncOperationsQueue.Should().BeEmpty();

// Now use PullAsync() again - these should all be pulled down again
PullResult pullResults = await this.context.PullAsync();
pullResults.IsSuccessful.Should().BeTrue();
pullResults.Additions.Should().Be(0);
pullResults.Deletions.Should().Be(0);
// The service always replaces additions and replacements - updating the last updatedAt.
pullResults.Replacements.Should().Be(moviesToReplace.Count + 1);
}
}
Loading