From 0f94fd888673b9668e399736f50596c46fde7e39 Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sat, 21 Oct 2023 12:59:11 -0400 Subject: [PATCH 1/5] DData Serializer Benches --- .../Akka.Benchmarks/DData/RDDBenchTypes.cs | 16 ++ .../SerializerLwwDictionaryBenchmarks.cs | 140 ++++++++++++++++++ .../DData/SerializerORDictionaryBenchmarks.cs | 90 +++++++++++ .../DData/SerializerORSetBenchmarks.cs | 90 +++++++++++ 4 files changed, 336 insertions(+) create mode 100644 src/benchmark/Akka.Benchmarks/DData/RDDBenchTypes.cs create mode 100644 src/benchmark/Akka.Benchmarks/DData/SerializerLwwDictionaryBenchmarks.cs create mode 100644 src/benchmark/Akka.Benchmarks/DData/SerializerORDictionaryBenchmarks.cs create mode 100644 src/benchmark/Akka.Benchmarks/DData/SerializerORSetBenchmarks.cs diff --git a/src/benchmark/Akka.Benchmarks/DData/RDDBenchTypes.cs b/src/benchmark/Akka.Benchmarks/DData/RDDBenchTypes.cs new file mode 100644 index 00000000000..795c227e03f --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/DData/RDDBenchTypes.cs @@ -0,0 +1,16 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2023 Lightbend Inc. +// // Copyright (C) 2013-2023 .NET Foundation +// // +// //----------------------------------------------------------------------- + +namespace Akka.Benchmarks.DData; + +public class RDDBenchTypes +{ + + public record struct TestKey(int i); + + public record TestVal(string v); +} \ No newline at end of file diff --git a/src/benchmark/Akka.Benchmarks/DData/SerializerLwwDictionaryBenchmarks.cs b/src/benchmark/Akka.Benchmarks/DData/SerializerLwwDictionaryBenchmarks.cs new file mode 100644 index 00000000000..002cd3b967b --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/DData/SerializerLwwDictionaryBenchmarks.cs @@ -0,0 +1,140 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2023 Lightbend Inc. +// // Copyright (C) 2013-2023 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using Akka.Actor; +using Akka.Benchmarks.Configurations; +using Akka.Cluster; +using Akka.Configuration; +using Akka.DistributedData; +using Akka.DistributedData.Serialization; +using BenchmarkDotNet.Attributes; + +namespace Akka.Benchmarks.DData; + +[Config(typeof(MicroBenchmarkConfig))] +public class SerializerLwwDictionaryBenchmarks +{ + [Params(typeof(RDDBenchTypes.TestKey), typeof(RDDBenchTypes.TestVal))] + public Type KeyType; + + [Params(typeof(RDDBenchTypes.TestKey), typeof(RDDBenchTypes.TestVal))] + public Type ValueType; + + [Params(25)] + public int NumElements; + + [Params(10)] + public int NumNodes; + + private UniqueAddress[] _nodes; + private object _c1; + private ActorSystem sys; + private ReplicatedDataSerializer ser; + private byte[] _c1Ser; + private string _c1Manifest; + + [GlobalSetup] + public void SetupSystem() + { + typeof(SerializerLwwDictionaryBenchmarks).GetMethod( + nameof(SerializerLwwDictionaryBenchmarks.CreateItems), + BindingFlags.Instance | BindingFlags.NonPublic) + .MakeGenericMethod(new []{KeyType,ValueType}) + .Invoke(this, new object[]{}); + var conf = ConfigurationFactory.ParseString(@"akka.actor { + serializers { + akka-replicated-data = ""Akka.DistributedData.Serialization.ReplicatedDataSerializer, Akka.DistributedData"" + } + serialization-bindings { + ""Akka.DistributedData.IReplicatedDataSerialization, Akka.DistributedData"" = akka-replicated-data + } + serialization-identifiers { + ""Akka.DistributedData.Serialization.ReplicatedDataSerializer, Akka.DistributedData"" = 11 + } +}"); + sys = ActorSystem.Create("rddsb", conf); + ser = (ReplicatedDataSerializer)sys.Serialization.FindSerializerForType( + typeof(IReplicatedDataSerialization)); + _c1Ser = ser.ToBinary(_c1); + _c1Manifest = ser.Manifest(_c1); + } + + private void CreateItems() + { + var newNodes = new List(NumNodes); + foreach (var i in Enumerable.Range(0, NumNodes)) + { + var address = new Address("akka.tcp", "Sys", "localhost", 2552 + i); + var uniqueAddress = new UniqueAddress(address, i); + newNodes.Add(uniqueAddress); + } + + _nodes = newNodes.ToArray(); + var newElements = new List(NumNodes); + foreach (var i in Enumerable.Range(0, NumElements)) + { + + newElements.Add(generate(i)); + } + + var _c1 = LWWDictionary> + .Empty; + int j = 0; + foreach (var node in _nodes) + { + _c1 = _c1.SetItem(node, generate(j), + newElements); + j++; + } + + this._c1 = _c1; + } + + private TValue generate(int i) + { + if (typeof(TValue) == typeof(RDDBenchTypes.TestVal)) + { + return (TValue)(object)new RDDBenchTypes.TestVal(i.ToString()); + } + else if (typeof(TValue) == typeof(RDDBenchTypes.TestKey)) + { + return (TValue)(object)new RDDBenchTypes.TestKey(i); + } + else if (typeof(TValue) == typeof(int)) + { + return (TValue)(object)i; + } + else if (typeof(TValue) == typeof(string)) + { + return (TValue)(object)i.ToString(); + } + else if (typeof(TValue) == typeof(long)) + { + return (TValue)(object)(i); + } + else + { + return (TValue)(object)(i); + } + } + + [Benchmark] + public void Serialize_LWWDict() + { + ser.ToBinary(_c1); + } + + [Benchmark] + public void Deserialize_LWWDict() + { + ser.FromBinary(_c1Ser, _c1Manifest); + } +} \ No newline at end of file diff --git a/src/benchmark/Akka.Benchmarks/DData/SerializerORDictionaryBenchmarks.cs b/src/benchmark/Akka.Benchmarks/DData/SerializerORDictionaryBenchmarks.cs new file mode 100644 index 00000000000..bf0e1008226 --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/DData/SerializerORDictionaryBenchmarks.cs @@ -0,0 +1,90 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2023 Lightbend Inc. +// // Copyright (C) 2013-2023 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System.Collections.Generic; +using System.Linq; +using Akka.Actor; +using Akka.Benchmarks.Configurations; +using Akka.Cluster; +using Akka.Configuration; +using Akka.DistributedData; +using Akka.DistributedData.Serialization; +using BenchmarkDotNet.Attributes; + +namespace Akka.Benchmarks.DData; + +[Config(typeof(MicroBenchmarkConfig))] +public class SerializerORDictionaryBenchmarks +{ + [Params(25)] + public int NumElements; + + [Params(10)] + public int NumNodes; + + private UniqueAddress[] _nodes; + private ORDictionary> _c1; + private ORSet _elements; + private ActorSystem sys; + private ReplicatedDataSerializer ser; + private byte[] _c1Ser; + private string _c1Manifest; + + [GlobalSetup] + public void SetupSystem() + { + var newNodes = new List(NumNodes); + foreach(var i in Enumerable.Range(0, NumNodes)){ + var address = new Address("akka.tcp", "Sys", "localhost", 2552 + i); + var uniqueAddress = new UniqueAddress(address, i); + newNodes.Add(uniqueAddress); + } + _nodes = newNodes.ToArray(); + var newElements = ORSet.Empty; + foreach(var i in Enumerable.Range(0, NumElements)){ + newElements = newElements.Add(_nodes[0],new RDDBenchTypes.TestVal(i.ToString())); + } + _elements = newElements; + + _c1 = ORDictionary> + .Empty; + int j = 0; + foreach(var node in _nodes) + { + _c1 = _c1.SetItem(node, new RDDBenchTypes.TestKey(j), _elements); + j++; + } + var conf = ConfigurationFactory.ParseString(@"akka.actor { + serializers { + akka-replicated-data = ""Akka.DistributedData.Serialization.ReplicatedDataSerializer, Akka.DistributedData"" + } + serialization-bindings { + ""Akka.DistributedData.IReplicatedDataSerialization, Akka.DistributedData"" = akka-replicated-data + } + serialization-identifiers { + ""Akka.DistributedData.Serialization.ReplicatedDataSerializer, Akka.DistributedData"" = 11 + } +}"); + sys = ActorSystem.Create("rddsb", conf); + ser = (ReplicatedDataSerializer)sys.Serialization.FindSerializerForType( + typeof(IReplicatedDataSerialization)); + _c1Ser = ser.ToBinary(_c1); + _c1Manifest = ser.Manifest(_c1); + } + + [Benchmark] + public void Serialize_ORDictionary() + { + ser.ToBinary(_c1); + } + + [Benchmark] + public void Deserialize_ORDictionary() + { + ser.FromBinary(_c1Ser, _c1Manifest); + } +} \ No newline at end of file diff --git a/src/benchmark/Akka.Benchmarks/DData/SerializerORSetBenchmarks.cs b/src/benchmark/Akka.Benchmarks/DData/SerializerORSetBenchmarks.cs new file mode 100644 index 00000000000..8c0d645d974 --- /dev/null +++ b/src/benchmark/Akka.Benchmarks/DData/SerializerORSetBenchmarks.cs @@ -0,0 +1,90 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2023 Lightbend Inc. +// // Copyright (C) 2013-2023 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using Akka.Actor; +using Akka.Actor.Setup; +using Akka.Benchmarks.Configurations; +using Akka.Cluster; +using Akka.Configuration; +using Akka.DistributedData; +using Akka.DistributedData.Serialization; +using Akka.Serialization; +using BenchmarkDotNet.Attributes; + +namespace Akka.Benchmarks.DData; + +[Config(typeof(MicroBenchmarkConfig))] +public class SerializerORSetBenchmarks +{ + [Params(25)] + public int NumElements; + + [Params(10)] + public int NumNodes; + + private ActorSystem sys; + private ReplicatedDataSerializer ser; + private UniqueAddress[] _nodes; + private RDDBenchTypes.TestVal[] _elements; + private ORSet> _c1; + private byte[] _c1Ser; + private string _c1Manifest; + + [GlobalSetup] + public void SetupSystem() + { + var newNodes = new List(NumNodes); + foreach(var i in Enumerable.Range(0, NumNodes)){ + var address = new Address("akka.tcp", "Sys", "localhost", 2552 + i); + var uniqueAddress = new UniqueAddress(address, i); + newNodes.Add(uniqueAddress); + } + _nodes = newNodes.ToArray(); + var newElements = new List(NumNodes); + foreach(var i in Enumerable.Range(0, NumElements)){ + newElements.Add(new RDDBenchTypes.TestVal(i.ToString())); + } + _elements = newElements.ToArray(); + + _c1 = ORSet>.Empty; + foreach(var node in _nodes){ + _c1 = _c1.Add(node, _elements.ToList()); + } + + var conf = ConfigurationFactory.ParseString(@"akka.actor { + serializers { + akka-replicated-data = ""Akka.DistributedData.Serialization.ReplicatedDataSerializer, Akka.DistributedData"" + } + serialization-bindings { + ""Akka.DistributedData.IReplicatedDataSerialization, Akka.DistributedData"" = akka-replicated-data + } + serialization-identifiers { + ""Akka.DistributedData.Serialization.ReplicatedDataSerializer, Akka.DistributedData"" = 11 + } +}"); + sys = ActorSystem.Create("rddsb", conf); + ser = (ReplicatedDataSerializer)sys.Serialization.FindSerializerForType( + typeof(IReplicatedDataSerialization)); + _c1Ser = ser.ToBinary(_c1); + _c1Manifest = ser.Manifest(_c1); + } + [Benchmark] + public void Serialize_ORSet() + { + ser.ToBinary(_c1); + } + + [Benchmark] + public void Deserialize_ORSet() + { + ser.FromBinary(_c1Ser, _c1Manifest); + } + +} \ No newline at end of file From 4604b80226f6b3e90fff331326f04bd10d6e2c5c Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sat, 28 Oct 2023 14:34:46 -0400 Subject: [PATCH 2/5] Use ArrayPoolBufferWriter for GZip --- .../Akka.Cluster.Tools.csproj | 1 - .../Akka.DistributedData.csproj | 1 + .../Serialization/SerializationSupport.cs | 41 +++++++++++++------ .../Serialization/WrappedPayloadSupport.cs | 2 +- 4 files changed, 30 insertions(+), 15 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj b/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj index c76abbeb524..6211f1f9b03 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj +++ b/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj @@ -13,5 +13,4 @@ - diff --git a/src/contrib/cluster/Akka.DistributedData/Akka.DistributedData.csproj b/src/contrib/cluster/Akka.DistributedData/Akka.DistributedData.csproj index c4f04f30b21..a08ae48fffa 100644 --- a/src/contrib/cluster/Akka.DistributedData/Akka.DistributedData.csproj +++ b/src/contrib/cluster/Akka.DistributedData/Akka.DistributedData.csproj @@ -14,6 +14,7 @@ + diff --git a/src/contrib/cluster/Akka.DistributedData/Serialization/SerializationSupport.cs b/src/contrib/cluster/Akka.DistributedData/Serialization/SerializationSupport.cs index 0ecc205b44b..0ba36dd85c6 100644 --- a/src/contrib/cluster/Akka.DistributedData/Serialization/SerializationSupport.cs +++ b/src/contrib/cluster/Akka.DistributedData/Serialization/SerializationSupport.cs @@ -6,6 +6,8 @@ //----------------------------------------------------------------------- using System; +using System.Buffers; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.IO.Compression; @@ -14,6 +16,8 @@ using Akka.Actor; using Akka.DistributedData.Serialization.Proto.Msg; using Akka.Serialization; +using CommunityToolkit.HighPerformance; +using CommunityToolkit.HighPerformance.Buffers; using Google.Protobuf; using Address = Akka.Actor.Address; using MemoryStream = System.IO.MemoryStream; @@ -83,40 +87,45 @@ public Information TransportInfo public static byte[] Compress(IMessage msg) { - using (var memStream = new MemoryStream(BufferSize)) + using (var compressedBuffer = new ArrayPoolBufferWriter(1024)) { - using (var gzip = new GZipStream(memStream, CompressionMode.Compress)) + using (var memStream = compressedBuffer.AsStream()) { - msg.WriteTo(gzip); + using (var gzip = new GZipStream(memStream, + CompressionMode.Compress)) + { + msg.WriteTo(gzip); + } } - return memStream.ToArray(); + return compressedBuffer.WrittenMemory.ToArray(); } } - + public static byte[] Decompress(byte[] input) { - using (var memStream = new MemoryStream()) + using (var decompressedBufferWriter = new ArrayPoolBufferWriter(4096)) { using(var inputStr = new MemoryStream(input)) - using (var gzipStream = new GZipStream(inputStr, CompressionMode.Decompress)) + using (var gzipStream = + new GZipStream(inputStr, CompressionMode.Decompress)) { - var buf = new byte[BufferSize]; while (gzipStream.CanRead) { - var read = gzipStream.Read(buf, 0, BufferSize); + var read = gzipStream.Read(decompressedBufferWriter.GetSpan(4096)); if (read > 0) { - memStream.Write(buf, 0, read); + decompressedBufferWriter.Advance(read); } else { break; } } + } - return memStream.ToArray(); + return decompressedBufferWriter.WrittenMemory.ToArray(); } } @@ -187,6 +196,8 @@ public IActorRef ResolveActorRef(string path) return System.Provider.ResolveActorRef(path); } + private static readonly ConcurrentDictionary + _manifestBsCache = new(); public Proto.Msg.OtherMessage OtherMessageToProto(object msg) { Proto.Msg.OtherMessage BuildOther() @@ -194,11 +205,15 @@ Proto.Msg.OtherMessage BuildOther() var m = new OtherMessage(); var msgSerializer = Serialization.FindSerializerFor(msg); m.SerializerId = msgSerializer.Identifier; - m.EnclosedMessage = ByteString.CopyFrom(msgSerializer.ToBinary(msg)); + + m.EnclosedMessage = + UnsafeByteOperations.UnsafeWrap( + msgSerializer.ToBinary(msg)); //ByteString.CopyFrom(msgSerializer.ToBinary(msg)); var ms = Akka.Serialization.Serialization.ManifestFor(msgSerializer, msg); if (!string.IsNullOrEmpty(ms)) - m.MessageManifest = ByteString.CopyFromUtf8(ms); + m.MessageManifest = _manifestBsCache.GetOrAdd(ms, + static k => ByteString.CopyFromUtf8(k)); return m; } diff --git a/src/core/Akka.Remote/Serialization/WrappedPayloadSupport.cs b/src/core/Akka.Remote/Serialization/WrappedPayloadSupport.cs index 664de590a21..8ecf9f85e49 100644 --- a/src/core/Akka.Remote/Serialization/WrappedPayloadSupport.cs +++ b/src/core/Akka.Remote/Serialization/WrappedPayloadSupport.cs @@ -29,7 +29,7 @@ public Proto.Msg.Payload PayloadToProto(object payload) var payloadProto = new Proto.Msg.Payload(); var serializer = _system.Serialization.FindSerializerFor(payload); - payloadProto.Message = ByteString.CopyFrom(serializer.ToBinary(payload)); + payloadProto.Message = UnsafeByteOperations.UnsafeWrap(serializer.ToBinary(payload)); payloadProto.SerializerId = serializer.Identifier; // get manifest From d96bacd53e215b0590d1e249ef43c2410ffb6379 Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sat, 2 Dec 2023 20:43:48 -0500 Subject: [PATCH 3/5] Use MemoryOwner --- .../Serialization/ReplicatedDataSerializer.cs | 55 ++++++++++++------- .../ReplicatorMessageSerializer.cs | 11 +++- .../Serialization/SerializationSupport.cs | 29 ++++++++-- 3 files changed, 68 insertions(+), 27 deletions(-) diff --git a/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.cs b/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.cs index 7fcb32f8ff5..51a86b09e99 100644 --- a/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.cs +++ b/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.cs @@ -10,6 +10,7 @@ using Akka.Serialization; using Google.Protobuf; using System; +using System.Buffers; using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; @@ -112,7 +113,7 @@ public override object FromBinary(byte[] bytes, string manifest) { switch (manifest) { - case ORSetManifest: return ORSetFromBinary(SerializationSupport.Decompress(bytes)); + case ORSetManifest: return ORSetFromBinary(SerializationSupport.DecompressWithRentedPool(bytes)); case ORSetAddManifest: return ORAddDeltaOperationFromBinary(bytes); case ORSetRemoveManifest: return ORRemoveOperationFromBinary(bytes); case GSetManifest: return GSetFromBinary(bytes); @@ -120,18 +121,18 @@ public override object FromBinary(byte[] bytes, string manifest) case PNCounterManifest: return PNCounterFromBytes(bytes); case FlagManifest: return FlagFromBinary(bytes); case LWWRegisterManifest: return LWWRegisterFromBinary(bytes); - case ORMapManifest: return ORDictionaryFromBinary(SerializationSupport.Decompress(bytes)); + case ORMapManifest: return ORDictionaryFromBinary(SerializationSupport.DecompressWithRentedPool(bytes)); case ORMapPutManifest: return ORDictionaryPutFromBinary(bytes); case ORMapRemoveManifest: return ORDictionaryRemoveFromBinary(bytes); case ORMapRemoveKeyManifest: return ORDictionaryRemoveKeyFromBinary(bytes); case ORMapUpdateManifest: return ORDictionaryUpdateFromBinary(bytes); case ORMapDeltaGroupManifest: return ORDictionaryDeltaGroupFromBinary(bytes); - case LWWMapManifest: return LWWDictionaryFromBinary(SerializationSupport.Decompress(bytes)); + case LWWMapManifest: return LWWDictionaryFromBinary(SerializationSupport.DecompressWithRentedPool(bytes)); case LWWMapDeltaGroupManifest: return LWWDictionaryDeltaGroupFromBinary(bytes); - case PNCounterMapManifest: return PNCounterDictionaryFromBinary(SerializationSupport.Decompress(bytes)); + case PNCounterMapManifest: return PNCounterDictionaryFromBinary(SerializationSupport.DecompressWithRentedPool(bytes)); case PNCounterMapDeltaOperationManifest: return PNCounterDeltaFromBinary(bytes); - case ORMultiMapManifest: return ORMultiDictionaryFromBinary(SerializationSupport.Decompress(bytes)); + case ORMultiMapManifest: return ORMultiDictionaryFromBinary(SerializationSupport.DecompressWithRentedPool(bytes)); case ORMultiMapDeltaOperationManifest: return ORMultiDictionaryDeltaFromBinary(bytes); case DeletedDataManifest: return DeletedData.Instance; case VersionVectorManifest: return _ser.VersionVectorFromBinary(bytes); @@ -255,9 +256,13 @@ private static Type GetTypeFromDescriptor(TypeDescriptor t) } #region ORSet - private IORSet ORSetFromBinary(byte[] bytes) + private IORSet ORSetFromBinary(IMemoryOwner bytes) { - return FromProto(Proto.Msg.ORSet.Parser.ParseFrom(bytes)); + using (bytes) + { + var p = Proto.Msg.ORSet.Parser.ParseFrom(bytes.Memory.Span); + return FromProto(p); + } } private Proto.Msg.ORSet ToProto(IORSet orset) @@ -856,10 +861,13 @@ private void ToORMapEntries(IImmutableDictionary orm private static readonly MethodInfo ORDictMaker = typeof(ReplicatedDataSerializer).GetMethod(nameof(GenericORDictionaryFromProto), BindingFlags.Instance | BindingFlags.NonPublic); - private IORDictionary ORDictionaryFromBinary(byte[] bytes) + private IORDictionary ORDictionaryFromBinary(IMemoryOwner bytes) { - var proto = Proto.Msg.ORMap.Parser.ParseFrom(bytes); - return ORDictionaryFromProto(proto); + using (bytes) + { + var proto = Proto.Msg.ORMap.Parser.ParseFrom(bytes.Memory.Span); + return ORDictionaryFromProto(proto); + } } private IORDictionary ORDictionaryFromProto(Proto.Msg.ORMap proto) @@ -1252,10 +1260,13 @@ private ILWWDictionary GenericLWWDictFromProto(Proto.Msg.LWWMap pr } } - private ILWWDictionary LWWDictionaryFromBinary(byte[] bytes) + private ILWWDictionary LWWDictionaryFromBinary(IMemoryOwner bytes) { - var proto = Proto.Msg.LWWMap.Parser.ParseFrom(bytes); - return LWWDictFromProto(proto); + using (bytes) + { + var proto = Proto.Msg.LWWMap.Parser.ParseFrom(bytes.Memory.Span); + return LWWDictFromProto(proto); + } } @@ -1334,10 +1345,13 @@ private void ToPNCounterEntries(IImmutableDictionary unde proto.Entries.Add(entries); } - private IPNCounterDictionary PNCounterDictionaryFromBinary(byte[] bytes) + private IPNCounterDictionary PNCounterDictionaryFromBinary(IMemoryOwner bytes) { - var proto = Proto.Msg.PNCounterMap.Parser.ParseFrom(bytes); - return PNCounterDictionaryFromProto(proto); + using (bytes) + { + var proto = Proto.Msg.PNCounterMap.Parser.ParseFrom(bytes.Memory.Span); + return PNCounterDictionaryFromProto(proto); + } } private IPNCounterDictionary PNCounterDictionaryFromProto(Proto.Msg.PNCounterMap proto) @@ -1468,10 +1482,13 @@ private void ToORMultiMapEntries(IImmutableDictionary bytes) { - var ormm = Proto.Msg.ORMultiMap.Parser.ParseFrom(bytes); - return ORMultiDictionaryFromProto(ormm); + using (bytes) + { + var ormm = Proto.Msg.ORMultiMap.Parser.ParseFrom(bytes.Memory.Span); + return ORMultiDictionaryFromProto(ormm); + } } private IORMultiValueDictionary ORMultiDictionaryFromProto(ORMultiMap proto) diff --git a/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatorMessageSerializer.cs b/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatorMessageSerializer.cs index c3453e8fee8..0df21595b82 100644 --- a/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatorMessageSerializer.cs +++ b/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatorMessageSerializer.cs @@ -9,6 +9,7 @@ using Akka.Serialization; using Akka.Util.Internal; using System; +using System.Buffers; using System.Collections.Generic; using System.Collections.Immutable; using Google.Protobuf; @@ -541,7 +542,7 @@ public override object FromBinary(byte[] bytes, string manifest) case GetFailureManifest: return GetFailureFromBinary(bytes); case SubscribeManifest: return SubscribeFromBinary(bytes); case UnsubscribeManifest: return UnsubscribeFromBinary(bytes); - case GossipManifest: return GossipFromBinary(SerializationSupport.Decompress(bytes)); + case GossipManifest: return GossipFromBinary(SerializationSupport.DecompressWithRentedPool(bytes)); case WriteNackManifest: return WriteNack.Instance; case DeltaNackManifest: return DeltaNack.Instance; @@ -549,9 +550,13 @@ public override object FromBinary(byte[] bytes, string manifest) } } - private Gossip GossipFromBinary(byte[] bytes) + private Gossip GossipFromBinary(IMemoryOwner bytes) { - var proto = Proto.Msg.Gossip.Parser.ParseFrom(bytes); + Proto.Msg.Gossip proto = null; + using (bytes) + { + proto = Proto.Msg.Gossip.Parser.ParseFrom(bytes.Memory.Span); + } var builder = ImmutableDictionary.Empty.ToBuilder(); foreach (var entry in proto.Entries) { diff --git a/src/contrib/cluster/Akka.DistributedData/Serialization/SerializationSupport.cs b/src/contrib/cluster/Akka.DistributedData/Serialization/SerializationSupport.cs index 0ba36dd85c6..9abd6375609 100644 --- a/src/contrib/cluster/Akka.DistributedData/Serialization/SerializationSupport.cs +++ b/src/contrib/cluster/Akka.DistributedData/Serialization/SerializationSupport.cs @@ -104,15 +104,29 @@ public static byte[] Compress(IMessage msg) public static byte[] Decompress(byte[] input) { - using (var decompressedBufferWriter = new ArrayPoolBufferWriter(4096)) + using (var buf = DecompressWithRentedPool(input)) { - using(var inputStr = new MemoryStream(input)) + return buf.Memory.ToArray(); + } + } + + public static IMemoryOwner DecompressWithRentedPool(ReadOnlyMemory input) + { + ArrayPoolBufferWriter decompressedBufferWriter = null; + bool failed = true; + try + { + decompressedBufferWriter = + new ArrayPoolBufferWriter(4096); + using (var inputStr = input.AsStream()) using (var gzipStream = new GZipStream(inputStr, CompressionMode.Decompress)) { while (gzipStream.CanRead) { - var read = gzipStream.Read(decompressedBufferWriter.GetSpan(4096)); + var read = + gzipStream.Read( + decompressedBufferWriter.GetSpan(4096)); if (read > 0) { decompressedBufferWriter.Advance(read); @@ -122,10 +136,15 @@ public static byte[] Decompress(byte[] input) break; } } - } - return decompressedBufferWriter.WrittenMemory.ToArray(); + failed = false; + return decompressedBufferWriter; + } + finally + { + if (failed) + decompressedBufferWriter?.Dispose(); } } From 31819857a9c5ad838fa142e9ae8a4a00a55f9bba Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sat, 2 Dec 2023 22:18:06 -0500 Subject: [PATCH 4/5] Use NonBlockingConcurrentDictionary --- src/benchmark/Akka.Benchmarks/Program.cs | 12 ++++++ .../Akka.DistributedData.csproj | 1 + .../Serialization/SerializationSupport.cs | 39 ++++++++++--------- 3 files changed, 33 insertions(+), 19 deletions(-) diff --git a/src/benchmark/Akka.Benchmarks/Program.cs b/src/benchmark/Akka.Benchmarks/Program.cs index 83c31a46621..2619adfadca 100644 --- a/src/benchmark/Akka.Benchmarks/Program.cs +++ b/src/benchmark/Akka.Benchmarks/Program.cs @@ -5,7 +5,10 @@ // //----------------------------------------------------------------------- +using System; using System.Reflection; +using System.Threading; +using Akka.Benchmarks.DData; using BenchmarkDotNet.Running; namespace Akka.Benchmarks @@ -14,6 +17,15 @@ class Program { static void Main(string[] args) { + //var w = new SerializerORDictionaryBenchmarks(); + //w.NumElements = 25; + //w.NumNodes = 30; + //w.SetupSystem(); + //Console.WriteLine("Running"); + //while (true) + //{ + // w.Serialize_ORDictionary(); + //} BenchmarkSwitcher.FromAssembly(Assembly.GetExecutingAssembly()).Run(args); } } diff --git a/src/contrib/cluster/Akka.DistributedData/Akka.DistributedData.csproj b/src/contrib/cluster/Akka.DistributedData/Akka.DistributedData.csproj index a08ae48fffa..02ab6edc9ae 100644 --- a/src/contrib/cluster/Akka.DistributedData/Akka.DistributedData.csproj +++ b/src/contrib/cluster/Akka.DistributedData/Akka.DistributedData.csproj @@ -16,6 +16,7 @@ + diff --git a/src/contrib/cluster/Akka.DistributedData/Serialization/SerializationSupport.cs b/src/contrib/cluster/Akka.DistributedData/Serialization/SerializationSupport.cs index 9abd6375609..c5866f5925b 100644 --- a/src/contrib/cluster/Akka.DistributedData/Serialization/SerializationSupport.cs +++ b/src/contrib/cluster/Akka.DistributedData/Serialization/SerializationSupport.cs @@ -215,27 +215,10 @@ public IActorRef ResolveActorRef(string path) return System.Provider.ResolveActorRef(path); } - private static readonly ConcurrentDictionary + private static readonly NonBlocking.ConcurrentDictionary _manifestBsCache = new(); public Proto.Msg.OtherMessage OtherMessageToProto(object msg) { - Proto.Msg.OtherMessage BuildOther() - { - var m = new OtherMessage(); - var msgSerializer = Serialization.FindSerializerFor(msg); - m.SerializerId = msgSerializer.Identifier; - - m.EnclosedMessage = - UnsafeByteOperations.UnsafeWrap( - msgSerializer.ToBinary(msg)); //ByteString.CopyFrom(msgSerializer.ToBinary(msg)); - - var ms = Akka.Serialization.Serialization.ManifestFor(msgSerializer, msg); - if (!string.IsNullOrEmpty(ms)) - m.MessageManifest = _manifestBsCache.GetOrAdd(ms, - static k => ByteString.CopyFromUtf8(k)); - return m; - } - // Serialize actor references with full address information (defaultAddress). // When sending remote messages currentTransportInformation is already set, // but when serializing for digests or DurableStore it must be set here. @@ -245,7 +228,7 @@ Proto.Msg.OtherMessage BuildOther() if (oldInfo == null) Akka.Serialization.Serialization.CurrentTransportInformation = System.Provider.SerializationInformation; - return BuildOther(); + return BuildOtherImpl(msg,Serialization); } finally { @@ -253,6 +236,24 @@ Proto.Msg.OtherMessage BuildOther() } } + private static OtherMessage BuildOtherImpl(object msg, + Akka.Serialization.Serialization serialization) + { + var m = new OtherMessage(); + var msgSerializer = serialization.FindSerializerFor(msg); + m.SerializerId = msgSerializer.Identifier; + + m.EnclosedMessage = + UnsafeByteOperations.UnsafeWrap( + msgSerializer.ToBinary(msg)); //ByteString.CopyFrom(msgSerializer.ToBinary(msg)); + + var ms = Akka.Serialization.Serialization.ManifestFor(msgSerializer, msg); + if (!string.IsNullOrEmpty(ms)) + m.MessageManifest = _manifestBsCache.GetOrAdd(ms, + static k => ByteString.CopyFromUtf8(k)); + return m; + } + public object OtherMessageFromBytes(byte[] other) { return OtherMessageFromProto(OtherMessage.Parser.ParseFrom(other)); From fbd3f075edb57dd6da86a6512d8963e86e8480ff Mon Sep 17 00:00:00 2001 From: Andrew El Date: Sat, 21 Oct 2023 12:59:11 -0400 Subject: [PATCH 5/5] Fixups --- .../Akka.Benchmarks/DData/ORSetBenchmarks.cs | 2 + .../Akka.Cluster.Tools.csproj | 5 + .../Serialization/OtherMessageComparer.cs | 46 +- .../ReplicatedDataSerializer.GenericCache.cs | 822 +++++++++++++++ .../Serialization/ReplicatedDataSerializer.cs | 937 +++++++++++------- .../Serialization/SerializationSupport.cs | 19 + src/core/Akka/Actor/ActorRef.cs | 1 + src/core/Akka/Actor/Futures.cs | 1 + 8 files changed, 1488 insertions(+), 345 deletions(-) create mode 100644 src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.GenericCache.cs diff --git a/src/benchmark/Akka.Benchmarks/DData/ORSetBenchmarks.cs b/src/benchmark/Akka.Benchmarks/DData/ORSetBenchmarks.cs index 53fceed79b9..73a85714bde 100644 --- a/src/benchmark/Akka.Benchmarks/DData/ORSetBenchmarks.cs +++ b/src/benchmark/Akka.Benchmarks/DData/ORSetBenchmarks.cs @@ -13,7 +13,9 @@ using Akka.Benchmarks.Configurations; using Akka.Cluster; using Akka.DistributedData; +using Akka.DistributedData.Serialization; using BenchmarkDotNet.Attributes; +using static Akka.Benchmarks.DData.RDDBenchTypes; namespace Akka.Benchmarks.DData { diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj b/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj index 6211f1f9b03..fb71544fa38 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj +++ b/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj @@ -13,4 +13,9 @@ + + + + + diff --git a/src/contrib/cluster/Akka.DistributedData/Serialization/OtherMessageComparer.cs b/src/contrib/cluster/Akka.DistributedData/Serialization/OtherMessageComparer.cs index 048f71c7229..74abe9ed150 100644 --- a/src/contrib/cluster/Akka.DistributedData/Serialization/OtherMessageComparer.cs +++ b/src/contrib/cluster/Akka.DistributedData/Serialization/OtherMessageComparer.cs @@ -12,6 +12,15 @@ namespace Akka.DistributedData.Serialization { + internal class OtherMessageAndVersionComparer : IComparer< + ValueTuple> + { + public static OtherMessageAndVersionComparer Instance { get; } = new(); + public int Compare(ValueTuple x, ValueTuple y) + { + return OtherMessageComparer.Instance.Compare(x.Item1, y.Item1); + } + } internal class OtherMessageComparer : IComparer { public static OtherMessageComparer Instance { get; } = new(); @@ -32,15 +41,36 @@ public int Compare(OtherMessage a, OtherMessage b) if (aSize < bSize) return -1; if (aSize > bSize) return 1; - for (var i = 0; i < aSize; i++) - { - var aByte = aByteString[i]; - var bByte = bByteString[i]; - if (aByte < bByte) return -1; - if (aByte > bByte) return 1; - } + //int j = 0; + return aByteString.SequenceCompareTo(bByteString); + //while (j + 4 < aSize) + //{ + // if (aByteString.Slice(j, 4) + // .SequenceEqual(bByteString.Slice(j, 4)) == false) + // { + // break; + // } + // else + // { + // j = j + 4; + // } + //} + //for (; j < aSize; j++) + //{ + // var aByte = aByteString[j]; + // var bByte = bByteString[j]; + // if (aByte < bByte) return -1; + // if (aByte > bByte) return 1; + //} + //for (var i = 0; i < aSize; i++) + //{ + // var aByte = aByteString[i]; + // var bByte = bByteString[i]; + // if (aByte < bByte) return -1; + // if (aByte > bByte) return 1; + //} - return 0; + //return 0; } } } diff --git a/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.GenericCache.cs b/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.GenericCache.cs new file mode 100644 index 00000000000..a0411cb2502 --- /dev/null +++ b/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.GenericCache.cs @@ -0,0 +1,822 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2023 Lightbend Inc. +// // Copyright (C) 2013-2023 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq.Expressions; +using System.Reflection; +using System.Threading; +using Akka.DistributedData.Serialization.Proto.Msg; +using Google.Protobuf.Collections; + +namespace Akka.DistributedData.Serialization; + +public sealed partial class ReplicatedDataSerializer +{ + private sealed class LazyPool + { + private readonly ConcurrentQueue _poolItems = new(); + private int _numItems = new(); + private readonly int _maxItems; + public LazyPool(int maxItems) + { + _maxItems = maxItems; + } + + public bool TryGetValue(out T value) + { + if (_numItems > 0) + { + if (Interlocked.Decrement(ref _numItems) < 0) + { + Interlocked.Increment(ref _numItems); + } + else + { + return _poolItems.TryDequeue(out value); + } + } + value = default; + return false; + } + + public bool TryReturn(T value) + { + if (_numItems < _maxItems) + { + if (Interlocked.Increment(ref _numItems) < (_maxItems * 2)) + { + _poolItems.Enqueue(value); + return true; + } + else + { + Interlocked.Decrement(ref _numItems); + } + } + + return false; + } + } + private static class SerDeserGenericCache + { + private readonly struct TwoTypeLookup : IEquatable + { + public readonly Type Item1; + public readonly Type Item2; + + public TwoTypeLookup(Type item1, Type item2) + { + Item1 = item1; + Item2 = item2; + } + + public override bool Equals(object obj) + { + return obj is TwoTypeLookup other && Equals(other); + } + + public bool Equals(TwoTypeLookup other) + { + return Item1 == other.Item1 && Item2 == other.Item2; + } + + public override int GetHashCode() + { + unchecked + { + return (Item1.GetHashCode() * 397) ^ Item2.GetHashCode(); + } + } + + public static bool operator ==(TwoTypeLookup left, TwoTypeLookup right) + { + return left.Equals(right); + } + + public static bool operator !=(TwoTypeLookup left, TwoTypeLookup right) + { + return !left.Equals(right); + } + } + private static readonly ConcurrentDictionary> + toGenericOrSetCache = new(); + + public static IORSet ToGenericORSet(Type k, + ReplicatedDataSerializer serializer, + Proto.Msg.ORSet set, + VersionVector versionVectors) + { + if (toGenericOrSetCache.TryGetValue(k, + out var value) == + false) + { + value = toGenericOrSetCache.GetOrAdd(k, + static kc => + MakeInstanceCallOneGenericTwoArgs< + Proto.Msg.ORSet, VersionVector, IORSet>( + ORSetMaker, kc,true) + .Compile()); + } + + return value(serializer, set, versionVectors); + } + + private static readonly ConcurrentDictionary> + orSetUnknownToProto = new(); + + public static Proto.Msg.ORSet ORSetUnknownToProto(Type k, + ReplicatedDataSerializer serializer, + IORSet set, Proto.Msg.ORSet b) + { + + var value = orSetUnknownToProto.GetOrAdd(k, + static kc => + MakeInstanceCallOneGenericTwoArgs< + IORSet, Proto.Msg.ORSet, Proto.Msg.ORSet>( + ORSetUnknownMaker, kc) + .Compile()); + return value(serializer, set, b); + } + + private static readonly ConcurrentDictionary, + ORSet.IDeltaGroupOperation>> + orDeltaGroupCtorCache = new(); + + public static ORSet.IDeltaGroupOperation CreateDeltaGroup(Type t, + ImmutableArray arr) + { + if (orDeltaGroupCtorCache.TryGetValue(t, out var value) == false) + { + value = orDeltaGroupCtorCache.GetOrAdd(t, static ct => + { + return MakeConstructorExprForTypeWith1GenericsOneString< + ImmutableArray, + ORSet.IDeltaGroupOperation>( + typeof(ORSet<>.DeltaGroup), ct).Compile(); + }); + } + + return value(arr); + } + + private static readonly ConcurrentDictionary> + gSetToProtoCache = new(); + + public static Proto.Msg.GSet GSetToProto(Type k, + ReplicatedDataSerializer serializer, + IGSet operation) + { + var value = gSetToProtoCache.GetOrAdd(k, + static kc => + MakeInstanceCallOneGenericOneArg< + IGSet, Proto.Msg.GSet>( + GSetUnknownToProtoMaker, kc) + .Compile()); + + return value(serializer, operation); + } + + private static readonly ConcurrentDictionary, IGSet>> + toGenericGSetCache = new(); + + public static IGSet ToGenericGSet(Type k, + ReplicatedDataSerializer serializer, + RepeatedField operation) + { + var value = toGenericGSetCache.GetOrAdd(k, + static kc => + MakeInstanceCallOneGenericOneArg, IGSet>( + GSetMaker, kc, true) + .Compile()); + + return value(serializer,operation); + } + + private static readonly ConcurrentDictionary> + genericLwwRegisterToProtoCache = new(); + + public static Proto.Msg.LWWRegister GenericLwwRegisterToProto(Type k, + ReplicatedDataSerializer serializer, + ILWWRegister operation) + { + var value = genericLwwRegisterToProtoCache.GetOrAdd(k, + static kc => + MakeInstanceCallOneGenericOneArg< + ILWWRegister, Proto.Msg.LWWRegister>( + LWWProtoMaker, kc) + .Compile()); + + return value(serializer, operation); + } + + private static readonly ConcurrentDictionary> + genericLwwRegisterFromProtoCache = new(); + + public static ILWWRegister GenericLwwRegisterFromProto(Type k, + ReplicatedDataSerializer serializer, + Proto.Msg.LWWRegister operation) + { + var value = genericLwwRegisterFromProtoCache.GetOrAdd(k, + static kc => + MakeInstanceCallOneGenericOneArg< + Proto.Msg.LWWRegister, ILWWRegister>( + LWWRegisterMaker, kc, true) + .Compile()); + + + return value(serializer, operation); + } + + private static readonly ConcurrentDictionary> + orDictionaryToProtoCache = new(); + + public static Proto.Msg.ORMap ORDictionaryToProto(Type k, Type v, + ReplicatedDataSerializer serializer, + IORDictionary operation) + { + + var value = orDictionaryToProtoCache.GetOrAdd(new(k, v), + static kc => + MakeInstanceCallTwoGenericsOneArg< + IORDictionary, Proto.Msg.ORMap>( + ORDictProtoMaker, kc.Item1, kc.Item2) + .Compile()); + + + return value(serializer, operation); + } + + private static readonly ConcurrentDictionary> + orDictionaryFromProtoCache = new(); + + public static IORDictionary ORDictionaryFromProto(Type k, Type v, + ReplicatedDataSerializer serializer, + Proto.Msg.ORMap operation) + { + var value = orDictionaryFromProtoCache.GetOrAdd(new(k, v), + static kc => + MakeInstanceCallTwoGenericsOneArg< + Proto.Msg.ORMap, IORDictionary>( + ORDictMaker, kc.Item1, kc.Item2) + .Compile()); + + + return value(serializer, operation); + } + + private static readonly ConcurrentDictionary, + Proto.Msg.ORMapDeltaGroup>> + orDictionaryDeltasToProtoCache = new(); + + private static readonly ConcurrentDictionary> + orDictionaryDeltaGroupFromProtoCache = new(); + + public static Proto.Msg.ORMapDeltaGroup ORDictionaryDeltasToProto( + Type k, Type v, + ReplicatedDataSerializer serializer, + List operation) + { + var value = orDictionaryDeltasToProtoCache.GetOrAdd(new(k, v), + static kc => + MakeInstanceCallTwoGenericsOneArg< + List, + Proto.Msg.ORMapDeltaGroup>( + ORDeltaGroupProtoMaker, kc.Item1, kc.Item2) + .Compile()); + + return value(serializer, operation); + } + + public static ORDictionary.IDeltaGroupOp + ORDictionaryDeltaGroupFromProto(Type k, Type v, + ReplicatedDataSerializer serializer, + Proto.Msg.ORMapDeltaGroup operation) + { + var value = orDictionaryDeltaGroupFromProtoCache.GetOrAdd(new(k, v), + static kc => + MakeInstanceCallTwoGenericsOneArg< + Proto.Msg.ORMapDeltaGroup, + ORDictionary.IDeltaGroupOp>( + ORDeltaGroupMaker, kc.Item1, kc.Item2) + .Compile()); + + return value(serializer, operation); + } + + private static readonly ConcurrentDictionary> + lwwDictionaryToProtoCache = new(); + + private static readonly ConcurrentDictionary> + lwwDictionaryFromProtoCache = new(); + + private static readonly ConcurrentDictionary> + lwwDictionaryDeltaFromProtoCache = new(); + + + public static Proto.Msg.LWWMap + LWWDictionaryToProto(Type k, Type v, + ReplicatedDataSerializer serializer, + ILWWDictionary operation) + { + var value = lwwDictionaryToProtoCache.GetOrAdd(new(k, v), + static kc => + MakeInstanceCallTwoGenericsOneArgOneConst< + ILWWDictionary, Proto.Msg.TypeDescriptor, Proto.Msg.LWWMap>( + LWWDictProtoMaker, GetTypeDescriptor(kc.Item2), kc.Item1, kc.Item2) + .Compile()); + + return value(serializer, operation); + } + + public static ILWWDictionary + LWWDictionaryFromProto(Type k, Type v, + ReplicatedDataSerializer serializer, + Proto.Msg.LWWMap operation) + { + var value = lwwDictionaryFromProtoCache.GetOrAdd(new(k, v), + static kc => + MakeInstanceCallTwoGenericsOneArg< + Proto.Msg.LWWMap, + ILWWDictionary>( + LWWDictMaker, kc.Item1, kc.Item2) + .Compile()); + + return value(serializer, operation); + } + + public static ILWWDictionaryDeltaOperation + LWWDictionaryDeltaFromProto(Type k, Type v, + ReplicatedDataSerializer serializer, + ORDictionary.IDeltaOperation operation) + { + var value = lwwDictionaryDeltaFromProtoCache.GetOrAdd(new(k, v), + static kc => + MakeInstanceCallTwoGenericsOneArg< + ORDictionary.IDeltaOperation, + ILWWDictionaryDeltaOperation>( + LWWDictionaryDeltaMaker, kc.Item1, kc.Item2) + .Compile()); + + + return value(serializer, operation); + } + + + private static readonly ConcurrentDictionary> + pnToProtoCache = new(); + + private static readonly ConcurrentDictionary> + pnDictionaryDeltaCache = new(); + + private static readonly ConcurrentDictionary> + pnDictionaryFromProtoCache = new(); + + public static IPNCounterDictionary + PNCounterDictionaryFromProto(Type k, + ReplicatedDataSerializer serializer, + PNCounterMap op) + { + var value = pnDictionaryFromProtoCache.GetOrAdd(k, + static kc => + MakeInstanceCallOneGenericOneArg< + PNCounterMap, + IPNCounterDictionary>( + PNCounterDictMaker, kc).Compile()); + + + return value(serializer, op); + } + + private static readonly ConcurrentDictionary> + pnCounterDictionaryFromProtoCache = new(); + + public static IPNCounterDictionaryDeltaOperation + PnCounterDictionaryDeltaFromProto(Type k, + ReplicatedDataSerializer serializer, + ORDictionary.IDeltaOperation op) + { + var value = pnDictionaryDeltaCache.GetOrAdd(k, + static kc => + MakeInstanceCallOneGenericOneArg< + ORDictionary.IDeltaOperation, + IPNCounterDictionaryDeltaOperation>( + PNCounterDeltaMaker, kc).Compile()); + + return value(serializer, op); + } + + public static PNCounterMap PNCounterDictionaryToProto(Type k, + ReplicatedDataSerializer serializer, IPNCounterDictionary dict) + { + var value = pnToProtoCache.GetOrAdd(k, static kc => + MakeInstanceCallOneGenericOneArg(PNCounterDictProtoMaker, kc) + .Compile()); + + return value(serializer, dict); + } + + private static readonly + ConcurrentDictionary> + orMultiValueDictionaryKeyCache = new(); + + private static readonly + ConcurrentDictionary> + orMultiValueSerializerCache = new(); + + private static readonly + ConcurrentDictionary> + orMultiValueDeserializerCache = new(); + + private static readonly + ConcurrentDictionary> + orMultiValueDeltaOpCache = new(); + + private static readonly + ConcurrentDictionary> + orDictionaryKeyCache = new(); + + private static readonly + ConcurrentDictionary> + lwwDictionaryKeyCache = new(); + + private static readonly + ConcurrentDictionary> + orSetKeyCache = new(); + + private static readonly + ConcurrentDictionary> + pnCounterDictionaryKeyCache = new(); + + private static readonly + ConcurrentDictionary> + lwwRegisterKeyCache = new(); + + private static readonly + ConcurrentDictionary> + gsetKeyCache = new(); + + public static Proto.Msg.ORMultiMap + IORMultiValueDictionaryToProtoORMultiMap(Type tk, Type tv, + ReplicatedDataSerializer ser, + IORMultiValueDictionary dict) + { + + var value = orMultiValueSerializerCache.GetOrAdd(new(tk, tv), + MakeInstanceCallTwoGenericsOneArg< + IORMultiValueDictionary, Proto.Msg.ORMultiMap>( + MultiMapProtoMaker, tk, + tv) + .Compile()); + + return value(ser, dict); + } + + public static IORMultiValueDictionary + ProtoORMultiMapToIORMultiValueDictionary(Type tk, Type tv, + ReplicatedDataSerializer ser, + Proto.Msg.ORMultiMap dict) + { + var value = orMultiValueDeserializerCache.GetOrAdd(new(tk, tv), + MakeInstanceCallTwoGenericsOneArg< + Proto.Msg.ORMultiMap, IORMultiValueDictionary>( + MultiDictMaker, tk, + tv) + .Compile()); + + return value(ser, dict); + } + + + public static IORMultiValueDictionaryDeltaOperation + ToOrMultiDictionaryDelta(Type tk, Type tv, + ReplicatedDataSerializer ser, + ORDictionary.IDeltaOperation deltaop, bool withValueDeltas) + { + var value = orMultiValueDeltaOpCache.GetOrAdd(new(tk, tv), + MakeInstanceCallTwoGenericsTwoArgs< + ORDictionary.IDeltaOperation, bool, + IORMultiValueDictionaryDeltaOperation>( + ORMultiDictionaryDeltaMaker, tk, + tv) + .Compile()); + + return value(ser, deltaop, withValueDeltas); + } + + public static IKey GetGSetKey(Type k, string arg) + { + var value = gsetKeyCache.GetOrAdd(k, + MakeConstructorExprForTypeWith1GenericsOneString( + typeof(GSetKey<>), k).Compile()); + + return value(arg); + } + + public static IKey GetLWWRegisterKeyValue(Type k, string arg) + { + var value = lwwRegisterKeyCache.GetOrAdd(k, + MakeConstructorExprForTypeWith1GenericsOneString( + typeof(LWWRegisterKey<>), k).Compile()); + return value(arg); + } + + public static IKey GetPNCounterDictionaryKeyValue(Type k, string arg) + { + + var value = pnCounterDictionaryKeyCache.GetOrAdd(k, + MakeConstructorExprForTypeWith1GenericsOneString( + typeof(PNCounterDictionaryKey<>), k).Compile()); + + return value(arg); + } + + public static IKey GetORSetKey(Type k, string arg) + { + + var value = orSetKeyCache.GetOrAdd(k, + MakeConstructorExprForTypeWith1GenericsOneString(typeof( + ORSetKey<>), k).Compile()); + + return value(arg); + } + + public static IKey GetORDictionaryKey(Type k, Type v, + string arg) + { + var value = orDictionaryKeyCache.GetOrAdd(new(k, v), + MakeConstructorExprForTypeWith2GenericsOneString( + typeof(ORDictionaryKey<,>), k, v) + .Compile()); + + return value(arg); + } + + public static IKey GetORMultiValueDictionaryKey(Type k, Type v, + string arg) + { + var value = orMultiValueDictionaryKeyCache.GetOrAdd(new(k, v), + MakeConstructorExprForTypeWith2GenericsOneString( + typeof(ORMultiValueDictionaryKey<,>), k, v) + .Compile()); + + return value(arg); + } + + public static IKey GetLWWDictionaryKey(Type k, Type v, string arg) + { + var value = lwwDictionaryKeyCache.GetOrAdd(new(k, v), + MakeConstructorExprForTypeWith2GenericsOneString( + typeof(LWWDictionaryKey<,>), k, v).Compile()); + + return value(arg); + } + + private static + Expression> + MakeInstanceCallOneGenericOneArg( + MethodInfo nonConstructedGenericMethodInfo, Type tk, + bool convert = false) + { + var i = Expression.Parameter(typeof(ReplicatedDataSerializer), + "ser"); + var p = Expression.Parameter(typeof(TArg), + "multi"); + var c = Expression.Call(i, + nonConstructedGenericMethodInfo.MakeGenericMethod(new[] { tk }), + new[] { p }); + return Expression + .Lambda< + Func>( + convert ? Expression.Convert(c, typeof(TRet)) : c, + false, i, p); + } + + private static + Expression> + MakeStaticCallOneGenericOneArg( + MethodInfo nonConstructedGenericMethodInfo, Type tk, + bool convert = false) + { + var p = Expression.Parameter(typeof(TArg), + "multi"); + var c = Expression.Call(nonConstructedGenericMethodInfo.MakeGenericMethod(new[] { tk }), + new[] { p }); + return Expression + .Lambda< + Func>( + convert ? Expression.Convert(c, typeof(TRet)) : c, + false, p); + } + + private static + Expression> + MakeStaticCallOneGenericTwoArgs( + MethodInfo nonConstructedGenericMethodInfo, Type tk, + bool convert = false) + { + var p = Expression.Parameter(typeof(TArg1), + "multi"); + var p2 = Expression.Parameter(typeof(TArg2)); + var c = Expression.Call( + nonConstructedGenericMethodInfo.MakeGenericMethod(new[] { tk }), + new[] { p,p2 }); + return Expression + .Lambda< + Func>( + convert ? Expression.Convert(c, typeof(TRet)) : c, false, p, + p2); + } + + private static Expression> + MakeInstanceCallOneGenericOneIgnoredArg( + MethodInfo nonConstructedGenericMethodInfo, + Type tk, Type targ2Ignore, bool convert = false) + { + var p2 = Expression.Constant(null, targ2Ignore); + var i = Expression.Parameter(typeof(ReplicatedDataSerializer), + "ser"); + var p = Expression.Parameter(typeof(TArg), + "multi"); + var c = Expression.Call(i, + nonConstructedGenericMethodInfo.MakeGenericMethod(new[] { tk }), + p, p2); + return Expression + .Lambda< + Func>( + convert ? Expression.Convert(c, typeof(TRet)) : c, false, i, + p); + + } + private static + Expression> + MakeInstanceCallOneGenericTwoArgs( + MethodInfo nonConstructedGenericMethodInfo, Type tk, + bool convert = false) + { + + var p2 = Expression.Parameter(typeof(TArg2)); + var i = Expression.Parameter(typeof(ReplicatedDataSerializer), + "ser"); + var p = Expression.Parameter(typeof(TArg1), + "multi"); + var c = Expression.Call(i, + nonConstructedGenericMethodInfo.MakeGenericMethod(new[] { tk }), + new[] { p, p2 }); + return Expression + .Lambda< + Func>( + convert ? Expression.Convert(c, typeof(TRet)) : c, false, i, + p, p2); + } + + private static + Expression> + MakeInstanceCallTwoGenericsOneArg( + MethodInfo nonConstructedGenericMethodInfo, Type tk, Type tv) + { + var i = Expression.Parameter(typeof(ReplicatedDataSerializer), + "ser"); + var p = Expression.Parameter(typeof(TArg), + "multi"); + var c = Expression.Call(i, + nonConstructedGenericMethodInfo.MakeGenericMethod(new[] + { + tk, tv + }), + new[] { p }); + return Expression + .Lambda< + Func>(c, false, i, p); + } + + private static + Expression> + MakeInstanceCallTwoGenericsOneArgOneConst( + MethodInfo nonConstructedGenericMethodInfo, TConst conArg, Type tk, Type tv) + { + var i = Expression.Parameter(typeof(ReplicatedDataSerializer), + "ser"); + var p = Expression.Parameter(typeof(TArg), + "multi"); + var pc = Expression.Constant(conArg, typeof(TConst)); + var c = Expression.Call(i, + nonConstructedGenericMethodInfo.MakeGenericMethod(new[] + { + tk, tv + }), + new Expression[] { p ,pc}); + return Expression + .Lambda< + Func>(c, false, i, p); + } + + private static + Expression> + MakeInstanceCallTwoGenericsTwoArgs( + MethodInfo nonConstructedGenericMethodInfo, Type tk, Type tv) + { + var i = Expression.Parameter(typeof(ReplicatedDataSerializer), + "ser"); + var p = Expression.Parameter(typeof(TArg1), + "multi"); + var p2 = Expression.Parameter(typeof(TArg2)); + var c = Expression.Call(i, + nonConstructedGenericMethodInfo.MakeGenericMethod(new[] + { + tk, tv + }), + new[] { p, p2 }); + return Expression + .Lambda< + Func>(c, false, i, p, p2); + } + + private static Expression> + MakeConstructorExprForTypeWith1GenericsOneString(Type t, + Type k) + { + var p = Expression.Parameter(typeof(T), "arg"); + var ctor = t + .MakeGenericType(new[] { k }) + .GetConstructor(new[] { typeof(T) }); + var newExp = Expression.New(ctor, p); + var cast = Expression.Convert(newExp, typeof(TOut)); + var newVal = Expression.Lambda>(cast, p); + return newVal; + } + + private static Expression> + MakeConstructorExprForTypeWith2GenericsOneString(Type t, Type k, + Type v) + { + var p = Expression.Parameter(typeof(string), "key"); + var ctor = t + .MakeGenericType(new[] { k, v }) + .GetConstructor(new[] { typeof(string) }); + var newExp = Expression.New(ctor, p); + var cast = Expression.Convert(newExp, typeof(IKey)); + var newVal = Expression.Lambda>(cast, p); + return newVal; + } + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.cs b/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.cs index 51a86b09e99..6fe8defeee5 100644 --- a/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.cs +++ b/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.cs @@ -11,6 +11,7 @@ using Google.Protobuf; using System; using System.Buffers; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; @@ -19,13 +20,16 @@ using Akka.DistributedData.Serialization.Proto.Msg; using Akka.Util; using Akka.Util.Internal; +using Google.Protobuf.Collections; +using Microsoft.Extensions.ObjectPool; using ArgumentOutOfRangeException = System.ArgumentOutOfRangeException; using IActorRef = Akka.Actor.IActorRef; +using UniqueAddress = Akka.Cluster.UniqueAddress; namespace Akka.DistributedData.Serialization { - public sealed class ReplicatedDataSerializer : SerializerWithStringManifest + public sealed partial class ReplicatedDataSerializer : SerializerWithStringManifest { private const string DeletedDataManifest = "A"; @@ -81,13 +85,13 @@ public override byte[] ToBinary(object obj) { switch (obj) { - case IORSet o: return SerializationSupport.Compress(ToProto(o)); - case ORSet.IAddDeltaOperation o: return ToProto(o.UnderlyingSerialization).ToByteArray(); - case ORSet.IRemoveDeltaOperation o: return ToProto(o.UnderlyingSerialization).ToByteArray(); + case IORSet o: return SerializationSupport.Compress(ORSetToProto(o)); + case ORSet.IAddDeltaOperation o: return ORSetToProto(o.UnderlyingSerialization).ToByteArray(); + case ORSet.IRemoveDeltaOperation o: return ORSetToProto(o.UnderlyingSerialization).ToByteArray(); case IGSet g: return ToProto(g).ToByteArray(); case GCounter g: return ToProto(g).ToByteArray(); case PNCounter p: return ToProto(p).ToByteArray(); - case Flag f: return ToProto(f).ToByteArray(); + case Flag f: return GetFlagBytes(f);//ToProto(f).ToByteArray(); case ILWWRegister l: return ToProto(l).ToByteArray(); case IORDictionary o: return SerializationSupport.Compress(ToProto(o)); case ORDictionary.IDeltaOperation p: return ToProto(p).ToByteArray(); @@ -102,8 +106,8 @@ public override byte[] ToBinary(object obj) // key types case IKey k: return ToProto(k).ToByteArray(); // less common delta types - case ORSet.IDeltaGroupOperation o: return ToProto(o).ToByteArray(); - case ORSet.IFullStateDeltaOperation o: return ToProto(o.UnderlyingSerialization).ToByteArray(); + case ORSet.IDeltaGroupOperation o: return ORDeltaGroupOpToProto(o).ToByteArray(); + case ORSet.IFullStateDeltaOperation o: return ORSetToProto(o.UnderlyingSerialization).ToByteArray(); default: throw new ArgumentException($"Can't serialize object of type [{obj.GetType().FullName}] in [{GetType().FullName}]"); } @@ -205,7 +209,20 @@ public override string Manifest(object o) } } + private static readonly ConcurrentDictionary + _typeDescriptorCache = new(); private static TypeDescriptor GetTypeDescriptor(Type t) + { + if (_typeDescriptorCache.TryGetValue(t, out var item) == false) + { + item = _typeDescriptorCache.GetOrAdd(t, + static k => _getTypeDescriptorImpl(k)); + } + + return item; + } + + private static TypeDescriptor _getTypeDescriptorImpl(Type t) { var typeInfo = new TypeDescriptor(); if (t == typeof(string)) @@ -233,6 +250,8 @@ private static TypeDescriptor GetTypeDescriptor(Type t) return typeInfo; } + private static readonly ConcurrentDictionary + _otherTypeCache = new(); private static Type GetTypeFromDescriptor(TypeDescriptor t) { switch (t.Type) @@ -246,10 +265,18 @@ private static Type GetTypeFromDescriptor(TypeDescriptor t) case ValType.ActorRef: return typeof(IActorRef); case ValType.Other: + { + if (_otherTypeCache.TryGetValue(t.TypeName, out var type) == + false) { - var type = Type.GetType(t.TypeName); - return type; + type= _otherTypeCache.GetOrAdd(t.TypeName, + static (k) => Type.GetType(k)); } + + return type; + //var type = Type.GetType(t.TypeName); + //return type; + } default: throw new SerializationException($"Unknown ValType of [{t.Type}] detected"); } @@ -261,23 +288,20 @@ private IORSet ORSetFromBinary(IMemoryOwner bytes) using (bytes) { var p = Proto.Msg.ORSet.Parser.ParseFrom(bytes.Memory.Span); - return FromProto(p); + return ORSetFromProto(p); } } - private Proto.Msg.ORSet ToProto(IORSet orset) + private Proto.Msg.ORSet ORSetToProto(IORSet orset) { - var b = new Proto.Msg.ORSet - { - TypeInfo = new TypeDescriptor() - }; + var b = new Proto.Msg.ORSet(); switch (orset) { case ORSet ints: - { + { + b.TypeInfo = GetTypeDescriptor(typeof(int)); b.Vvector = SerializationSupport.VersionVectorToProto(ints.VersionVector); - b.TypeInfo.Type = ValType.Int; var intElements = new List(ints.ElementsMap.Keys); intElements.Sort(); foreach (var val in intElements) @@ -289,8 +313,9 @@ private Proto.Msg.ORSet ToProto(IORSet orset) } case ORSet longs: { + + b.TypeInfo = GetTypeDescriptor(typeof(long)); b.Vvector = SerializationSupport.VersionVectorToProto(longs.VersionVector); - b.TypeInfo.Type = ValType.Long; var longElements = new List(longs.ElementsMap.Keys); longElements.Sort(); foreach (var val in longElements) @@ -302,8 +327,9 @@ private Proto.Msg.ORSet ToProto(IORSet orset) } case ORSet strings: { + + b.TypeInfo = GetTypeDescriptor(typeof(string)); b.Vvector = SerializationSupport.VersionVectorToProto(strings.VersionVector); - b.TypeInfo.Type = ValType.String; var stringElements = new List(strings.ElementsMap.Keys); stringElements.Sort(); foreach (var val in stringElements) @@ -315,8 +341,8 @@ private Proto.Msg.ORSet ToProto(IORSet orset) } case ORSet refs: { + b.TypeInfo = GetTypeDescriptor(typeof(IActorRef)); b.Vvector = SerializationSupport.VersionVectorToProto(refs.VersionVector); - b.TypeInfo.Type = ValType.ActorRef; var actorRefElements = new List(refs.ElementsMap.Keys); actorRefElements.Sort(); foreach (var val in actorRefElements) @@ -327,43 +353,56 @@ private Proto.Msg.ORSet ToProto(IORSet orset) return b; } default: // unknown type - { + { + return SerDeserGenericCache.ORSetUnknownToProto(orset.SetType, + this, orset, b); // runtime type - enter horrible dynamic serialization stuff - var makeProto = ORSetUnknownMaker.MakeGenericMethod(orset.SetType); - return (Proto.Msg.ORSet)makeProto.Invoke(this, new object[] { orset, b }); + //var makeProto = ORSetUnknownMaker.MakeGenericMethod(orset.SetType); + //return (Proto.Msg.ORSet)makeProto.Invoke(this, new object[] { orset, b }); } } } - private IORSet FromProto(Proto.Msg.ORSet orset) + private IORSet ORSetFromProto(Proto.Msg.ORSet orset) { - var dots = orset.Dots.Select(x => _ser.VersionVectorFromProto(x)); + //var dots = orset.Dots.Select(x => _ser.VersionVectorFromProto(x)); var vector = _ser.VersionVectorFromProto(orset.Vvector); if (orset.IntElements.Count > 0 || orset.TypeInfo.Type == ValType.Int) { - var eInt = orset.IntElements.Zip(dots, (i, versionVector) => (i, versionVector)) - .ToImmutableDictionary(x => x.i, y => y.versionVector); + var eInt = ImmutableDictionary.CreateRange( + ZipSetStatic(orset.IntElements, orset.Dots)); + //var eInt = orset.IntElements.Zip(dots, (i, versionVector) => (i, versionVector)) + // .ToImmutableDictionary(x => x.i, y => y.versionVector); return new ORSet(eInt, vector); } if (orset.LongElements.Count > 0 || orset.TypeInfo.Type == ValType.Long) { - var eLong = orset.LongElements.Zip(dots, (i, versionVector) => (i, versionVector)) - .ToImmutableDictionary(x => x.i, y => y.versionVector); + var eLong = + ImmutableDictionary.CreateRange( + ZipSetStatic(orset.LongElements, orset.Dots)); + //var eLong = orset.LongElements.Zip(dots, (i, versionVector) => (i, versionVector)) + // .ToImmutableDictionary(x => x.i, y => y.versionVector); return new ORSet(eLong, vector); } if (orset.StringElements.Count > 0 || orset.TypeInfo.Type == ValType.String) { - var eStr = orset.StringElements.Zip(dots, (i, versionVector) => (i, versionVector)) - .ToImmutableDictionary(x => x.i, y => y.versionVector); + var eStr = + ImmutableDictionary.CreateRange( + ZipSetStatic(orset.StringElements, orset.Dots)); + //var eStr = orset.StringElements.Zip(dots, (i, versionVector) => (i, versionVector)) + // .ToImmutableDictionary(x => x.i, y => y.versionVector); return new ORSet(eStr, vector); } if (orset.ActorRefElements.Count > 0 || orset.TypeInfo.Type == ValType.ActorRef) { + //TODO: This is the odd duck of odd ducks to optimize. + var dots = + orset.Dots.Select(x => _ser.VersionVectorFromProto(x)); var eRef = orset.ActorRefElements.Zip(dots, (i, versionVector) => (i, versionVector)) .ToImmutableDictionary(x => _ser.ResolveActorRef(x.i), y => y.versionVector); return new ORSet(eRef, vector); @@ -371,76 +410,148 @@ private IORSet FromProto(Proto.Msg.ORSet orset) // runtime type - enter horrible dynamic serialization stuff - var setContentType = Type.GetType(orset.TypeInfo.TypeName); - - var eOther = orset.OtherElements.Zip(dots, - (i, versionVector) => (_ser.OtherMessageFromProto(i), versionVector)) - .ToImmutableDictionary(x => x.Item1, x => x.versionVector); - - var setType = ORSetMaker.MakeGenericMethod(setContentType); - return (IORSet)setType.Invoke(this, new object[] { eOther, vector }); + var setContentType = _otherTypeCache.GetOrAdd(orset.TypeInfo.TypeName, static tn=> Type.GetType(tn)); + return SerDeserGenericCache.ToGenericORSet(setContentType, this, + orset, vector); + //var eOther = orset.OtherElements.Zip(dots, + // (i, versionVector) => (_ser.OtherMessageFromProto(i), versionVector)) + // .ToImmutableDictionary(x => x.Item1, x => x.versionVector); + //var setType = ORSetMaker.MakeGenericMethod(setContentType); + //return (IORSet)setType.Invoke(this, new object[] { eOther, vector }); } private static readonly MethodInfo ORSetMaker = - typeof(ReplicatedDataSerializer).GetMethod(nameof(ToGenericORSet), BindingFlags.Static | BindingFlags.NonPublic); - - private static ORSet ToGenericORSet(ImmutableDictionary elems, VersionVector vector) + typeof(ReplicatedDataSerializer).GetMethod(nameof(ToGenericORSet), BindingFlags.Instance| BindingFlags.NonPublic); + + private IEnumerable> ZipSetStatic( + RepeatedField fields, RepeatedField dots) { - var finalInput = elems.ToImmutableDictionary(x => (T)x.Key, v => v.Value); - - return new ORSet(finalInput, vector); + int i = 0; + while(i StatefulIterateTransform( + IEnumerable fields, TState state, Func func) + { + foreach (var f in fields) + { + yield return func(state, f); + } + } + private IEnumerable> ZipSet( + RepeatedField fields, RepeatedField dots) + { + //TODO: Generalize this and ORSet to save on code size. + int i = 0; + while(i ToGenericORSet(Proto.Msg.ORSet set, VersionVector vector) + { + return new ORSet( + ImmutableDictionary.Empty.AddRange( + ZipSet(set.OtherElements, set.Dots)), vector); } private static readonly MethodInfo ORSetUnknownMaker = typeof(ReplicatedDataSerializer).GetMethod(nameof(ORSetUnknownToProto), BindingFlags.Instance | BindingFlags.NonPublic); - /// - /// Called when we're serializing none of the standard object types with ORSet - /// - private Proto.Msg.ORSet ORSetUnknownToProto(IORSet o, Proto.Msg.ORSet b) + private class SortingListObjectPool + : IPooledObjectPolicy> { - var orset = (ORSet)o; - b.Vvector = SerializationSupport.VersionVectorToProto(orset.VersionVector); - b.TypeInfo.Type = ValType.Other; - b.TypeInfo.TypeName = typeof(T).TypeQualifiedName(); - - var otherElements = new List(); - var otherElementsDict = new Dictionary(); - foreach (var kvp in orset.ElementsMap) + public static readonly SortingListObjectPool Instance = new(); + public List<(OtherMessage, Proto.Msg.VersionVector)> Create() { - var otherElement = _ser.OtherMessageToProto(kvp.Key); - otherElements.Add(otherElement); - otherElementsDict[otherElement] = SerializationSupport.VersionVectorToProto(kvp.Value); + return new List<(OtherMessage, Proto.Msg.VersionVector)>(32); } - otherElements.Sort(OtherMessageComparer.Instance); - - foreach (var val in otherElements) + + public bool Return(List<(OtherMessage, Proto.Msg.VersionVector)> obj) { - b.OtherElements.Add(val); - b.Dots.Add(otherElementsDict[val]); + if (obj.Count <= 128) + { + obj.Clear(); + return true; + } + + return false; } - return b; } - private ORSet.IAddDeltaOperation ORAddDeltaOperationFromBinary(byte[] bytes) + private static readonly + ObjectPool>> + _orSetPool = + new DefaultObjectPool< + List<(OtherMessage, Proto.Msg.VersionVector)>>( + SortingListObjectPool.Instance); + + //LazyPool>> + //_orSetPool = + //new LazyPool>(32); + /// + /// Called when we're serializing none of the standard object types with ORSet + /// + private Proto.Msg.ORSet ORSetUnknownToProto(IORSet o, + Proto.Msg.ORSet b) + { + var orset = (ORSet)o; + b.Vvector = + SerializationSupport.VersionVectorToProto( + orset.VersionVector); + b.TypeInfo = GetTypeDescriptor(typeof(T)); + //b.TypeInfo.Type = ValType.Other; + //b.TypeInfo.TypeName = typeof(T).TypeQualifiedName(); + + var otherElements = _orSetPool.Get(); + + //var otherElementsDict = new Dictionary(); + foreach (var kvp in orset.ElementsMap) + { + //var otherElement = ; + otherElements.Add(new(_ser.OtherMessageToProto(kvp.Key), + SerializationSupport + .VersionVectorToProto(kvp.Value))); + //otherElementsDict[otherElement] = SerializationSupport.VersionVectorToProto(kvp.Value); + } + + otherElements.Sort(OtherMessageAndVersionComparer.Instance); + + foreach (var val in otherElements) + { + b.OtherElements.Add(val.Item1); + b.Dots.Add(val.Item2); + } + + _orSetPool.Return(otherElements); + return b; + } + + private ORSet.IAddDeltaOperation ORAddDeltaOperationFromBinary(byte[] bytes) { - var set = FromProto(Proto.Msg.ORSet.Parser.ParseFrom(bytes)); + var set = ORSetFromProto(Proto.Msg.ORSet.Parser.ParseFrom(bytes)); return set.ToAddDeltaOperation(); } private ORSet.IRemoveDeltaOperation ORRemoveOperationFromBinary(byte[] bytes) { - var set = FromProto(Proto.Msg.ORSet.Parser.ParseFrom(bytes)); + var set = ORSetFromProto(Proto.Msg.ORSet.Parser.ParseFrom(bytes)); return set.ToRemoveDeltaOperation(); } private ORSet.IFullStateDeltaOperation ORFullStateDeltaOperationFromBinary(byte[] bytes) { - var set = FromProto(Proto.Msg.ORSet.Parser.ParseFrom(bytes)); + var set = ORSetFromProto(Proto.Msg.ORSet.Parser.ParseFrom(bytes)); return set.ToFullStateDeltaOperation(); } - private Proto.Msg.ORSetDeltaGroup ToProto(ORSet.IDeltaGroupOperation orset) + private Proto.Msg.ORSetDeltaGroup ORDeltaGroupOpToProto(ORSet.IDeltaGroupOperation orset) { var deltaGroup = new Proto.Msg.ORSetDeltaGroup(); @@ -460,15 +571,15 @@ void SetType(IORSet underlying) switch (op) { case ORSet.IAddDeltaOperation add: - deltaGroup.Entries.Add(new ORSetDeltaGroup.Types.Entry() { Operation = ORSetDeltaOp.Add, Underlying = ToProto(add.UnderlyingSerialization) }); + deltaGroup.Entries.Add(new ORSetDeltaGroup.Types.Entry() { Operation = ORSetDeltaOp.Add, Underlying = ORSetToProto(add.UnderlyingSerialization) }); SetType(add.UnderlyingSerialization); break; case ORSet.IRemoveDeltaOperation remove: - deltaGroup.Entries.Add(new ORSetDeltaGroup.Types.Entry() { Operation = ORSetDeltaOp.Remove, Underlying = ToProto(remove.UnderlyingSerialization) }); + deltaGroup.Entries.Add(new ORSetDeltaGroup.Types.Entry() { Operation = ORSetDeltaOp.Remove, Underlying = ORSetToProto(remove.UnderlyingSerialization) }); SetType(remove.UnderlyingSerialization); break; case ORSet.IFullStateDeltaOperation full: - deltaGroup.Entries.Add(new ORSetDeltaGroup.Types.Entry() { Operation = ORSetDeltaOp.Full, Underlying = ToProto(full.UnderlyingSerialization) }); + deltaGroup.Entries.Add(new ORSetDeltaGroup.Types.Entry() { Operation = ORSetDeltaOp.Full, Underlying = ORSetToProto(full.UnderlyingSerialization) }); SetType(full.UnderlyingSerialization); break; default: throw new ArgumentException($"{op} should not be nested"); @@ -488,13 +599,13 @@ private ORSet.IDeltaGroupOperation ORDeltaGroupOperationFromBinary(byte[] bytes) switch (op.Operation) { case ORSetDeltaOp.Add: - ops.Add(FromProto(op.Underlying).ToAddDeltaOperation()); + ops.Add(ORSetFromProto(op.Underlying).ToAddDeltaOperation()); break; case ORSetDeltaOp.Remove: - ops.Add(FromProto(op.Underlying).ToRemoveDeltaOperation()); + ops.Add(ORSetFromProto(op.Underlying).ToRemoveDeltaOperation()); break; case ORSetDeltaOp.Full: - ops.Add(FromProto(op.Underlying).ToFullStateDeltaOperation()); + ops.Add(ORSetFromProto(op.Underlying).ToFullStateDeltaOperation()); break; default: throw new SerializationException($"Unknown ORSet delta operation ${op.Operation}"); @@ -519,9 +630,11 @@ private ORSet.IDeltaGroupOperation ORDeltaGroupOperationFromBinary(byte[] bytes) // if we made it this far, we're working with an object type // enter reflection magic - var type = Type.GetType(deltaGroup.TypeInfo.TypeName); - var orDeltaGroupType = typeof(ORSet<>.DeltaGroup).MakeGenericType(type); - return (ORSet.IDeltaGroupOperation)Activator.CreateInstance(orDeltaGroupType, arr); + var type = _otherTypeCache.GetOrAdd(deltaGroup.TypeInfo.TypeName, + static t => Type.GetType(t)); + return SerDeserGenericCache.CreateDeltaGroup(type, arr); + //var orDeltaGroupType = typeof(ORSet<>.DeltaGroup).MakeGenericType(type); + //return (ORSet.IDeltaGroupOperation)Activator.CreateInstance(orDeltaGroupType, arr); } #endregion @@ -538,7 +651,13 @@ private Proto.Msg.GSet GSetToProto(GSet gset) private Proto.Msg.GSet GSetToProtoUnknown(IGSet g) { var gset = (GSet)g; - var otherElements = new List(gset.Select(x => _ser.OtherMessageToProto(x))); + //TODO: Pool? + var otherElements = new List(gset.Count); + foreach (var otherElement in gset) + { + otherElements.Add(_ser.OtherMessageToProto(otherElement)); + } + //var otherElements = new List(gset.Select(x => _ser.OtherMessageToProto(x))); otherElements.Sort(OtherMessageComparer.Instance); var p = new Proto.Msg.GSet @@ -589,9 +708,11 @@ private Proto.Msg.GSet ToProto(IGSet gset) return p; } default: // unknown type - { - var protoMaker = GSetUnknownToProtoMaker.MakeGenericMethod(gset.SetType); - return (Proto.Msg.GSet)protoMaker.Invoke(this, new object[] { gset }); + { + return SerDeserGenericCache.GSetToProto(gset.SetType, this, + gset); + //var protoMaker = GSetUnknownToProtoMaker.MakeGenericMethod(gset.SetType); + //return (Proto.Msg.GSet)protoMaker.Invoke(this, new object[] { gset }); } } } @@ -630,10 +751,12 @@ private IGSet GSetFromBinary(byte[] bytes) var setContentType = Type.GetType(gset.TypeInfo.TypeName); - var eOther = gset.OtherElements.Select(x => _ser.OtherMessageFromProto(x)); - - var setType = GSetMaker.MakeGenericMethod(setContentType); - return (IGSet)setType.Invoke(this, new object[] { eOther }); + return SerDeserGenericCache.ToGenericGSet( + setContentType, this, gset.OtherElements); + + //var eOther = gset.OtherElements.Select(x => _ser.OtherMessageFromProto(x)); + //var setType = GSetMaker.MakeGenericMethod(setContentType); + //return (IGSet)setType.Invoke(this, new object[] { eOther }); } default: throw new SerializationException($"Unknown ValType of [{gset.TypeInfo.Type}] detected while deserializing GSet"); @@ -643,9 +766,19 @@ private IGSet GSetFromBinary(byte[] bytes) private static readonly MethodInfo GSetMaker = typeof(ReplicatedDataSerializer).GetMethod(nameof(ToGenericGSet), BindingFlags.Static | BindingFlags.NonPublic); - private static GSet ToGenericGSet(IEnumerable items) + private IEnumerable DeserializeRepeatedFieldEnum( + RepeatedField msg) { - return new GSet(items.Cast().ToImmutableHashSet()); + for (int i = 0; i < msg.Count; i++) + { + yield return (T)_ser.OtherMessageFromProto(msg[i]); + } + } + private GSet ToGenericGSet(RepeatedField items) + { + return new GSet( + ImmutableHashSet.CreateRange( + DeserializeRepeatedFieldEnum(items))); } #endregion @@ -670,8 +803,16 @@ private GCounter GCounterFromBytes(byte[] bytes) private GCounter GCounterFromProto(Proto.Msg.GCounter gProto) { - var entries = gProto.Entries.ToImmutableDictionary(k => _ser.UniqueAddressFromProto(k.Node), - v => BitConverter.ToUInt64(v.Value.ToByteArray(), 0)); + var entries = + ImmutableDictionary.Empty.AddRange( + StatefulIterateTransform(gProto.Entries, _ser, + static (ss, kv) => + { + return new KeyValuePair( + ss.UniqueAddressFromProto(kv.Node), + BitConverter.ToUInt64(kv.Value.ToByteArray(), + 0)); + })); return new GCounter(entries); } @@ -713,6 +854,18 @@ private Proto.Msg.Flag ToProto(Flag flag) return pFlag; } + private static byte[] GetFlagBytes(Flag flag) + { + return flag.Enabled ? FlagYes : FlagNo; + } + + //FUTURE: We should be able to keep the raw proto as readonlymemory if we get to that point. + private static readonly byte[] FlagYes = + new Proto.Msg.Flag() { Enabled = true }.ToByteArray(); + + private static readonly byte[] FlagNo = + new Proto.Msg.Flag() { Enabled = false }.ToByteArray(); + private Flag FlagFromProto(Proto.Msg.Flag flag) { return flag.Enabled ? Flag.True : Flag.False; @@ -729,8 +882,10 @@ private Flag FlagFromBinary(byte[] bytes) private Proto.Msg.LWWRegister ToProto(ILWWRegister register) { - var protoMaker = LWWProtoMaker.MakeGenericMethod(register.RegisterType); - return (Proto.Msg.LWWRegister)protoMaker.Invoke(this, new object[] { register }); + return SerDeserGenericCache.GenericLwwRegisterToProto( + register.RegisterType, this, register); + //var protoMaker = LWWProtoMaker.MakeGenericMethod(register.RegisterType); + //return (Proto.Msg.LWWRegister)protoMaker.Invoke(this, new object[] { register }); } private static readonly MethodInfo LWWProtoMaker = @@ -786,10 +941,11 @@ private ILWWRegister LWWRegisterFromProto(Proto.Msg.LWWRegister proto) typeName = "Akka.Cluster.Sharding.ShardCoordinator+CoordinatorState, Akka.Cluster.Sharding"; // runtime type - enter horrible dynamic serialization stuff - var setContentType = Type.GetType(typeName); - - var setType = LWWRegisterMaker.MakeGenericMethod(setContentType); - return (ILWWRegister)setType.Invoke(this, new object[] { proto }); + var setContentType = _otherTypeCache.GetOrAdd(typeName,static (t)=>Type.GetType(t)); + return SerDeserGenericCache.GenericLwwRegisterFromProto( + setContentType, this, proto); + //var setType = LWWRegisterMaker.MakeGenericMethod(setContentType); + //return (ILWWRegister)setType.Invoke(this, new object[] { proto }); } default: throw new SerializationException($"Unknown ValType of [{proto.TypeInfo.Type}] detected while deserializing LWWRegister"); @@ -813,8 +969,10 @@ private LWWRegister GenericLWWRegisterFromProto(Proto.Msg.LWWRegister prot private Proto.Msg.ORMap ToProto(IORDictionary ormap) { - var protoMaker = ORDictProtoMaker.MakeGenericMethod(ormap.KeyType, ormap.ValueType); - return (Proto.Msg.ORMap)protoMaker.Invoke(this, new object[] { ormap }); + return SerDeserGenericCache.ORDictionaryToProto(ormap.KeyType, + ormap.ValueType, this, ormap); + //var protoMaker = ORDictProtoMaker.MakeGenericMethod(ormap.KeyType, ormap.ValueType); + //return (Proto.Msg.ORMap)protoMaker.Invoke(this, new object[] { ormap }); } private static readonly MethodInfo ORDictProtoMaker = @@ -825,37 +983,40 @@ private Proto.Msg.ORMap ORDictToProto(IORDictionary o) where TValu var ormap = (ORDictionary)o; var proto = new Proto.Msg.ORMap(); ToORMapEntries(ormap.Entries, proto); - proto.Keys = ToProto(ormap.KeySet); + proto.Keys = ORSetToProto(ormap.KeySet); proto.ValueTypeInfo = GetTypeDescriptor(typeof(TValue)); return proto; } private void ToORMapEntries(IImmutableDictionary ormapEntries, ORMap proto) where TValue : IReplicatedData { - var entries = new List(); + EnsureRepeatedFieldSize(proto.Entries,ormapEntries.Count); + //proto.Entries.Capacity = proto.Entries.Count + ormapEntries.Count; + //var entries = new List(ormapEntries.Count); foreach (var e in ormapEntries) { var entry = new ORMap.Types.Entry(); - switch (e.Key) + if (typeof(TKey) == typeof(int)) { - case int i: - entry.IntKey = i; - break; - case long l: - entry.LongKey = l; - break; - case string str: - entry.StringKey = str; - break; - default: - entry.OtherKey = _ser.OtherMessageToProto(e.Key); - break; + entry.IntKey = (int)(object)e.Key; + } + else if (typeof(TKey) == typeof(long)) + { + entry.LongKey = (long)(object)e.Key; + } + else if (typeof(TKey) == typeof(string)) + { + entry.StringKey = (string)(object)e.Key; + } + else + { + entry.OtherKey = _ser.OtherMessageToProto(e.Key); } entry.Value = _ser.OtherMessageToProto(e.Value); - entries.Add(entry); + proto.Entries.Add(entry); } - proto.Entries.Add(entries); + //proto.Entries.Add(entries); } private static readonly MethodInfo ORDictMaker = @@ -874,40 +1035,44 @@ private IORDictionary ORDictionaryFromProto(Proto.Msg.ORMap proto) { var keyType = GetTypeFromDescriptor(proto.Keys.TypeInfo); var valueType = GetTypeFromDescriptor(proto.ValueTypeInfo); - var protoMaker = ORDictMaker.MakeGenericMethod(keyType, valueType); - return (IORDictionary)protoMaker.Invoke(this, new object[] { proto }); + return SerDeserGenericCache.ORDictionaryFromProto(keyType, + valueType, this, proto); + //var protoMaker = ORDictMaker.MakeGenericMethod(keyType, valueType); + //return (IORDictionary)protoMaker.Invoke(this, new object[] { proto }); } private IORDictionary GenericORDictionaryFromProto(Proto.Msg.ORMap proto) where TValue : IReplicatedData { - var keys = FromProto(proto.Keys); - switch (proto.Keys.TypeInfo.Type) + var keys = ORSetFromProto(proto.Keys); + if (typeof(TKey) == typeof(int)) { - case ValType.Int: - { - var entries = proto.Entries.ToImmutableDictionary(x => x.IntKey, - v => (TValue)_ser.OtherMessageFromProto(v.Value)); - return new ORDictionary((ORSet)keys, entries); - } - case ValType.Long: - { - var entries = proto.Entries.ToImmutableDictionary(x => x.LongKey, - v => (TValue)_ser.OtherMessageFromProto(v.Value)); - return new ORDictionary((ORSet)keys, entries); - } - case ValType.String: - { - var entries = proto.Entries.ToImmutableDictionary(x => x.StringKey, - v => (TValue)_ser.OtherMessageFromProto(v.Value)); - return new ORDictionary((ORSet)keys, entries); - } - default: + var entries = proto.Entries.ToImmutableDictionary(x => x.IntKey, + v => (TValue)_ser.OtherMessageFromProto(v.Value)); + return new ORDictionary((ORSet)keys, entries); + } + else if (typeof(TKey) == typeof(long)) + { + var entries = proto.Entries.ToImmutableDictionary(x => x.LongKey, + v => (TValue)_ser.OtherMessageFromProto(v.Value)); + return new ORDictionary((ORSet)keys, entries); + } + else if (typeof(TKey) == typeof(string)) + { + var entries = proto.Entries.ToImmutableDictionary(x => x.StringKey, + v => (TValue)_ser.OtherMessageFromProto(v.Value)); + return new ORDictionary((ORSet)keys, entries); + } + else + { + + var entries = ImmutableDictionary.Empty.AddRange( + StatefulIterateTransform(proto.Entries, _ser, static (ss, kv) => { - var entries = proto.Entries.ToImmutableDictionary(x => (TKey)_ser.OtherMessageFromProto(x.OtherKey), - v => (TValue)_ser.OtherMessageFromProto(v.Value)); - - return new ORDictionary((ORSet)keys, entries); - } + return new KeyValuePair( + (TKey)ss.OtherMessageFromProto(kv.OtherKey), + (TValue)ss.OtherMessageFromProto(kv.Value)); + })); + return new ORDictionary((ORSet)keys, entries); } } @@ -916,9 +1081,10 @@ private Proto.Msg.ORMapDeltaGroup ORDictionaryDeltasToProto( { var keyType = deltaGroupOps[0].KeyType; var valueType = deltaGroupOps[0].ValueType; - - var protoMaker = ORDeltaGroupProtoMaker.MakeGenericMethod(keyType, valueType); - return (Proto.Msg.ORMapDeltaGroup)protoMaker.Invoke(this, new object[] { deltaGroupOps }); + return SerDeserGenericCache.ORDictionaryDeltasToProto(keyType, + valueType, this, deltaGroupOps); + //var protoMaker = ORDeltaGroupProtoMaker.MakeGenericMethod(keyType, valueType); + //return (Proto.Msg.ORMapDeltaGroup)protoMaker.Invoke(this, new object[] { deltaGroupOps }); } private static readonly MethodInfo ORDeltaGroupProtoMaker = @@ -962,24 +1128,29 @@ ORMapDeltaGroup.Types.Entry CreateEntry(ORDictionary.IDeltaOperati { case ORDictionary.PutDeltaOperation putDelta: entry.Operation = ORMapDeltaOp.OrmapPut; - entry.Underlying = ToProto(putDelta.Underlying.AsInstanceOf() + entry.Underlying = ORSetToProto(putDelta.Underlying.AsInstanceOf() .UnderlyingSerialization); entry.EntryData.Add(CreateMapEntry(putDelta.Key, putDelta.Value)); break; case ORDictionary.UpdateDeltaOperation upDelta: entry.Operation = ORMapDeltaOp.OrmapUpdate; - entry.Underlying = ToProto(upDelta.Underlying.AsInstanceOf() + entry.Underlying = ORSetToProto(upDelta.Underlying.AsInstanceOf() .UnderlyingSerialization); - entry.EntryData.AddRange(upDelta.Values.Select(x => CreateMapEntry(x.Key, x.Value)).ToList()); + entry.EntryData.Capacity = entry.EntryData.Count + + upDelta.Values.Count; + foreach (var x in upDelta.Values) + { + entry.EntryData.Add(CreateMapEntry(x.Key,x.Value)); + } break; case ORDictionary.RemoveDeltaOperation removeDelta: entry.Operation = ORMapDeltaOp.OrmapRemove; - entry.Underlying = ToProto(removeDelta.Underlying.AsInstanceOf() + entry.Underlying = ORSetToProto(removeDelta.Underlying.AsInstanceOf() .UnderlyingSerialization); break; case ORDictionary.RemoveKeyDeltaOperation removeKeyDelta: entry.Operation = ORMapDeltaOp.OrmapRemoveKey; - entry.Underlying = ToProto(removeKeyDelta.Underlying.AsInstanceOf() + entry.Underlying = ORSetToProto(removeKeyDelta.Underlying.AsInstanceOf() .UnderlyingSerialization); entry.EntryData.Add(CreateMapEntry(removeKeyDelta.Key)); break; @@ -1035,8 +1206,8 @@ private ORDictionary.IDeltaGroupOp ORDictionaryDeltaGroupFromProto(Proto.Msg.ORM var keyType = GetTypeFromDescriptor(deltaGroup.KeyTypeInfo); var valueType = GetTypeFromDescriptor(deltaGroup.ValueTypeInfo); - var groupMaker = ORDeltaGroupMaker.MakeGenericMethod(keyType, valueType); - return (ORDictionary.IDeltaGroupOp)groupMaker.Invoke(this, new object[] { deltaGroup }); + return SerDeserGenericCache.ORDictionaryDeltaGroupFromProto(keyType, + valueType, this, deltaGroup); } private static readonly MethodInfo ORDeltaGroupMaker = @@ -1076,7 +1247,7 @@ private ORDictionary.IDeltaGroupOp GenericORDictionaryDeltaGroupFromProto(ILWWDictionary o) + private Proto.Msg.LWWMap LWWDictToProto(ILWWDictionary o, Proto.Msg.TypeDescriptor td) { var lwwmap = (LWWDictionary)o; var proto = new Proto.Msg.LWWMap(); ToLWWMapEntries(lwwmap.Underlying.Entries, proto); - proto.Keys = ToProto(lwwmap.Underlying.KeySet); - proto.ValueTypeInfo = GetTypeDescriptor(typeof(TValue)); + proto.Keys = ORSetToProto(lwwmap.Underlying.KeySet); + proto.ValueTypeInfo = td;// GetTypeDescriptor(typeof(TValue)); return proto; } + private static void EnsureRepeatedFieldSize( + RepeatedField item, int forAtLeast) + { + var current = item.Count; + if (current + forAtLeast < item.Capacity) + { + item.Capacity = (current + forAtLeast); + } + } private void ToLWWMapEntries(IImmutableDictionary> underlyingEntries, LWWMap proto) { - var entries = new List(); + EnsureRepeatedFieldSize(proto.Entries,underlyingEntries.Count); + //var entries = new List(underlyingEntries.Count); foreach (var e in underlyingEntries) { var thisEntry = new LWWMap.Types.Entry(); - switch (e.Key) + if (typeof(TKey) == typeof(int)) { - case int i: - thisEntry.IntKey = i; - break; - case long l: - thisEntry.LongKey = l; - break; - case string str: - thisEntry.StringKey = str; - break; - default: - thisEntry.OtherKey = _ser.OtherMessageToProto(e.Key); - break; + thisEntry.IntKey = (int)(object)e.Key; + } + else if (typeof(TKey) == typeof(long)) + { + thisEntry.LongKey = (long)(object)e.Key; + } + else if (typeof(TKey) == typeof(string)) + { + thisEntry.StringKey = (string)(object)e.Key; + } + else + { + thisEntry.OtherKey = _ser.OtherMessageToProto(e.Key); } thisEntry.Value = LWWToProto(e.Value); - entries.Add(thisEntry); + proto.Entries.Add(thisEntry); } - - proto.Entries.Add(entries); } private static readonly MethodInfo LWWDictMaker = @@ -1220,44 +1404,80 @@ private ILWWDictionary LWWDictFromProto(Proto.Msg.LWWMap proto) var keyType = GetTypeFromDescriptor(proto.Keys.TypeInfo); var valueType = GetTypeFromDescriptor(proto.ValueTypeInfo); - var dictMaker = LWWDictMaker.MakeGenericMethod(keyType, valueType); - return (ILWWDictionary)dictMaker.Invoke(this, new object[] { proto }); + return SerDeserGenericCache.LWWDictionaryFromProto(keyType, + valueType, this, proto); + //var dictMaker = LWWDictMaker.MakeGenericMethod(keyType, valueType); + //return (ILWWDictionary)dictMaker.Invoke(this, new object[] { proto }); } - private ILWWDictionary GenericLWWDictFromProto(Proto.Msg.LWWMap proto) + private ILWWDictionary GenericLWWDictFromProto( + Proto.Msg.LWWMap proto) { - var keys = FromProto(proto.Keys); - switch (proto.Keys.TypeInfo.Type) + var keys = ORSetFromProto(proto.Keys); + if (typeof(TKey) == typeof(int)) { - case ValType.Int: - { - var entries = proto.Entries.ToImmutableDictionary(x => x.IntKey, - v => GenericLWWRegisterFromProto(v.Value)); - var orDict = new ORDictionary>((ORSet)keys, entries); - return new LWWDictionary(orDict); - } - case ValType.Long: - { - var entries = proto.Entries.ToImmutableDictionary(x => x.LongKey, - v => GenericLWWRegisterFromProto(v.Value)); - var orDict = new ORDictionary>((ORSet)keys, entries); - return new LWWDictionary(orDict); - } - case ValType.String: - { - var entries = proto.Entries.ToImmutableDictionary(x => x.StringKey, - v => GenericLWWRegisterFromProto(v.Value)); - var orDict = new ORDictionary>((ORSet)keys, entries); - return new LWWDictionary(orDict); - } - default: - { - var entries = proto.Entries.ToImmutableDictionary(x => (TKey)_ser.OtherMessageFromProto(x.OtherKey), - v => GenericLWWRegisterFromProto(v.Value)); - var orDict = new ORDictionary>((ORSet)keys, entries); - return new LWWDictionary(orDict); - } + var entries = ImmutableDictionary.CreateRange( + StatefulIterateTransform(proto.Entries, this, + static (s, x) => + { + return new KeyValuePair>( + x.IntKey, + s.GenericLWWRegisterFromProto(x.Value)); + })); + var orDict = + new ORDictionary>((ORSet)keys, + entries); + return new LWWDictionary(orDict); + } + else if (typeof(TKey) == typeof(long)) + { + var entries = ImmutableDictionary.CreateRange( + StatefulIterateTransform(proto.Entries, this, + static (s, x) => + { + return new KeyValuePair>( + x.LongKey, + s.GenericLWWRegisterFromProto(x.Value)); + })); + var orDict = + new ORDictionary>( + (ORSet)keys, entries); + return new LWWDictionary(orDict); + } + else if (typeof(TKey) == typeof(string)) + { + var entries = ImmutableDictionary.CreateRange( + StatefulIterateTransform(proto.Entries, this, + static (s, x) => + { + return new KeyValuePair>( + x.StringKey, + s.GenericLWWRegisterFromProto(x.Value)); + })); + var orDict = + new ORDictionary>( + (ORSet)keys, entries); + return new LWWDictionary(orDict); } + else + { + var entries = ImmutableDictionary> + .Empty.AddRange( + StatefulIterateTransform(proto.Entries, this, + static (t, a) => + new KeyValuePair>( + (TKey)t._ser.OtherMessageFromProto( + a.OtherKey), + t.GenericLWWRegisterFromProto( + a.Value)))); + //proto.Entries.ToImmutableDictionary(x => (TKey)_ser.OtherMessageFromProto(x.OtherKey), + // v => GenericLWWRegisterFromProto(v.Value)); + var orDict = + new ORDictionary>( + (ORSet)keys, entries); + return new LWWDictionary(orDict); + } + } private ILWWDictionary LWWDictionaryFromBinary(IMemoryOwner bytes) @@ -1276,8 +1496,10 @@ private object LWWDictionaryDeltaGroupFromBinary(byte[] bytes) var orDictOp = ORDictionaryDeltaGroupFromProto(proto); var orSetType = orDictOp.ValueType.GenericTypeArguments[0]; - var maker = LWWDictionaryDeltaMaker.MakeGenericMethod(orDictOp.KeyType, orSetType); - return (ILWWDictionaryDeltaOperation)maker.Invoke(this, new object[] { orDictOp }); + return SerDeserGenericCache.LWWDictionaryDeltaFromProto( + orDictOp.KeyType, orSetType, this, orDictOp); + //var maker = LWWDictionaryDeltaMaker.MakeGenericMethod(orDictOp.KeyType, orSetType); + //return (ILWWDictionaryDeltaOperation)maker.Invoke(this, new object[] { orDictOp }); } private static readonly MethodInfo LWWDictionaryDeltaMaker = @@ -1299,8 +1521,10 @@ private ILWWDictionaryDeltaOperation LWWDictionaryDeltaFromProto(O private Proto.Msg.PNCounterMap ToProto(IPNCounterDictionary pnCounterDictionary) { - var protoMaker = PNCounterDictProtoMaker.MakeGenericMethod(pnCounterDictionary.KeyType); - return (Proto.Msg.PNCounterMap)protoMaker.Invoke(this, new object[] { pnCounterDictionary }); + return SerDeserGenericCache.PNCounterDictionaryToProto( + pnCounterDictionary.KeyType, this, pnCounterDictionary); + //var protoMaker = PNCounterDictProtoMaker.MakeGenericMethod(pnCounterDictionary.KeyType); + //return (Proto.Msg.PNCounterMap)protoMaker.Invoke(this, new object[] { pnCounterDictionary }); } private static readonly MethodInfo PNCounterDictProtoMaker = @@ -1311,7 +1535,7 @@ private Proto.Msg.PNCounterMap GenericPNCounterDictionaryToProto(IPNCounte { var pnDict = (PNCounterDictionary)pnCounterDictionary; var proto = new Proto.Msg.PNCounterMap(); - proto.Keys = ToProto(pnDict.Underlying.KeySet); + proto.Keys = ORSetToProto(pnDict.Underlying.KeySet); ToPNCounterEntries(pnDict.Underlying.Entries, proto); return proto; } @@ -1322,20 +1546,21 @@ private void ToPNCounterEntries(IImmutableDictionary unde foreach (var e in underlyingEntries) { var thisEntry = new PNCounterMap.Types.Entry(); - switch (e.Key) + if (typeof(TKey) == typeof(int)) { - case int i: - thisEntry.IntKey = i; - break; - case long l: - thisEntry.LongKey = l; - break; - case string str: - thisEntry.StringKey = str; - break; - default: - thisEntry.OtherKey = _ser.OtherMessageToProto(e.Key); - break; + thisEntry.IntKey = (int)(object)(e.Key); + } + else if (typeof(TKey) == typeof(long)) + { + thisEntry.LongKey = (long)(object)(e.Key); + } + else if (typeof(TKey) == typeof(string)) + { + thisEntry.StringKey = (string)(object)(e.Key); + } + else + { + thisEntry.OtherKey = _ser.OtherMessageToProto(e.Key); } thisEntry.Value = ToProto(e.Value); @@ -1357,55 +1582,77 @@ private IPNCounterDictionary PNCounterDictionaryFromBinary(IMemoryOwner by private IPNCounterDictionary PNCounterDictionaryFromProto(Proto.Msg.PNCounterMap proto) { var keyType = GetTypeFromDescriptor(proto.Keys.TypeInfo); - var dictMaker = PNCounterDictMaker.MakeGenericMethod(keyType); - return (IPNCounterDictionary)dictMaker.Invoke(this, new object[] { proto }); + return SerDeserGenericCache.PNCounterDictionaryFromProto(keyType, this, + proto); + //var dictMaker = PNCounterDictMaker.MakeGenericMethod(keyType); + //return (IPNCounterDictionary)dictMaker.Invoke(this, new object[] { proto }); } private static readonly MethodInfo PNCounterDictMaker = typeof(ReplicatedDataSerializer).GetMethod(nameof(GenericPNCounterDictionaryFromProto), BindingFlags.Instance | BindingFlags.NonPublic); - private IPNCounterDictionary GenericPNCounterDictionaryFromProto(Proto.Msg.PNCounterMap proto) + private IPNCounterDictionary GenericPNCounterDictionaryFromProto( + Proto.Msg.PNCounterMap proto) { - var keys = FromProto(proto.Keys); - switch (proto.Keys.TypeInfo.Type) + var keys = ORSetFromProto(proto.Keys); + if (typeof(TKey) == typeof(int)) { - case ValType.Int: - { - var entries = proto.Entries.ToImmutableDictionary(x => x.IntKey, - v => PNCounterFromProto(v.Value)); - var orDict = new ORDictionary((ORSet)keys, entries); - return new PNCounterDictionary(orDict); - } - case ValType.Long: - { - var entries = proto.Entries.ToImmutableDictionary(x => x.LongKey, - v => PNCounterFromProto(v.Value)); - var orDict = new ORDictionary((ORSet)keys, entries); - return new PNCounterDictionary(orDict); - } - case ValType.String: - { - var entries = proto.Entries.ToImmutableDictionary(x => x.StringKey, - v => PNCounterFromProto(v.Value)); - var orDict = new ORDictionary((ORSet)keys, entries); - return new PNCounterDictionary(orDict); - } - default: - { - var entries = proto.Entries.ToImmutableDictionary(x => (TKey)_ser.OtherMessageFromProto(x.OtherKey), - v => PNCounterFromProto(v.Value)); - var orDict = new ORDictionary((ORSet)keys, entries); - return new PNCounterDictionary(orDict); - } + var entries = proto.Entries.ToImmutableDictionary(x => x.IntKey, + v => PNCounterFromProto(v.Value)); + var orDict = + new ORDictionary((ORSet)keys, entries); + return new PNCounterDictionary(orDict); + } + else if (typeof(TKey) == typeof(long)) + { + var entries = proto.Entries.ToImmutableDictionary( + x => x.LongKey, + v => PNCounterFromProto(v.Value)); + var orDict = + new ORDictionary((ORSet)keys, + entries); + return new PNCounterDictionary(orDict); + } + else if (typeof(TKey) == typeof(string)) + { + var entries = proto.Entries.ToImmutableDictionary( + x => x.StringKey, + v => PNCounterFromProto(v.Value)); + var orDict = + new ORDictionary((ORSet)keys, + entries); + return new PNCounterDictionary(orDict); + } + else + { + + //TODO: Use StatefulIterateTransform() here and above to optimize code paths + var entries = ImmutableDictionary.CreateRange( + StatefulIterateTransform(proto.Entries, this, + (t, xv) => + new KeyValuePair( + (TKey)t._ser.OtherMessageFromProto(xv.OtherKey), + t.PNCounterFromProto(xv.Value))) + ); + //var entries = proto.Entries.ToImmutableDictionary( + // x => (TKey)_ser.OtherMessageFromProto(x.OtherKey), + // v => PNCounterFromProto(v.Value)); + var orDict = + new ORDictionary((ORSet)keys, + entries); + return new PNCounterDictionary(orDict); } + } private IPNCounterDictionaryDeltaOperation PNCounterDeltaFromBinary(byte[] bytes) { var proto = Proto.Msg.ORMapDeltaGroup.Parser.ParseFrom(bytes); var orDictOp = ORDictionaryDeltaGroupFromProto(proto); - var maker = PNCounterDeltaMaker.MakeGenericMethod(orDictOp.KeyType); - return (IPNCounterDictionaryDeltaOperation)maker.Invoke(this, new object[] { orDictOp }); + return SerDeserGenericCache.PnCounterDictionaryDeltaFromProto( + orDictOp.KeyType, this, orDictOp); + //var maker = PNCounterDeltaMaker.MakeGenericMethod(orDictOp.KeyType); + //return (IPNCounterDictionaryDeltaOperation)maker.Invoke(this, new object[] { orDictOp }); } private static readonly MethodInfo PNCounterDeltaMaker = @@ -1424,8 +1671,11 @@ private IPNCounterDictionaryDeltaOperation PNCounterDeltaFromProto(ORDicti private Proto.Msg.ORMultiMap ToProto(IORMultiValueDictionary multi) { - var protoMaker = MultiMapProtoMaker.MakeGenericMethod(multi.KeyType, multi.ValueType); - return (Proto.Msg.ORMultiMap)protoMaker.Invoke(this, new object[] { multi }); + return SerDeserGenericCache + .IORMultiValueDictionaryToProtoORMultiMap(multi.KeyType, + multi.ValueType, this, multi); + //var protoMaker = MultiMapProtoMaker.MakeGenericMethod(multi.KeyType, multi.ValueType); + //return (Proto.Msg.ORMultiMap)protoMaker.Invoke(this, new object[] { multi }); } private static readonly MethodInfo MultiMapProtoMaker = @@ -1448,38 +1698,36 @@ private Proto.Msg.ORMultiMap MultiMapToProto(IORMultiValueDictiona proto.WithValueDeltas = true; } - proto.Keys = ToProto(ormm.Underlying.KeySet); + proto.Keys = ORSetToProto(ormm.Underlying.KeySet); ToORMultiMapEntries(ormm.Underlying.Entries, proto); return proto; } private void ToORMultiMapEntries(IImmutableDictionary> underlyingEntries, ORMultiMap proto) { - var entries = new List(); + EnsureRepeatedFieldSize(proto.Entries, underlyingEntries.Count); foreach (var e in underlyingEntries) { var thisEntry = new ORMultiMap.Types.Entry(); - switch (e.Key) + if (typeof(TKey) == typeof(int)) { - case int i: - thisEntry.IntKey = i; - break; - case long l: - thisEntry.LongKey = l; - break; - case string str: - thisEntry.StringKey = str; - break; - default: - thisEntry.OtherKey = _ser.OtherMessageToProto(e.Key); - break; + thisEntry.IntKey = (int)(object)e.Key; } - - thisEntry.Value = ToProto(e.Value); - entries.Add(thisEntry); + else if (typeof(TKey) == typeof(long)) + { + thisEntry.LongKey = (long)(object)e.Key; + } + else if (typeof(TKey) == typeof(string)) + { + thisEntry.StringKey = (string)(object)e.Key; + } + else + { + thisEntry.OtherKey = _ser.OtherMessageToProto(e.Key); + } + thisEntry.Value = ORSetToProto(e.Value); + proto.Entries.Add(thisEntry); } - - proto.Entries.Add(entries); } private IORMultiValueDictionary ORMultiDictionaryFromBinary(IMemoryOwner bytes) @@ -1496,8 +1744,11 @@ private IORMultiValueDictionary ORMultiDictionaryFromProto(ORMultiMap proto) var keyType = GetTypeFromDescriptor(proto.Keys.TypeInfo); var valueType = GetTypeFromDescriptor(proto.ValueTypeInfo); - var dictMaker = MultiDictMaker.MakeGenericMethod(keyType, valueType); - return (IORMultiValueDictionary)dictMaker.Invoke(this, new object[] { proto }); + return SerDeserGenericCache + .ProtoORMultiMapToIORMultiValueDictionary(keyType, valueType, + this, proto); + //var dictMaker = MultiDictMaker.MakeGenericMethod(keyType, valueType); + //return (IORMultiValueDictionary)dictMaker.Invoke(this, new object[] { proto }); } private static readonly MethodInfo MultiDictMaker = @@ -1505,37 +1756,40 @@ private IORMultiValueDictionary ORMultiDictionaryFromProto(ORMultiMap proto) private IORMultiValueDictionary GenericORMultiDictionaryFromProto(ORMultiMap proto) { - var keys = FromProto(proto.Keys); - switch (proto.Keys.TypeInfo.Type) + var keys = ORSetFromProto(proto.Keys); + if (typeof(TKey) == typeof(int)) { - case ValType.Int: - { - var entries = proto.Entries.ToImmutableDictionary(x => x.IntKey, - v => (ORSet)FromProto(v.Value)); - var orDict = new ORDictionary>((ORSet)keys, entries); - return new ORMultiValueDictionary(orDict, proto.WithValueDeltas); - } - case ValType.Long: - { - var entries = proto.Entries.ToImmutableDictionary(x => x.LongKey, - v => (ORSet)FromProto(v.Value)); - var orDict = new ORDictionary>((ORSet)keys, entries); - return new ORMultiValueDictionary(orDict, proto.WithValueDeltas); - } - case ValType.String: - { - var entries = proto.Entries.ToImmutableDictionary(x => x.StringKey, - v => (ORSet)FromProto(v.Value)); - var orDict = new ORDictionary>((ORSet)keys, entries); - return new ORMultiValueDictionary(orDict, proto.WithValueDeltas); - } - default: - { - var entries = proto.Entries.ToImmutableDictionary(x => (TKey)_ser.OtherMessageFromProto(x.OtherKey), - v => (ORSet)FromProto(v.Value)); - var orDict = new ORDictionary>((ORSet)keys, entries); - return new ORMultiValueDictionary(orDict, proto.WithValueDeltas); - } + var entries = proto.Entries.ToImmutableDictionary(x => x.IntKey, + v => (ORSet)ORSetFromProto(v.Value)); + var orDict = new ORDictionary>((ORSet)keys, entries); + return new ORMultiValueDictionary(orDict, proto.WithValueDeltas); + } + else if (typeof(TKey) == typeof(long)) + { + var entries = proto.Entries.ToImmutableDictionary(x => x.LongKey, + v => (ORSet)ORSetFromProto(v.Value)); + var orDict = new ORDictionary>((ORSet)keys, entries); + return new ORMultiValueDictionary(orDict, proto.WithValueDeltas); + } + else if (typeof(TKey) == typeof(string)) + { + var entries = proto.Entries.ToImmutableDictionary(x => x.StringKey, + v => (ORSet)ORSetFromProto(v.Value)); + var orDict = new ORDictionary>((ORSet)keys, entries); + return new ORMultiValueDictionary(orDict, proto.WithValueDeltas); + } + else + { + var entries = ImmutableDictionary.CreateRange( + StatefulIterateTransform(proto.Entries, this, + (ss, x) => + new KeyValuePair>( + (TKey)ss._ser.OtherMessageFromProto(x.OtherKey), + (ORSet)ss.ORSetFromProto(x.Value)))); + var orDict = + new ORDictionary>((ORSet)keys, + entries); + return new ORMultiValueDictionary(orDict, proto.WithValueDeltas); } } @@ -1545,8 +1799,11 @@ private object ORMultiDictionaryDeltaFromBinary(byte[] bytes) var orDictOp = ORDictionaryDeltaGroupFromProto(proto.Delta); var orSetType = orDictOp.ValueType.GenericTypeArguments[0]; - var maker = ORMultiDictionaryDeltaMaker.MakeGenericMethod(orDictOp.KeyType, orSetType); - return (IORMultiValueDictionaryDeltaOperation)maker.Invoke(this, new object[] { orDictOp, proto.WithValueDeltas }); + return SerDeserGenericCache.ToOrMultiDictionaryDelta( + orDictOp.KeyType, orSetType, this, orDictOp, + proto.WithValueDeltas); + //var maker = ORMultiDictionaryDeltaMaker.MakeGenericMethod(orDictOp.KeyType, orSetType); + //return (IORMultiValueDictionaryDeltaOperation)maker.Invoke(this, new object[] { orDictOp, proto.WithValueDeltas }); } private static readonly MethodInfo ORMultiDictionaryDeltaMaker = @@ -1642,24 +1899,28 @@ private IKey ORSetKeyFromBinary(byte[] bytes) { var proto = KeyFromBinary(bytes); var keyType = GetTypeFromDescriptor(proto.KeyTypeInfo); - var genType = typeof(ORSetKey<>).MakeGenericType(keyType); - return (IKey)Activator.CreateInstance(genType, proto.KeyId); + return SerDeserGenericCache.GetORSetKey(keyType, proto.KeyId); + //var genType = typeof(ORSetKey<>).MakeGenericType(keyType); + //return (IKey)Activator.CreateInstance(genType, proto.KeyId); } private IKey GSetKeyFromBinary(byte[] bytes) { var proto = KeyFromBinary(bytes); var keyType = GetTypeFromDescriptor(proto.KeyTypeInfo); - var genType = typeof(GSetKey<>).MakeGenericType(keyType); - return (IKey)Activator.CreateInstance(genType, proto.KeyId); + return SerDeserGenericCache.GetGSetKey(keyType, proto.KeyId); + //var genType = typeof(GSetKey<>).MakeGenericType(keyType); + //return (IKey)Activator.CreateInstance(genType, proto.KeyId); } private IKey LWWRegisterKeyFromBinary(byte[] bytes) { var proto = KeyFromBinary(bytes); var keyType = GetTypeFromDescriptor(proto.KeyTypeInfo); - var genType = typeof(LWWRegisterKey<>).MakeGenericType(keyType); - return (IKey)Activator.CreateInstance(genType, proto.KeyId); + return SerDeserGenericCache.GetLWWRegisterKeyValue(keyType, + proto.KeyId); + //var genType = typeof(LWWRegisterKey<>).MakeGenericType(keyType); + //return (IKey)Activator.CreateInstance(genType, proto.KeyId); } private IKey GCounterKeyFromBinary(byte[] bytes) @@ -1686,8 +1947,8 @@ private IKey ORDictionaryKeyFromBinary(byte[] bytes) var keyType = GetTypeFromDescriptor(proto.KeyTypeInfo); var valueType = GetTypeFromDescriptor(proto.ValueTypeInfo); - var genType = typeof(ORDictionaryKey<,>).MakeGenericType(keyType, valueType); - return (IKey)Activator.CreateInstance(genType, proto.KeyId); + return SerDeserGenericCache.GetORDictionaryKey(keyType, valueType, + proto.KeyId); } private IKey LWWDictionaryKeyFromBinary(byte[] bytes) @@ -1695,18 +1956,19 @@ private IKey LWWDictionaryKeyFromBinary(byte[] bytes) var proto = KeyFromBinary(bytes); var keyType = GetTypeFromDescriptor(proto.KeyTypeInfo); var valueType = GetTypeFromDescriptor(proto.ValueTypeInfo); - - var genType = typeof(LWWDictionaryKey<,>).MakeGenericType(keyType, valueType); - return (IKey)Activator.CreateInstance(genType, proto.KeyId); + return SerDeserGenericCache.GetLWWDictionaryKey(keyType, valueType,proto.KeyId); + //var genType = typeof(LWWDictionaryKey<,>).MakeGenericType(keyType, valueType); + //return (IKey)Activator.CreateInstance(genType, proto.KeyId); } private IKey PNCounterDictionaryKeyFromBinary(byte[] bytes) { var proto = KeyFromBinary(bytes); var keyType = GetTypeFromDescriptor(proto.KeyTypeInfo); - - var genType = typeof(PNCounterDictionaryKey<>).MakeGenericType(keyType); - return (IKey)Activator.CreateInstance(genType, proto.KeyId); + return SerDeserGenericCache.GetPNCounterDictionaryKeyValue(keyType, + proto.KeyId); + //var genType = typeof(PNCounterDictionaryKey<>).MakeGenericType(keyType); + //return (IKey)Activator.CreateInstance(genType, proto.KeyId); } private IKey ORMultiValueDictionaryKeyFromBinary(byte[] bytes) @@ -1714,9 +1976,10 @@ private IKey ORMultiValueDictionaryKeyFromBinary(byte[] bytes) var proto = KeyFromBinary(bytes); var keyType = GetTypeFromDescriptor(proto.KeyTypeInfo); var valueType = GetTypeFromDescriptor(proto.ValueTypeInfo); - - var genType = typeof(ORMultiValueDictionaryKey<,>).MakeGenericType(keyType, valueType); - return (IKey)Activator.CreateInstance(genType, proto.KeyId); + return SerDeserGenericCache.GetORMultiValueDictionaryKey(keyType, + valueType, proto.KeyId); + //var genType = typeof(ORMultiValueDictionaryKey<,>).MakeGenericType(keyType, valueType); + //return (IKey)Activator.CreateInstance(genType, proto.KeyId); } #endregion diff --git a/src/contrib/cluster/Akka.DistributedData/Serialization/SerializationSupport.cs b/src/contrib/cluster/Akka.DistributedData/Serialization/SerializationSupport.cs index c5866f5925b..1ec7ee7e232 100644 --- a/src/contrib/cluster/Akka.DistributedData/Serialization/SerializationSupport.cs +++ b/src/contrib/cluster/Akka.DistributedData/Serialization/SerializationSupport.cs @@ -136,6 +136,7 @@ public static IMemoryOwner DecompressWithRentedPool(ReadOnlyMemory i break; } } + } failed = false; @@ -217,8 +218,26 @@ public IActorRef ResolveActorRef(string path) private static readonly NonBlocking.ConcurrentDictionary _manifestBsCache = new(); + public Proto.Msg.OtherMessage OtherMessageToProto(object msg) { + Proto.Msg.OtherMessage BuildOther() + { + var m = new OtherMessage(); + var msgSerializer = Serialization.FindSerializerFor(msg); + m.SerializerId = msgSerializer.Identifier; + + m.EnclosedMessage = + UnsafeByteOperations.UnsafeWrap( + msgSerializer.ToBinary(msg)); //ByteString.CopyFrom(msgSerializer.ToBinary(msg)); + + var ms = Akka.Serialization.Serialization.ManifestFor(msgSerializer, msg); + if (!string.IsNullOrEmpty(ms)) + m.MessageManifest = _manifestBsCache.GetOrAdd(ms, + static k => ByteString.CopyFromUtf8(k)); + return m; + } + // Serialize actor references with full address information (defaultAddress). // When sending remote messages currentTransportInformation is already set, // but when serializing for digests or DurableStore it must be set here. diff --git a/src/core/Akka/Actor/ActorRef.cs b/src/core/Akka/Actor/ActorRef.cs index 203b72afa33..d17ab791b1a 100644 --- a/src/core/Akka/Actor/ActorRef.cs +++ b/src/core/Akka/Actor/ActorRef.cs @@ -13,6 +13,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Sources; using Akka.Actor.Internal; using Akka.Actor.Scheduler; using Akka.Annotations; diff --git a/src/core/Akka/Actor/Futures.cs b/src/core/Akka/Actor/Futures.cs index 8a7123760a6..e45b9e0e088 100644 --- a/src/core/Akka/Actor/Futures.cs +++ b/src/core/Akka/Actor/Futures.cs @@ -149,6 +149,7 @@ public static Task Ask(this ICanTell self, Func message } var future = provider.CreateFutureRef(result); + var path = future.Path; //The future actor needs to be unregistered in the temp container