Skip to content

Commit

Permalink
Enable to activate Grain after clearing its state (#9165)
Browse files Browse the repository at this point in the history
* Enable to activate Grain after clearing its state

* Update src/AWS/Orleans.Persistence.DynamoDB/Provider/DynamoDBGrainStorage.cs

* Update src/Azure/Orleans.Persistence.AzureStorage/Providers/Storage/AzureTableStorage.cs

---------

Co-authored-by: Reuben Bond <[email protected]>
  • Loading branch information
scalalang2 and ReubenBond authored Nov 12, 2024
1 parent 0c5d545 commit 90fd1d0
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Orleans.Configuration;
using Orleans.Persistence.DynamoDB;
using Orleans.Runtime;
using Orleans.Serialization.Serializers;

namespace Orleans.Storage
{
Expand All @@ -33,6 +34,7 @@ public class DynamoDBGrainStorage : IGrainStorage, ILifecycleParticipant<ISiloLi
private readonly DynamoDBStorageOptions options;
private readonly ILogger logger;
private readonly IServiceProvider serviceProvider;
private readonly IActivatorProvider activatorProvider;
private readonly string name;

private DynamoDBStorage storage;
Expand All @@ -50,6 +52,7 @@ public DynamoDBGrainStorage(
this.logger = logger;
this.options = options;
this.serviceProvider = serviceProvider;
this.activatorProvider = this.serviceProvider.GetRequiredService<IActivatorProvider>();
}

public void Participate(ISiloLifecycle lifecycle)
Expand Down Expand Up @@ -152,6 +155,12 @@ public async Task ReadStateAsync<T>(string grainType, GrainId grainId, IGrainSta
grainState.State = loadedState ?? Activator.CreateInstance<T>();
grainState.ETag = record.ETag.ToString();
}
else
{
grainState.RecordExists = false;
grainState.ETag = null;
grainState.State = this.activatorProvider.GetActivator<T>().Create();
}

// Else leave grainState in previous default condition
}
Expand Down Expand Up @@ -294,6 +303,7 @@ public async Task ClearStateAsync<T>(string grainType, GrainId grainId, IGrainSt
else
{
await WriteStateInternal(grainState, record, true);
grainState.State = this.activatorProvider.GetActivator<T>().Create();
}
}
catch (Exception exc)
Expand Down Expand Up @@ -330,7 +340,8 @@ internal T ConvertFromStorageFormat<T>(GrainStateRecord entity)
T dataValue = default;
try
{
dataValue = this.options.GrainStorageSerializer.Deserialize<T>(entity.State);
if (entity.State is { Length: > 0 })
dataValue = this.options.GrainStorageSerializer.Deserialize<T>(entity.State);
}
catch (Exception exc)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Orleans.Persistence.AzureStorage;
using Orleans.Providers.Azure;
using Orleans.Runtime;
using Orleans.Serialization.Serializers;
using LogLevel = Microsoft.Extensions.Logging.LogLevel;

namespace Orleans.Storage
Expand All @@ -28,6 +29,7 @@ public class AzureTableGrainStorage : IGrainStorage, IRestExceptionDecoder, ILif
private readonly AzureTableStorageOptions options;
private readonly ClusterOptions clusterOptions;
private readonly IGrainStorageSerializer storageSerializer;
private readonly IActivatorProvider activatorProvider;
private readonly ILogger logger;

private GrainStateTableDataManager tableDataManager;
Expand Down Expand Up @@ -55,6 +57,7 @@ public AzureTableGrainStorage(
this.clusterOptions = clusterOptions.Value;
this.name = name;
this.storageSerializer = options.GrainStorageSerializer;
this.activatorProvider = services.GetRequiredService<IActivatorProvider>();
this.logger = logger;
}

Expand All @@ -81,6 +84,12 @@ public async Task ReadStateAsync<T>(string grainType, GrainId grainId, IGrainSta
grainState.State = loadedState ?? Activator.CreateInstance<T>();
grainState.ETag = entity.ETag.ToString();
}
else
{
grainState.RecordExists = false;
grainState.ETag = null;
grainState.State = this.activatorProvider.GetActivator<T>().Create();
}
// Else leave grainState in previous default condition
}

Expand Down Expand Up @@ -160,6 +169,7 @@ public async Task ClearStateAsync<T>(string grainType, GrainId grainId, IGrainSt
else
{
await DoOptimisticUpdate(() => tableDataManager.Write(entity), grainType, grainId, this.options.TableName, grainState.ETag).ConfigureAwait(false);
grainState.State = this.activatorProvider.GetActivator<T>().Create();
}

grainState.ETag = entity.ETag.ToString(); // Update in-memory data to the new ETag
Expand Down Expand Up @@ -351,7 +361,8 @@ internal T ConvertFromStorageFormat<T>(TableEntity entity)
var input = binaryData.Length > 0
? new BinaryData(binaryData)
: new BinaryData(stringData);
dataValue = this.storageSerializer.Deserialize<T>(input);
if(input.Length > 0)
dataValue = this.storageSerializer.Deserialize<T>(input);
}
catch (Exception exc)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using System.Diagnostics;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.Providers;
using Orleans.Runtime;
using Orleans.Serialization;
using Orleans.Storage;
using TestExtensions;
using UnitTests.Persistence;
Expand Down Expand Up @@ -54,21 +56,22 @@ public async Task PersistenceProvider_DynamoDB_WriteRead(int? stringLength, bool
}

[SkippableTheory, TestCategory("Functional")]
[InlineData(null, false)]
[InlineData(null, true)]
[InlineData(400_000, false)]
[InlineData(400_000, true)]
public async Task PersistenceProvider_DynamoDB_WriteClearRead(int? stringLength, bool useJson)
[InlineData(null, false, false)]
[InlineData(null, true, false)]
[InlineData(400_000, false, false)]
[InlineData(400_000, true, false)]
public async Task PersistenceProvider_DynamoDB_WriteClearRead(int? stringLength, bool useJson, bool useFallback)
{
var testName = string.Format("{0}({1} = {2}, {3} = {4})",
var testName = string.Format("{0}({1} = {2}, {3} = {4}, {5} = {6})",
nameof(PersistenceProvider_DynamoDB_WriteClearRead),
nameof(stringLength), stringLength == null ? "default" : stringLength.ToString(),
nameof(useJson), useJson);
nameof(useJson), useJson,
nameof(useFallback), useFallback);

var grainState = TestStoreGrainState.NewRandomState(stringLength);
EnsureEnvironmentSupportsState(grainState);

var store = await InitDynamoDBGrainStorage(useJson);
var store = await InitDynamoDBGrainStorage(useJson, useFallback);

await Test_PersistenceProvider_WriteClearRead(testName, store, grainState);
}
Expand Down Expand Up @@ -161,20 +164,31 @@ public async Task DynamoDBStorage_ConvertToFromStorageFormat(int? stringLength,

private async Task<DynamoDBGrainStorage> InitDynamoDBGrainStorage(DynamoDBStorageOptions options)
{
options.GrainStorageSerializer = ActivatorUtilities.CreateInstance<JsonGrainStorageSerializer>(this.providerRuntime.ServiceProvider);
DynamoDBGrainStorage store = ActivatorUtilities.CreateInstance<DynamoDBGrainStorage>(this.providerRuntime.ServiceProvider, "StorageProviderTests", options);
ISiloLifecycleSubject lifecycle = ActivatorUtilities.CreateInstance<SiloLifecycleSubject>(this.providerRuntime.ServiceProvider, NullLogger<SiloLifecycleSubject>.Instance);
store.Participate(lifecycle);
await lifecycle.OnStart();
return store;
}

private Task<DynamoDBGrainStorage> InitDynamoDBGrainStorage(bool useJson = false)
private Task<DynamoDBGrainStorage> InitDynamoDBGrainStorage(bool useJson = false, bool useFallback = true)
{
var options = new DynamoDBStorageOptions
{
Service = AWSTestConstants.DynamoDbService,
};

var jsonOptions = this.providerRuntime.ServiceProvider.GetService<IOptions<OrleansJsonSerializerOptions>>();
var binarySerializer = new OrleansGrainStorageSerializer(this.providerRuntime.ServiceProvider.GetRequiredService<Serializer>());
var jsonSerializer = new JsonGrainStorageSerializer(new OrleansJsonSerializer(jsonOptions));

if (useFallback)
options.GrainStorageSerializer = useJson
? new GrainStorageSerializer(jsonSerializer, binarySerializer)
: new GrainStorageSerializer(binarySerializer, jsonSerializer);
else
options.GrainStorageSerializer = useJson ? jsonSerializer : binarySerializer;

return InitDynamoDBGrainStorage(options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,21 +90,22 @@ public async Task PersistenceProvider_Azure_WriteRead(int? stringLength, bool us
}

[SkippableTheory, TestCategory("Functional"), TestCategory("AzureStorage")]
[InlineData(null, false)]
[InlineData(null, true)]
[InlineData(15 * 64 * 1024 - 256, false)]
[InlineData(15 * 32 * 1024 - 256, true)]
public async Task PersistenceProvider_Azure_WriteClearRead(int? stringLength, bool useJson)
[InlineData(null, false, false)]
[InlineData(null, true, false)]
[InlineData(15 * 64 * 1024 - 256, false, false)]
[InlineData(15 * 32 * 1024 - 256, true, false)]
public async Task PersistenceProvider_Azure_WriteClearRead(int? stringLength, bool useJson, bool useFallback)
{
var testName = string.Format("{0}({1} = {2}, {3} = {4})",
var testName = string.Format("{0}({1} = {2}, {3} = {4}, {5} = {6})",
nameof(PersistenceProvider_Azure_WriteClearRead),
nameof(stringLength), stringLength == null ? "default" : stringLength.ToString(),
nameof(useJson), useJson);
nameof(useJson), useJson,
nameof(useFallback), useFallback);

var grainState = TestStoreGrainState.NewRandomState(stringLength);
EnsureEnvironmentSupportsState(grainState);

var store = await InitAzureTableGrainStorage(useJson);
var store = await InitAzureTableGrainStorage(useJson, useFallback);

await Test_PersistenceProvider_WriteClearRead(testName, store, grainState);
}
Expand Down Expand Up @@ -300,7 +301,7 @@ private async Task<AzureTableGrainStorage> InitAzureTableGrainStorage(AzureTable
return store;
}

private async Task<AzureTableGrainStorage> InitAzureTableGrainStorage(bool useJson = false, bool useStringFormat = false, TypeNameHandling? typeNameHandling = null)
private async Task<AzureTableGrainStorage> InitAzureTableGrainStorage(bool useJson = false, bool useFallback = true, bool useStringFormat = false, TypeNameHandling? typeNameHandling = null)
{
if (useStringFormat && !useJson)
{
Expand All @@ -320,9 +321,12 @@ private async Task<AzureTableGrainStorage> InitAzureTableGrainStorage(bool useJs
// TODO change test to include more serializer?
var binarySerializer = new OrleansGrainStorageSerializer(this.providerRuntime.ServiceProvider.GetRequiredService<Serializer>());
var jsonSerializer = new JsonGrainStorageSerializer(new OrleansJsonSerializer(jsonOptions));
options.GrainStorageSerializer = useJson
? new GrainStorageSerializer(jsonSerializer, binarySerializer)
: new GrainStorageSerializer(binarySerializer, jsonSerializer);
if (useFallback)
options.GrainStorageSerializer = useJson
? new GrainStorageSerializer(jsonSerializer, binarySerializer)
: new GrainStorageSerializer(binarySerializer, jsonSerializer);
else
options.GrainStorageSerializer = useJson ? jsonSerializer : binarySerializer;

AzureTableGrainStorage store = ActivatorUtilities.CreateInstance<AzureTableGrainStorage>(this.providerRuntime.ServiceProvider, options, "TestStorage");
ISiloLifecycleSubject lifecycle = ActivatorUtilities.CreateInstance<SiloLifecycleSubject>(this.providerRuntime.ServiceProvider);
Expand Down

0 comments on commit 90fd1d0

Please sign in to comment.