Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Cluster.Sharding DData backward compatibility wire format mode #6775

Merged
merged 5 commits into from
May 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// associated with an assembly.
[assembly: InternalsVisibleTo("Akka.Cluster.Sharding.Tests")]
[assembly: InternalsVisibleTo("Akka.Cluster.Sharding.Tests.MultiNode")]
[assembly: InternalsVisibleTo("Akka.DistributedData.Tests")]

// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ private static EventSourcedRememberEntitiesShardStore.EntitiesStopped EntitiesSt
}

//
// ShardCoordinator.State
// ShardCoordinator.CoordinatorState
//
private static Proto.Msg.CoordinatorState CoordinatorStateToProto(ShardCoordinator.CoordinatorState state)
{
Expand Down
4 changes: 4 additions & 0 deletions src/contrib/cluster/Akka.Cluster.Sharding/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ akka.cluster.sharding {
# can become to large if including to many in same message. Limit to
# the same number as the number of ORSet per shard.
max-delta-elements = 5

# Turn on backward compatibility wire format mode that allows Akka.Cluster.Sharding
# v1.5.8 distributed data to communicate with v1.4.x
backward-compatible-wire-format = false
}
# The id of the dispatcher to use for ClusterSharding actors.
# If not specified, the internal dispatcher is used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Akka.Cluster.Sharding\Akka.Cluster.Sharding.csproj" />
<ProjectReference Include="..\Akka.DistributedData.LightningDB\Akka.DistributedData.LightningDB.csproj" />
<ProjectReference Include="..\Akka.DistributedData\Akka.DistributedData.csproj" />
<ProjectReference Include="..\..\..\core\Akka.Tests.Shared.Internals\Akka.Tests.Shared.Internals.csproj" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// //-----------------------------------------------------------------------
// // <copyright file="ReplicatedDataSerializerBackCompatSpec.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using Akka.Actor;
using Akka.Cluster.Sharding;
using Akka.DistributedData.Serialization;
using Akka.DistributedData.Serialization.Proto.Msg;
using FluentAssertions;
using Xunit;
using UniqueAddress = Akka.Cluster.UniqueAddress;
using static FluentAssertions.FluentActions;

namespace Akka.DistributedData.Tests.Serialization;

public class ReplicatedDataSerializerBackCompatSpec
{
private readonly ReplicatedDataSerializer _serializer;
private readonly UniqueAddress _address;

public ReplicatedDataSerializerBackCompatSpec()
{
var sys = ActorSystem.Create("test", @"
akka.actor.provider = cluster
akka.cluster.sharding.distributed-data.backward-compatible-wire-format = true");
_serializer = new ReplicatedDataSerializer((ExtendedActorSystem)sys);
_address = Cluster.Cluster.Get(sys).SelfUniqueAddress;
sys.Terminate();
}

[Fact(DisplayName = "DData replicated data serializer should serialize and deserialize correct backward compatible proto message")]
public void SerializeTest()
{
var lwwReg = new LWWRegister<ShardCoordinator.CoordinatorState>(_address, ShardCoordinator.CoordinatorState.Empty);
var bytes = _serializer.ToBinary(lwwReg);
var proto = LWWRegister.Parser.ParseFrom(bytes);

// Serialized type name should be equal to the old v1.4 coordinator state FQCN
proto.TypeInfo.TypeName.Should().Be("Akka.Cluster.Sharding.PersistentShardCoordinator+State, Akka.Cluster.Sharding");

// Deserializing the same message should succeed
Invoking(() => _serializer.FromBinary(bytes, _serializer.Manifest(lwwReg)))
.Should().NotThrow()
.And.Subject().Should().BeOfType<LWWRegister<ShardCoordinator.CoordinatorState>>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,13 @@ public sealed class ReplicatedDataSerializer : SerializerWithStringManifest

private readonly byte[] _emptyArray = Array.Empty<byte>();

private readonly bool _backwardCompatWireFormat;

public ReplicatedDataSerializer(ExtendedActorSystem system) : base(system)
{
_ser = new SerializationSupport(system);
_backwardCompatWireFormat =
system.Settings.Config.GetBoolean("akka.cluster.sharding.distributed-data.backward-compatible-wire-format");
}


Expand Down Expand Up @@ -735,6 +739,11 @@ private Proto.Msg.LWWRegister LWWToProto<T>(ILWWRegister r)
pLww.State = _ser.OtherMessageToProto(register.Value);
pLww.Timestamp = register.Timestamp;
pLww.TypeInfo = GetTypeDescriptor(r.RegisterType);

// HACK: Really really ugly hack to make sure that v1.5 DData cluster sharding works with v1.4
if(_backwardCompatWireFormat && pLww.TypeInfo.TypeName == "Akka.Cluster.Sharding.ShardCoordinator+CoordinatorState, Akka.Cluster.Sharding")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

pLww.TypeInfo.TypeName = "Akka.Cluster.Sharding.PersistentShardCoordinator+State, Akka.Cluster.Sharding";

return pLww;
}

Expand Down Expand Up @@ -766,9 +775,13 @@ private ILWWRegister LWWRegisterFromProto(Proto.Msg.LWWRegister proto)
}
case ValType.Other:
{
// HACK: Really really ugly hack to make sure that v1.5 DData cluster sharding works with v1.4
var typeName = proto.TypeInfo.TypeName;
if (typeName == "Akka.Cluster.Sharding.PersistentShardCoordinator+State, Akka.Cluster.Sharding")
typeName = "Akka.Cluster.Sharding.ShardCoordinator+CoordinatorState, Akka.Cluster.Sharding";

// runtime type - enter horrible dynamic serialization stuff

var setContentType = Type.GetType(proto.TypeInfo.TypeName);
var setContentType = Type.GetType(typeName);

var setType = LWWRegisterMaker.MakeGenericMethod(setContentType);
return (ILWWRegister)setType.Invoke(this, new object[] { proto });
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[assembly: System.Reflection.AssemblyMetadataAttribute("RepositoryUrl", "https://github.com/akkadotnet/akka.net")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests.MultiNode")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.DistributedData.Tests")]
[assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)]
[assembly: System.Runtime.InteropServices.GuidAttribute("a05c31e8-0246-46a1-b3bc-4d6fe7a9aa49")]
[assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETCoreApp,Version=v6.0", FrameworkDisplayName=".NET 6.0")]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[assembly: System.Reflection.AssemblyMetadataAttribute("RepositoryUrl", "https://github.com/akkadotnet/akka.net")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests.MultiNode")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.DistributedData.Tests")]
[assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)]
[assembly: System.Runtime.InteropServices.GuidAttribute("a05c31e8-0246-46a1-b3bc-4d6fe7a9aa49")]
[assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETStandard,Version=v2.0", FrameworkDisplayName=".NET Standard 2.0")]
Expand Down