diff --git a/src/benchmark/Akka.Benchmarks/DData/ORSetBenchmarks.cs b/src/benchmark/Akka.Benchmarks/DData/ORSetBenchmarks.cs
index 53fceed79b9..73a85714bde 100644
--- a/src/benchmark/Akka.Benchmarks/DData/ORSetBenchmarks.cs
+++ b/src/benchmark/Akka.Benchmarks/DData/ORSetBenchmarks.cs
@@ -13,7 +13,9 @@
using Akka.Benchmarks.Configurations;
using Akka.Cluster;
using Akka.DistributedData;
+using Akka.DistributedData.Serialization;
using BenchmarkDotNet.Attributes;
+using static Akka.Benchmarks.DData.RDDBenchTypes;
namespace Akka.Benchmarks.DData
{
diff --git a/src/benchmark/Akka.Benchmarks/DData/RDDBenchTypes.cs b/src/benchmark/Akka.Benchmarks/DData/RDDBenchTypes.cs
new file mode 100644
index 00000000000..795c227e03f
--- /dev/null
+++ b/src/benchmark/Akka.Benchmarks/DData/RDDBenchTypes.cs
@@ -0,0 +1,16 @@
+// //-----------------------------------------------------------------------
+// //
+// // Copyright (C) 2009-2023 Lightbend Inc.
+// // Copyright (C) 2013-2023 .NET Foundation
+// //
+// //-----------------------------------------------------------------------
+
+namespace Akka.Benchmarks.DData;
+
+public class RDDBenchTypes
+{
+
+ public record struct TestKey(int i);
+
+ public record TestVal(string v);
+}
\ No newline at end of file
diff --git a/src/benchmark/Akka.Benchmarks/DData/SerializerLwwDictionaryBenchmarks.cs b/src/benchmark/Akka.Benchmarks/DData/SerializerLwwDictionaryBenchmarks.cs
new file mode 100644
index 00000000000..002cd3b967b
--- /dev/null
+++ b/src/benchmark/Akka.Benchmarks/DData/SerializerLwwDictionaryBenchmarks.cs
@@ -0,0 +1,140 @@
+// //-----------------------------------------------------------------------
+// //
+// // Copyright (C) 2009-2023 Lightbend Inc.
+// // Copyright (C) 2013-2023 .NET Foundation
+// //
+// //-----------------------------------------------------------------------
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using Akka.Actor;
+using Akka.Benchmarks.Configurations;
+using Akka.Cluster;
+using Akka.Configuration;
+using Akka.DistributedData;
+using Akka.DistributedData.Serialization;
+using BenchmarkDotNet.Attributes;
+
+namespace Akka.Benchmarks.DData;
+
+[Config(typeof(MicroBenchmarkConfig))]
+public class SerializerLwwDictionaryBenchmarks
+{
+ [Params(typeof(RDDBenchTypes.TestKey), typeof(RDDBenchTypes.TestVal))]
+ public Type KeyType;
+
+ [Params(typeof(RDDBenchTypes.TestKey), typeof(RDDBenchTypes.TestVal))]
+ public Type ValueType;
+
+ [Params(25)]
+ public int NumElements;
+
+ [Params(10)]
+ public int NumNodes;
+
+ private UniqueAddress[] _nodes;
+ private object _c1;
+ private ActorSystem sys;
+ private ReplicatedDataSerializer ser;
+ private byte[] _c1Ser;
+ private string _c1Manifest;
+
+ [GlobalSetup]
+ public void SetupSystem()
+ {
+ typeof(SerializerLwwDictionaryBenchmarks).GetMethod(
+ nameof(SerializerLwwDictionaryBenchmarks.CreateItems),
+ BindingFlags.Instance | BindingFlags.NonPublic)
+ .MakeGenericMethod(new []{KeyType,ValueType})
+ .Invoke(this, new object[]{});
+ var conf = ConfigurationFactory.ParseString(@"akka.actor {
+ serializers {
+ akka-replicated-data = ""Akka.DistributedData.Serialization.ReplicatedDataSerializer, Akka.DistributedData""
+ }
+ serialization-bindings {
+ ""Akka.DistributedData.IReplicatedDataSerialization, Akka.DistributedData"" = akka-replicated-data
+ }
+ serialization-identifiers {
+ ""Akka.DistributedData.Serialization.ReplicatedDataSerializer, Akka.DistributedData"" = 11
+ }
+}");
+ sys = ActorSystem.Create("rddsb", conf);
+ ser = (ReplicatedDataSerializer)sys.Serialization.FindSerializerForType(
+ typeof(IReplicatedDataSerialization));
+ _c1Ser = ser.ToBinary(_c1);
+ _c1Manifest = ser.Manifest(_c1);
+ }
+
+ private void CreateItems()
+ {
+ var newNodes = new List(NumNodes);
+ foreach (var i in Enumerable.Range(0, NumNodes))
+ {
+ var address = new Address("akka.tcp", "Sys", "localhost", 2552 + i);
+ var uniqueAddress = new UniqueAddress(address, i);
+ newNodes.Add(uniqueAddress);
+ }
+
+ _nodes = newNodes.ToArray();
+ var newElements = new List(NumNodes);
+ foreach (var i in Enumerable.Range(0, NumElements))
+ {
+
+ newElements.Add(generate(i));
+ }
+
+ var _c1 = LWWDictionary>
+ .Empty;
+ int j = 0;
+ foreach (var node in _nodes)
+ {
+ _c1 = _c1.SetItem(node, generate(j),
+ newElements);
+ j++;
+ }
+
+ this._c1 = _c1;
+ }
+
+ private TValue generate(int i)
+ {
+ if (typeof(TValue) == typeof(RDDBenchTypes.TestVal))
+ {
+ return (TValue)(object)new RDDBenchTypes.TestVal(i.ToString());
+ }
+ else if (typeof(TValue) == typeof(RDDBenchTypes.TestKey))
+ {
+ return (TValue)(object)new RDDBenchTypes.TestKey(i);
+ }
+ else if (typeof(TValue) == typeof(int))
+ {
+ return (TValue)(object)i;
+ }
+ else if (typeof(TValue) == typeof(string))
+ {
+ return (TValue)(object)i.ToString();
+ }
+ else if (typeof(TValue) == typeof(long))
+ {
+ return (TValue)(object)(i);
+ }
+ else
+ {
+ return (TValue)(object)(i);
+ }
+ }
+
+ [Benchmark]
+ public void Serialize_LWWDict()
+ {
+ ser.ToBinary(_c1);
+ }
+
+ [Benchmark]
+ public void Deserialize_LWWDict()
+ {
+ ser.FromBinary(_c1Ser, _c1Manifest);
+ }
+}
\ No newline at end of file
diff --git a/src/benchmark/Akka.Benchmarks/DData/SerializerORDictionaryBenchmarks.cs b/src/benchmark/Akka.Benchmarks/DData/SerializerORDictionaryBenchmarks.cs
new file mode 100644
index 00000000000..bf0e1008226
--- /dev/null
+++ b/src/benchmark/Akka.Benchmarks/DData/SerializerORDictionaryBenchmarks.cs
@@ -0,0 +1,90 @@
+// //-----------------------------------------------------------------------
+// //
+// // Copyright (C) 2009-2023 Lightbend Inc.
+// // Copyright (C) 2013-2023 .NET Foundation
+// //
+// //-----------------------------------------------------------------------
+
+using System.Collections.Generic;
+using System.Linq;
+using Akka.Actor;
+using Akka.Benchmarks.Configurations;
+using Akka.Cluster;
+using Akka.Configuration;
+using Akka.DistributedData;
+using Akka.DistributedData.Serialization;
+using BenchmarkDotNet.Attributes;
+
+namespace Akka.Benchmarks.DData;
+
+[Config(typeof(MicroBenchmarkConfig))]
+public class SerializerORDictionaryBenchmarks
+{
+ [Params(25)]
+ public int NumElements;
+
+ [Params(10)]
+ public int NumNodes;
+
+ private UniqueAddress[] _nodes;
+ private ORDictionary> _c1;
+ private ORSet _elements;
+ private ActorSystem sys;
+ private ReplicatedDataSerializer ser;
+ private byte[] _c1Ser;
+ private string _c1Manifest;
+
+ [GlobalSetup]
+ public void SetupSystem()
+ {
+ var newNodes = new List(NumNodes);
+ foreach(var i in Enumerable.Range(0, NumNodes)){
+ var address = new Address("akka.tcp", "Sys", "localhost", 2552 + i);
+ var uniqueAddress = new UniqueAddress(address, i);
+ newNodes.Add(uniqueAddress);
+ }
+ _nodes = newNodes.ToArray();
+ var newElements = ORSet.Empty;
+ foreach(var i in Enumerable.Range(0, NumElements)){
+ newElements = newElements.Add(_nodes[0],new RDDBenchTypes.TestVal(i.ToString()));
+ }
+ _elements = newElements;
+
+ _c1 = ORDictionary>
+ .Empty;
+ int j = 0;
+ foreach(var node in _nodes)
+ {
+ _c1 = _c1.SetItem(node, new RDDBenchTypes.TestKey(j), _elements);
+ j++;
+ }
+ var conf = ConfigurationFactory.ParseString(@"akka.actor {
+ serializers {
+ akka-replicated-data = ""Akka.DistributedData.Serialization.ReplicatedDataSerializer, Akka.DistributedData""
+ }
+ serialization-bindings {
+ ""Akka.DistributedData.IReplicatedDataSerialization, Akka.DistributedData"" = akka-replicated-data
+ }
+ serialization-identifiers {
+ ""Akka.DistributedData.Serialization.ReplicatedDataSerializer, Akka.DistributedData"" = 11
+ }
+}");
+ sys = ActorSystem.Create("rddsb", conf);
+ ser = (ReplicatedDataSerializer)sys.Serialization.FindSerializerForType(
+ typeof(IReplicatedDataSerialization));
+ _c1Ser = ser.ToBinary(_c1);
+ _c1Manifest = ser.Manifest(_c1);
+ }
+
+ [Benchmark]
+ public void Serialize_ORDictionary()
+ {
+ ser.ToBinary(_c1);
+ }
+
+ [Benchmark]
+ public void Deserialize_ORDictionary()
+ {
+ ser.FromBinary(_c1Ser, _c1Manifest);
+ }
+}
\ No newline at end of file
diff --git a/src/benchmark/Akka.Benchmarks/DData/SerializerORSetBenchmarks.cs b/src/benchmark/Akka.Benchmarks/DData/SerializerORSetBenchmarks.cs
new file mode 100644
index 00000000000..8c0d645d974
--- /dev/null
+++ b/src/benchmark/Akka.Benchmarks/DData/SerializerORSetBenchmarks.cs
@@ -0,0 +1,90 @@
+// //-----------------------------------------------------------------------
+// //
+// // Copyright (C) 2009-2023 Lightbend Inc.
+// // Copyright (C) 2013-2023 .NET Foundation
+// //
+// //-----------------------------------------------------------------------
+
+using System.Collections.Generic;
+using System.Collections.Immutable;
+using System.Linq;
+using Akka.Actor;
+using Akka.Actor.Setup;
+using Akka.Benchmarks.Configurations;
+using Akka.Cluster;
+using Akka.Configuration;
+using Akka.DistributedData;
+using Akka.DistributedData.Serialization;
+using Akka.Serialization;
+using BenchmarkDotNet.Attributes;
+
+namespace Akka.Benchmarks.DData;
+
+[Config(typeof(MicroBenchmarkConfig))]
+public class SerializerORSetBenchmarks
+{
+ [Params(25)]
+ public int NumElements;
+
+ [Params(10)]
+ public int NumNodes;
+
+ private ActorSystem sys;
+ private ReplicatedDataSerializer ser;
+ private UniqueAddress[] _nodes;
+ private RDDBenchTypes.TestVal[] _elements;
+ private ORSet> _c1;
+ private byte[] _c1Ser;
+ private string _c1Manifest;
+
+ [GlobalSetup]
+ public void SetupSystem()
+ {
+ var newNodes = new List(NumNodes);
+ foreach(var i in Enumerable.Range(0, NumNodes)){
+ var address = new Address("akka.tcp", "Sys", "localhost", 2552 + i);
+ var uniqueAddress = new UniqueAddress(address, i);
+ newNodes.Add(uniqueAddress);
+ }
+ _nodes = newNodes.ToArray();
+ var newElements = new List(NumNodes);
+ foreach(var i in Enumerable.Range(0, NumElements)){
+ newElements.Add(new RDDBenchTypes.TestVal(i.ToString()));
+ }
+ _elements = newElements.ToArray();
+
+ _c1 = ORSet>.Empty;
+ foreach(var node in _nodes){
+ _c1 = _c1.Add(node, _elements.ToList());
+ }
+
+ var conf = ConfigurationFactory.ParseString(@"akka.actor {
+ serializers {
+ akka-replicated-data = ""Akka.DistributedData.Serialization.ReplicatedDataSerializer, Akka.DistributedData""
+ }
+ serialization-bindings {
+ ""Akka.DistributedData.IReplicatedDataSerialization, Akka.DistributedData"" = akka-replicated-data
+ }
+ serialization-identifiers {
+ ""Akka.DistributedData.Serialization.ReplicatedDataSerializer, Akka.DistributedData"" = 11
+ }
+}");
+ sys = ActorSystem.Create("rddsb", conf);
+ ser = (ReplicatedDataSerializer)sys.Serialization.FindSerializerForType(
+ typeof(IReplicatedDataSerialization));
+ _c1Ser = ser.ToBinary(_c1);
+ _c1Manifest = ser.Manifest(_c1);
+ }
+ [Benchmark]
+ public void Serialize_ORSet()
+ {
+ ser.ToBinary(_c1);
+ }
+
+ [Benchmark]
+ public void Deserialize_ORSet()
+ {
+ ser.FromBinary(_c1Ser, _c1Manifest);
+ }
+
+}
\ No newline at end of file
diff --git a/src/benchmark/Akka.Benchmarks/Program.cs b/src/benchmark/Akka.Benchmarks/Program.cs
index 83c31a46621..2619adfadca 100644
--- a/src/benchmark/Akka.Benchmarks/Program.cs
+++ b/src/benchmark/Akka.Benchmarks/Program.cs
@@ -5,7 +5,10 @@
//
//-----------------------------------------------------------------------
+using System;
using System.Reflection;
+using System.Threading;
+using Akka.Benchmarks.DData;
using BenchmarkDotNet.Running;
namespace Akka.Benchmarks
@@ -14,6 +17,15 @@ class Program
{
static void Main(string[] args)
{
+ //var w = new SerializerORDictionaryBenchmarks();
+ //w.NumElements = 25;
+ //w.NumNodes = 30;
+ //w.SetupSystem();
+ //Console.WriteLine("Running");
+ //while (true)
+ //{
+ // w.Serialize_ORDictionary();
+ //}
BenchmarkSwitcher.FromAssembly(Assembly.GetExecutingAssembly()).Run(args);
}
}
diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj b/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj
index c76abbeb524..fb71544fa38 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj
+++ b/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj
@@ -14,4 +14,8 @@
+
+
+
+
diff --git a/src/contrib/cluster/Akka.DistributedData/Akka.DistributedData.csproj b/src/contrib/cluster/Akka.DistributedData/Akka.DistributedData.csproj
index c4f04f30b21..02ab6edc9ae 100644
--- a/src/contrib/cluster/Akka.DistributedData/Akka.DistributedData.csproj
+++ b/src/contrib/cluster/Akka.DistributedData/Akka.DistributedData.csproj
@@ -14,7 +14,9 @@
+
+
diff --git a/src/contrib/cluster/Akka.DistributedData/Serialization/OtherMessageComparer.cs b/src/contrib/cluster/Akka.DistributedData/Serialization/OtherMessageComparer.cs
index 048f71c7229..74abe9ed150 100644
--- a/src/contrib/cluster/Akka.DistributedData/Serialization/OtherMessageComparer.cs
+++ b/src/contrib/cluster/Akka.DistributedData/Serialization/OtherMessageComparer.cs
@@ -12,6 +12,15 @@
namespace Akka.DistributedData.Serialization
{
+ internal class OtherMessageAndVersionComparer : IComparer<
+ ValueTuple>
+ {
+ public static OtherMessageAndVersionComparer Instance { get; } = new();
+ public int Compare(ValueTuple x, ValueTuple y)
+ {
+ return OtherMessageComparer.Instance.Compare(x.Item1, y.Item1);
+ }
+ }
internal class OtherMessageComparer : IComparer
{
public static OtherMessageComparer Instance { get; } = new();
@@ -32,15 +41,36 @@ public int Compare(OtherMessage a, OtherMessage b)
if (aSize < bSize) return -1;
if (aSize > bSize) return 1;
- for (var i = 0; i < aSize; i++)
- {
- var aByte = aByteString[i];
- var bByte = bByteString[i];
- if (aByte < bByte) return -1;
- if (aByte > bByte) return 1;
- }
+ //int j = 0;
+ return aByteString.SequenceCompareTo(bByteString);
+ //while (j + 4 < aSize)
+ //{
+ // if (aByteString.Slice(j, 4)
+ // .SequenceEqual(bByteString.Slice(j, 4)) == false)
+ // {
+ // break;
+ // }
+ // else
+ // {
+ // j = j + 4;
+ // }
+ //}
+ //for (; j < aSize; j++)
+ //{
+ // var aByte = aByteString[j];
+ // var bByte = bByteString[j];
+ // if (aByte < bByte) return -1;
+ // if (aByte > bByte) return 1;
+ //}
+ //for (var i = 0; i < aSize; i++)
+ //{
+ // var aByte = aByteString[i];
+ // var bByte = bByteString[i];
+ // if (aByte < bByte) return -1;
+ // if (aByte > bByte) return 1;
+ //}
- return 0;
+ //return 0;
}
}
}
diff --git a/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.GenericCache.cs b/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.GenericCache.cs
new file mode 100644
index 00000000000..a0411cb2502
--- /dev/null
+++ b/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.GenericCache.cs
@@ -0,0 +1,822 @@
+// //-----------------------------------------------------------------------
+// //
+// // Copyright (C) 2009-2023 Lightbend Inc.
+// // Copyright (C) 2013-2023 .NET Foundation
+// //
+// //-----------------------------------------------------------------------
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Collections.Immutable;
+using System.Linq.Expressions;
+using System.Reflection;
+using System.Threading;
+using Akka.DistributedData.Serialization.Proto.Msg;
+using Google.Protobuf.Collections;
+
+namespace Akka.DistributedData.Serialization;
+
+public sealed partial class ReplicatedDataSerializer
+{
+ private sealed class LazyPool
+ {
+ private readonly ConcurrentQueue _poolItems = new();
+ private int _numItems = new();
+ private readonly int _maxItems;
+ public LazyPool(int maxItems)
+ {
+ _maxItems = maxItems;
+ }
+
+ public bool TryGetValue(out T value)
+ {
+ if (_numItems > 0)
+ {
+ if (Interlocked.Decrement(ref _numItems) < 0)
+ {
+ Interlocked.Increment(ref _numItems);
+ }
+ else
+ {
+ return _poolItems.TryDequeue(out value);
+ }
+ }
+ value = default;
+ return false;
+ }
+
+ public bool TryReturn(T value)
+ {
+ if (_numItems < _maxItems)
+ {
+ if (Interlocked.Increment(ref _numItems) < (_maxItems * 2))
+ {
+ _poolItems.Enqueue(value);
+ return true;
+ }
+ else
+ {
+ Interlocked.Decrement(ref _numItems);
+ }
+ }
+
+ return false;
+ }
+ }
+ private static class SerDeserGenericCache
+ {
+ private readonly struct TwoTypeLookup : IEquatable
+ {
+ public readonly Type Item1;
+ public readonly Type Item2;
+
+ public TwoTypeLookup(Type item1, Type item2)
+ {
+ Item1 = item1;
+ Item2 = item2;
+ }
+
+ public override bool Equals(object obj)
+ {
+ return obj is TwoTypeLookup other && Equals(other);
+ }
+
+ public bool Equals(TwoTypeLookup other)
+ {
+ return Item1 == other.Item1 && Item2 == other.Item2;
+ }
+
+ public override int GetHashCode()
+ {
+ unchecked
+ {
+ return (Item1.GetHashCode() * 397) ^ Item2.GetHashCode();
+ }
+ }
+
+ public static bool operator ==(TwoTypeLookup left, TwoTypeLookup right)
+ {
+ return left.Equals(right);
+ }
+
+ public static bool operator !=(TwoTypeLookup left, TwoTypeLookup right)
+ {
+ return !left.Equals(right);
+ }
+ }
+ private static readonly ConcurrentDictionary>
+ toGenericOrSetCache = new();
+
+ public static IORSet ToGenericORSet(Type k,
+ ReplicatedDataSerializer serializer,
+ Proto.Msg.ORSet set,
+ VersionVector versionVectors)
+ {
+ if (toGenericOrSetCache.TryGetValue(k,
+ out var value) ==
+ false)
+ {
+ value = toGenericOrSetCache.GetOrAdd(k,
+ static kc =>
+ MakeInstanceCallOneGenericTwoArgs<
+ Proto.Msg.ORSet, VersionVector, IORSet>(
+ ORSetMaker, kc,true)
+ .Compile());
+ }
+
+ return value(serializer, set, versionVectors);
+ }
+
+ private static readonly ConcurrentDictionary>
+ orSetUnknownToProto = new();
+
+ public static Proto.Msg.ORSet ORSetUnknownToProto(Type k,
+ ReplicatedDataSerializer serializer,
+ IORSet set, Proto.Msg.ORSet b)
+ {
+
+ var value = orSetUnknownToProto.GetOrAdd(k,
+ static kc =>
+ MakeInstanceCallOneGenericTwoArgs<
+ IORSet, Proto.Msg.ORSet, Proto.Msg.ORSet>(
+ ORSetUnknownMaker, kc)
+ .Compile());
+ return value(serializer, set, b);
+ }
+
+ private static readonly ConcurrentDictionary,
+ ORSet.IDeltaGroupOperation>>
+ orDeltaGroupCtorCache = new();
+
+ public static ORSet.IDeltaGroupOperation CreateDeltaGroup(Type t,
+ ImmutableArray arr)
+ {
+ if (orDeltaGroupCtorCache.TryGetValue(t, out var value) == false)
+ {
+ value = orDeltaGroupCtorCache.GetOrAdd(t, static ct =>
+ {
+ return MakeConstructorExprForTypeWith1GenericsOneString<
+ ImmutableArray,
+ ORSet.IDeltaGroupOperation>(
+ typeof(ORSet<>.DeltaGroup), ct).Compile();
+ });
+ }
+
+ return value(arr);
+ }
+
+ private static readonly ConcurrentDictionary>
+ gSetToProtoCache = new();
+
+ public static Proto.Msg.GSet GSetToProto(Type k,
+ ReplicatedDataSerializer serializer,
+ IGSet operation)
+ {
+ var value = gSetToProtoCache.GetOrAdd(k,
+ static kc =>
+ MakeInstanceCallOneGenericOneArg<
+ IGSet, Proto.Msg.GSet>(
+ GSetUnknownToProtoMaker, kc)
+ .Compile());
+
+ return value(serializer, operation);
+ }
+
+ private static readonly ConcurrentDictionary, IGSet>>
+ toGenericGSetCache = new();
+
+ public static IGSet ToGenericGSet(Type k,
+ ReplicatedDataSerializer serializer,
+ RepeatedField operation)
+ {
+ var value = toGenericGSetCache.GetOrAdd(k,
+ static kc =>
+ MakeInstanceCallOneGenericOneArg, IGSet>(
+ GSetMaker, kc, true)
+ .Compile());
+
+ return value(serializer,operation);
+ }
+
+ private static readonly ConcurrentDictionary>
+ genericLwwRegisterToProtoCache = new();
+
+ public static Proto.Msg.LWWRegister GenericLwwRegisterToProto(Type k,
+ ReplicatedDataSerializer serializer,
+ ILWWRegister operation)
+ {
+ var value = genericLwwRegisterToProtoCache.GetOrAdd(k,
+ static kc =>
+ MakeInstanceCallOneGenericOneArg<
+ ILWWRegister, Proto.Msg.LWWRegister>(
+ LWWProtoMaker, kc)
+ .Compile());
+
+ return value(serializer, operation);
+ }
+
+ private static readonly ConcurrentDictionary>
+ genericLwwRegisterFromProtoCache = new();
+
+ public static ILWWRegister GenericLwwRegisterFromProto(Type k,
+ ReplicatedDataSerializer serializer,
+ Proto.Msg.LWWRegister operation)
+ {
+ var value = genericLwwRegisterFromProtoCache.GetOrAdd(k,
+ static kc =>
+ MakeInstanceCallOneGenericOneArg<
+ Proto.Msg.LWWRegister, ILWWRegister>(
+ LWWRegisterMaker, kc, true)
+ .Compile());
+
+
+ return value(serializer, operation);
+ }
+
+ private static readonly ConcurrentDictionary>
+ orDictionaryToProtoCache = new();
+
+ public static Proto.Msg.ORMap ORDictionaryToProto(Type k, Type v,
+ ReplicatedDataSerializer serializer,
+ IORDictionary operation)
+ {
+
+ var value = orDictionaryToProtoCache.GetOrAdd(new(k, v),
+ static kc =>
+ MakeInstanceCallTwoGenericsOneArg<
+ IORDictionary, Proto.Msg.ORMap>(
+ ORDictProtoMaker, kc.Item1, kc.Item2)
+ .Compile());
+
+
+ return value(serializer, operation);
+ }
+
+ private static readonly ConcurrentDictionary>
+ orDictionaryFromProtoCache = new();
+
+ public static IORDictionary ORDictionaryFromProto(Type k, Type v,
+ ReplicatedDataSerializer serializer,
+ Proto.Msg.ORMap operation)
+ {
+ var value = orDictionaryFromProtoCache.GetOrAdd(new(k, v),
+ static kc =>
+ MakeInstanceCallTwoGenericsOneArg<
+ Proto.Msg.ORMap, IORDictionary>(
+ ORDictMaker, kc.Item1, kc.Item2)
+ .Compile());
+
+
+ return value(serializer, operation);
+ }
+
+ private static readonly ConcurrentDictionary,
+ Proto.Msg.ORMapDeltaGroup>>
+ orDictionaryDeltasToProtoCache = new();
+
+ private static readonly ConcurrentDictionary>
+ orDictionaryDeltaGroupFromProtoCache = new();
+
+ public static Proto.Msg.ORMapDeltaGroup ORDictionaryDeltasToProto(
+ Type k, Type v,
+ ReplicatedDataSerializer serializer,
+ List operation)
+ {
+ var value = orDictionaryDeltasToProtoCache.GetOrAdd(new(k, v),
+ static kc =>
+ MakeInstanceCallTwoGenericsOneArg<
+ List,
+ Proto.Msg.ORMapDeltaGroup>(
+ ORDeltaGroupProtoMaker, kc.Item1, kc.Item2)
+ .Compile());
+
+ return value(serializer, operation);
+ }
+
+ public static ORDictionary.IDeltaGroupOp
+ ORDictionaryDeltaGroupFromProto(Type k, Type v,
+ ReplicatedDataSerializer serializer,
+ Proto.Msg.ORMapDeltaGroup operation)
+ {
+ var value = orDictionaryDeltaGroupFromProtoCache.GetOrAdd(new(k, v),
+ static kc =>
+ MakeInstanceCallTwoGenericsOneArg<
+ Proto.Msg.ORMapDeltaGroup,
+ ORDictionary.IDeltaGroupOp>(
+ ORDeltaGroupMaker, kc.Item1, kc.Item2)
+ .Compile());
+
+ return value(serializer, operation);
+ }
+
+ private static readonly ConcurrentDictionary>
+ lwwDictionaryToProtoCache = new();
+
+ private static readonly ConcurrentDictionary>
+ lwwDictionaryFromProtoCache = new();
+
+ private static readonly ConcurrentDictionary>
+ lwwDictionaryDeltaFromProtoCache = new();
+
+
+ public static Proto.Msg.LWWMap
+ LWWDictionaryToProto(Type k, Type v,
+ ReplicatedDataSerializer serializer,
+ ILWWDictionary operation)
+ {
+ var value = lwwDictionaryToProtoCache.GetOrAdd(new(k, v),
+ static kc =>
+ MakeInstanceCallTwoGenericsOneArgOneConst<
+ ILWWDictionary, Proto.Msg.TypeDescriptor, Proto.Msg.LWWMap>(
+ LWWDictProtoMaker, GetTypeDescriptor(kc.Item2), kc.Item1, kc.Item2)
+ .Compile());
+
+ return value(serializer, operation);
+ }
+
+ public static ILWWDictionary
+ LWWDictionaryFromProto(Type k, Type v,
+ ReplicatedDataSerializer serializer,
+ Proto.Msg.LWWMap operation)
+ {
+ var value = lwwDictionaryFromProtoCache.GetOrAdd(new(k, v),
+ static kc =>
+ MakeInstanceCallTwoGenericsOneArg<
+ Proto.Msg.LWWMap,
+ ILWWDictionary>(
+ LWWDictMaker, kc.Item1, kc.Item2)
+ .Compile());
+
+ return value(serializer, operation);
+ }
+
+ public static ILWWDictionaryDeltaOperation
+ LWWDictionaryDeltaFromProto(Type k, Type v,
+ ReplicatedDataSerializer serializer,
+ ORDictionary.IDeltaOperation operation)
+ {
+ var value = lwwDictionaryDeltaFromProtoCache.GetOrAdd(new(k, v),
+ static kc =>
+ MakeInstanceCallTwoGenericsOneArg<
+ ORDictionary.IDeltaOperation,
+ ILWWDictionaryDeltaOperation>(
+ LWWDictionaryDeltaMaker, kc.Item1, kc.Item2)
+ .Compile());
+
+
+ return value(serializer, operation);
+ }
+
+
+ private static readonly ConcurrentDictionary>
+ pnToProtoCache = new();
+
+ private static readonly ConcurrentDictionary>
+ pnDictionaryDeltaCache = new();
+
+ private static readonly ConcurrentDictionary>
+ pnDictionaryFromProtoCache = new();
+
+ public static IPNCounterDictionary
+ PNCounterDictionaryFromProto(Type k,
+ ReplicatedDataSerializer serializer,
+ PNCounterMap op)
+ {
+ var value = pnDictionaryFromProtoCache.GetOrAdd(k,
+ static kc =>
+ MakeInstanceCallOneGenericOneArg<
+ PNCounterMap,
+ IPNCounterDictionary>(
+ PNCounterDictMaker, kc).Compile());
+
+
+ return value(serializer, op);
+ }
+
+ private static readonly ConcurrentDictionary>
+ pnCounterDictionaryFromProtoCache = new();
+
+ public static IPNCounterDictionaryDeltaOperation
+ PnCounterDictionaryDeltaFromProto(Type k,
+ ReplicatedDataSerializer serializer,
+ ORDictionary.IDeltaOperation op)
+ {
+ var value = pnDictionaryDeltaCache.GetOrAdd(k,
+ static kc =>
+ MakeInstanceCallOneGenericOneArg<
+ ORDictionary.IDeltaOperation,
+ IPNCounterDictionaryDeltaOperation>(
+ PNCounterDeltaMaker, kc).Compile());
+
+ return value(serializer, op);
+ }
+
+ public static PNCounterMap PNCounterDictionaryToProto(Type k,
+ ReplicatedDataSerializer serializer, IPNCounterDictionary dict)
+ {
+ var value = pnToProtoCache.GetOrAdd(k, static kc =>
+ MakeInstanceCallOneGenericOneArg(PNCounterDictProtoMaker, kc)
+ .Compile());
+
+ return value(serializer, dict);
+ }
+
+ private static readonly
+ ConcurrentDictionary>
+ orMultiValueDictionaryKeyCache = new();
+
+ private static readonly
+ ConcurrentDictionary>
+ orMultiValueSerializerCache = new();
+
+ private static readonly
+ ConcurrentDictionary>
+ orMultiValueDeserializerCache = new();
+
+ private static readonly
+ ConcurrentDictionary>
+ orMultiValueDeltaOpCache = new();
+
+ private static readonly
+ ConcurrentDictionary>
+ orDictionaryKeyCache = new();
+
+ private static readonly
+ ConcurrentDictionary>
+ lwwDictionaryKeyCache = new();
+
+ private static readonly
+ ConcurrentDictionary>
+ orSetKeyCache = new();
+
+ private static readonly
+ ConcurrentDictionary>
+ pnCounterDictionaryKeyCache = new();
+
+ private static readonly
+ ConcurrentDictionary>
+ lwwRegisterKeyCache = new();
+
+ private static readonly
+ ConcurrentDictionary>
+ gsetKeyCache = new();
+
+ public static Proto.Msg.ORMultiMap
+ IORMultiValueDictionaryToProtoORMultiMap(Type tk, Type tv,
+ ReplicatedDataSerializer ser,
+ IORMultiValueDictionary dict)
+ {
+
+ var value = orMultiValueSerializerCache.GetOrAdd(new(tk, tv),
+ MakeInstanceCallTwoGenericsOneArg<
+ IORMultiValueDictionary, Proto.Msg.ORMultiMap>(
+ MultiMapProtoMaker, tk,
+ tv)
+ .Compile());
+
+ return value(ser, dict);
+ }
+
+ public static IORMultiValueDictionary
+ ProtoORMultiMapToIORMultiValueDictionary(Type tk, Type tv,
+ ReplicatedDataSerializer ser,
+ Proto.Msg.ORMultiMap dict)
+ {
+ var value = orMultiValueDeserializerCache.GetOrAdd(new(tk, tv),
+ MakeInstanceCallTwoGenericsOneArg<
+ Proto.Msg.ORMultiMap, IORMultiValueDictionary>(
+ MultiDictMaker, tk,
+ tv)
+ .Compile());
+
+ return value(ser, dict);
+ }
+
+
+ public static IORMultiValueDictionaryDeltaOperation
+ ToOrMultiDictionaryDelta(Type tk, Type tv,
+ ReplicatedDataSerializer ser,
+ ORDictionary.IDeltaOperation deltaop, bool withValueDeltas)
+ {
+ var value = orMultiValueDeltaOpCache.GetOrAdd(new(tk, tv),
+ MakeInstanceCallTwoGenericsTwoArgs<
+ ORDictionary.IDeltaOperation, bool,
+ IORMultiValueDictionaryDeltaOperation>(
+ ORMultiDictionaryDeltaMaker, tk,
+ tv)
+ .Compile());
+
+ return value(ser, deltaop, withValueDeltas);
+ }
+
+ public static IKey GetGSetKey(Type k, string arg)
+ {
+ var value = gsetKeyCache.GetOrAdd(k,
+ MakeConstructorExprForTypeWith1GenericsOneString(
+ typeof(GSetKey<>), k).Compile());
+
+ return value(arg);
+ }
+
+ public static IKey GetLWWRegisterKeyValue(Type k, string arg)
+ {
+ var value = lwwRegisterKeyCache.GetOrAdd(k,
+ MakeConstructorExprForTypeWith1GenericsOneString(
+ typeof(LWWRegisterKey<>), k).Compile());
+ return value(arg);
+ }
+
+ public static IKey GetPNCounterDictionaryKeyValue(Type k, string arg)
+ {
+
+ var value = pnCounterDictionaryKeyCache.GetOrAdd(k,
+ MakeConstructorExprForTypeWith1GenericsOneString(
+ typeof(PNCounterDictionaryKey<>), k).Compile());
+
+ return value(arg);
+ }
+
+ public static IKey GetORSetKey(Type k, string arg)
+ {
+
+ var value = orSetKeyCache.GetOrAdd(k,
+ MakeConstructorExprForTypeWith1GenericsOneString(typeof(
+ ORSetKey<>), k).Compile());
+
+ return value(arg);
+ }
+
+ public static IKey GetORDictionaryKey(Type k, Type v,
+ string arg)
+ {
+ var value = orDictionaryKeyCache.GetOrAdd(new(k, v),
+ MakeConstructorExprForTypeWith2GenericsOneString(
+ typeof(ORDictionaryKey<,>), k, v)
+ .Compile());
+
+ return value(arg);
+ }
+
+ public static IKey GetORMultiValueDictionaryKey(Type k, Type v,
+ string arg)
+ {
+ var value = orMultiValueDictionaryKeyCache.GetOrAdd(new(k, v),
+ MakeConstructorExprForTypeWith2GenericsOneString(
+ typeof(ORMultiValueDictionaryKey<,>), k, v)
+ .Compile());
+
+ return value(arg);
+ }
+
+ public static IKey GetLWWDictionaryKey(Type k, Type v, string arg)
+ {
+ var value = lwwDictionaryKeyCache.GetOrAdd(new(k, v),
+ MakeConstructorExprForTypeWith2GenericsOneString(
+ typeof(LWWDictionaryKey<,>), k, v).Compile());
+
+ return value(arg);
+ }
+
+ private static
+ Expression>
+ MakeInstanceCallOneGenericOneArg(
+ MethodInfo nonConstructedGenericMethodInfo, Type tk,
+ bool convert = false)
+ {
+ var i = Expression.Parameter(typeof(ReplicatedDataSerializer),
+ "ser");
+ var p = Expression.Parameter(typeof(TArg),
+ "multi");
+ var c = Expression.Call(i,
+ nonConstructedGenericMethodInfo.MakeGenericMethod(new[] { tk }),
+ new[] { p });
+ return Expression
+ .Lambda<
+ Func>(
+ convert ? Expression.Convert(c, typeof(TRet)) : c,
+ false, i, p);
+ }
+
+ private static
+ Expression>
+ MakeStaticCallOneGenericOneArg(
+ MethodInfo nonConstructedGenericMethodInfo, Type tk,
+ bool convert = false)
+ {
+ var p = Expression.Parameter(typeof(TArg),
+ "multi");
+ var c = Expression.Call(nonConstructedGenericMethodInfo.MakeGenericMethod(new[] { tk }),
+ new[] { p });
+ return Expression
+ .Lambda<
+ Func>(
+ convert ? Expression.Convert(c, typeof(TRet)) : c,
+ false, p);
+ }
+
+ private static
+ Expression>
+ MakeStaticCallOneGenericTwoArgs(
+ MethodInfo nonConstructedGenericMethodInfo, Type tk,
+ bool convert = false)
+ {
+ var p = Expression.Parameter(typeof(TArg1),
+ "multi");
+ var p2 = Expression.Parameter(typeof(TArg2));
+ var c = Expression.Call(
+ nonConstructedGenericMethodInfo.MakeGenericMethod(new[] { tk }),
+ new[] { p,p2 });
+ return Expression
+ .Lambda<
+ Func>(
+ convert ? Expression.Convert(c, typeof(TRet)) : c, false, p,
+ p2);
+ }
+
+ private static Expression>
+ MakeInstanceCallOneGenericOneIgnoredArg(
+ MethodInfo nonConstructedGenericMethodInfo,
+ Type tk, Type targ2Ignore, bool convert = false)
+ {
+ var p2 = Expression.Constant(null, targ2Ignore);
+ var i = Expression.Parameter(typeof(ReplicatedDataSerializer),
+ "ser");
+ var p = Expression.Parameter(typeof(TArg),
+ "multi");
+ var c = Expression.Call(i,
+ nonConstructedGenericMethodInfo.MakeGenericMethod(new[] { tk }),
+ p, p2);
+ return Expression
+ .Lambda<
+ Func>(
+ convert ? Expression.Convert(c, typeof(TRet)) : c, false, i,
+ p);
+
+ }
+ private static
+ Expression>
+ MakeInstanceCallOneGenericTwoArgs(
+ MethodInfo nonConstructedGenericMethodInfo, Type tk,
+ bool convert = false)
+ {
+
+ var p2 = Expression.Parameter(typeof(TArg2));
+ var i = Expression.Parameter(typeof(ReplicatedDataSerializer),
+ "ser");
+ var p = Expression.Parameter(typeof(TArg1),
+ "multi");
+ var c = Expression.Call(i,
+ nonConstructedGenericMethodInfo.MakeGenericMethod(new[] { tk }),
+ new[] { p, p2 });
+ return Expression
+ .Lambda<
+ Func>(
+ convert ? Expression.Convert(c, typeof(TRet)) : c, false, i,
+ p, p2);
+ }
+
+ private static
+ Expression>
+ MakeInstanceCallTwoGenericsOneArg(
+ MethodInfo nonConstructedGenericMethodInfo, Type tk, Type tv)
+ {
+ var i = Expression.Parameter(typeof(ReplicatedDataSerializer),
+ "ser");
+ var p = Expression.Parameter(typeof(TArg),
+ "multi");
+ var c = Expression.Call(i,
+ nonConstructedGenericMethodInfo.MakeGenericMethod(new[]
+ {
+ tk, tv
+ }),
+ new[] { p });
+ return Expression
+ .Lambda<
+ Func>(c, false, i, p);
+ }
+
+ private static
+ Expression>
+ MakeInstanceCallTwoGenericsOneArgOneConst(
+ MethodInfo nonConstructedGenericMethodInfo, TConst conArg, Type tk, Type tv)
+ {
+ var i = Expression.Parameter(typeof(ReplicatedDataSerializer),
+ "ser");
+ var p = Expression.Parameter(typeof(TArg),
+ "multi");
+ var pc = Expression.Constant(conArg, typeof(TConst));
+ var c = Expression.Call(i,
+ nonConstructedGenericMethodInfo.MakeGenericMethod(new[]
+ {
+ tk, tv
+ }),
+ new Expression[] { p ,pc});
+ return Expression
+ .Lambda<
+ Func>(c, false, i, p);
+ }
+
+ private static
+ Expression>
+ MakeInstanceCallTwoGenericsTwoArgs(
+ MethodInfo nonConstructedGenericMethodInfo, Type tk, Type tv)
+ {
+ var i = Expression.Parameter(typeof(ReplicatedDataSerializer),
+ "ser");
+ var p = Expression.Parameter(typeof(TArg1),
+ "multi");
+ var p2 = Expression.Parameter(typeof(TArg2));
+ var c = Expression.Call(i,
+ nonConstructedGenericMethodInfo.MakeGenericMethod(new[]
+ {
+ tk, tv
+ }),
+ new[] { p, p2 });
+ return Expression
+ .Lambda<
+ Func>(c, false, i, p, p2);
+ }
+
+ private static Expression>
+ MakeConstructorExprForTypeWith1GenericsOneString(Type t,
+ Type k)
+ {
+ var p = Expression.Parameter(typeof(T), "arg");
+ var ctor = t
+ .MakeGenericType(new[] { k })
+ .GetConstructor(new[] { typeof(T) });
+ var newExp = Expression.New(ctor, p);
+ var cast = Expression.Convert(newExp, typeof(TOut));
+ var newVal = Expression.Lambda>(cast, p);
+ return newVal;
+ }
+
+ private static Expression>
+ MakeConstructorExprForTypeWith2GenericsOneString(Type t, Type k,
+ Type v)
+ {
+ var p = Expression.Parameter(typeof(string), "key");
+ var ctor = t
+ .MakeGenericType(new[] { k, v })
+ .GetConstructor(new[] { typeof(string) });
+ var newExp = Expression.New(ctor, p);
+ var cast = Expression.Convert(newExp, typeof(IKey));
+ var newVal = Expression.Lambda>(cast, p);
+ return newVal;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.cs b/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.cs
index 7fcb32f8ff5..6fe8defeee5 100644
--- a/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.cs
+++ b/src/contrib/cluster/Akka.DistributedData/Serialization/ReplicatedDataSerializer.cs
@@ -10,6 +10,8 @@
using Akka.Serialization;
using Google.Protobuf;
using System;
+using System.Buffers;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
@@ -18,13 +20,16 @@
using Akka.DistributedData.Serialization.Proto.Msg;
using Akka.Util;
using Akka.Util.Internal;
+using Google.Protobuf.Collections;
+using Microsoft.Extensions.ObjectPool;
using ArgumentOutOfRangeException = System.ArgumentOutOfRangeException;
using IActorRef = Akka.Actor.IActorRef;
+using UniqueAddress = Akka.Cluster.UniqueAddress;
namespace Akka.DistributedData.Serialization
{
- public sealed class ReplicatedDataSerializer : SerializerWithStringManifest
+ public sealed partial class ReplicatedDataSerializer : SerializerWithStringManifest
{
private const string DeletedDataManifest = "A";
@@ -80,13 +85,13 @@ public override byte[] ToBinary(object obj)
{
switch (obj)
{
- case IORSet o: return SerializationSupport.Compress(ToProto(o));
- case ORSet.IAddDeltaOperation o: return ToProto(o.UnderlyingSerialization).ToByteArray();
- case ORSet.IRemoveDeltaOperation o: return ToProto(o.UnderlyingSerialization).ToByteArray();
+ case IORSet o: return SerializationSupport.Compress(ORSetToProto(o));
+ case ORSet.IAddDeltaOperation o: return ORSetToProto(o.UnderlyingSerialization).ToByteArray();
+ case ORSet.IRemoveDeltaOperation o: return ORSetToProto(o.UnderlyingSerialization).ToByteArray();
case IGSet g: return ToProto(g).ToByteArray();
case GCounter g: return ToProto(g).ToByteArray();
case PNCounter p: return ToProto(p).ToByteArray();
- case Flag f: return ToProto(f).ToByteArray();
+ case Flag f: return GetFlagBytes(f);//ToProto(f).ToByteArray();
case ILWWRegister l: return ToProto(l).ToByteArray();
case IORDictionary o: return SerializationSupport.Compress(ToProto(o));
case ORDictionary.IDeltaOperation p: return ToProto(p).ToByteArray();
@@ -101,8 +106,8 @@ public override byte[] ToBinary(object obj)
// key types
case IKey k: return ToProto(k).ToByteArray();
// less common delta types
- case ORSet.IDeltaGroupOperation o: return ToProto(o).ToByteArray();
- case ORSet.IFullStateDeltaOperation o: return ToProto(o.UnderlyingSerialization).ToByteArray();
+ case ORSet.IDeltaGroupOperation o: return ORDeltaGroupOpToProto(o).ToByteArray();
+ case ORSet.IFullStateDeltaOperation o: return ORSetToProto(o.UnderlyingSerialization).ToByteArray();
default:
throw new ArgumentException($"Can't serialize object of type [{obj.GetType().FullName}] in [{GetType().FullName}]");
}
@@ -112,7 +117,7 @@ public override object FromBinary(byte[] bytes, string manifest)
{
switch (manifest)
{
- case ORSetManifest: return ORSetFromBinary(SerializationSupport.Decompress(bytes));
+ case ORSetManifest: return ORSetFromBinary(SerializationSupport.DecompressWithRentedPool(bytes));
case ORSetAddManifest: return ORAddDeltaOperationFromBinary(bytes);
case ORSetRemoveManifest: return ORRemoveOperationFromBinary(bytes);
case GSetManifest: return GSetFromBinary(bytes);
@@ -120,18 +125,18 @@ public override object FromBinary(byte[] bytes, string manifest)
case PNCounterManifest: return PNCounterFromBytes(bytes);
case FlagManifest: return FlagFromBinary(bytes);
case LWWRegisterManifest: return LWWRegisterFromBinary(bytes);
- case ORMapManifest: return ORDictionaryFromBinary(SerializationSupport.Decompress(bytes));
+ case ORMapManifest: return ORDictionaryFromBinary(SerializationSupport.DecompressWithRentedPool(bytes));
case ORMapPutManifest: return ORDictionaryPutFromBinary(bytes);
case ORMapRemoveManifest: return ORDictionaryRemoveFromBinary(bytes);
case ORMapRemoveKeyManifest: return ORDictionaryRemoveKeyFromBinary(bytes);
case ORMapUpdateManifest: return ORDictionaryUpdateFromBinary(bytes);
case ORMapDeltaGroupManifest: return ORDictionaryDeltaGroupFromBinary(bytes);
- case LWWMapManifest: return LWWDictionaryFromBinary(SerializationSupport.Decompress(bytes));
+ case LWWMapManifest: return LWWDictionaryFromBinary(SerializationSupport.DecompressWithRentedPool(bytes));
case LWWMapDeltaGroupManifest:
return LWWDictionaryDeltaGroupFromBinary(bytes);
- case PNCounterMapManifest: return PNCounterDictionaryFromBinary(SerializationSupport.Decompress(bytes));
+ case PNCounterMapManifest: return PNCounterDictionaryFromBinary(SerializationSupport.DecompressWithRentedPool(bytes));
case PNCounterMapDeltaOperationManifest: return PNCounterDeltaFromBinary(bytes);
- case ORMultiMapManifest: return ORMultiDictionaryFromBinary(SerializationSupport.Decompress(bytes));
+ case ORMultiMapManifest: return ORMultiDictionaryFromBinary(SerializationSupport.DecompressWithRentedPool(bytes));
case ORMultiMapDeltaOperationManifest: return ORMultiDictionaryDeltaFromBinary(bytes);
case DeletedDataManifest: return DeletedData.Instance;
case VersionVectorManifest: return _ser.VersionVectorFromBinary(bytes);
@@ -204,7 +209,20 @@ public override string Manifest(object o)
}
}
+ private static readonly ConcurrentDictionary
+ _typeDescriptorCache = new();
private static TypeDescriptor GetTypeDescriptor(Type t)
+ {
+ if (_typeDescriptorCache.TryGetValue(t, out var item) == false)
+ {
+ item = _typeDescriptorCache.GetOrAdd(t,
+ static k => _getTypeDescriptorImpl(k));
+ }
+
+ return item;
+ }
+
+ private static TypeDescriptor _getTypeDescriptorImpl(Type t)
{
var typeInfo = new TypeDescriptor();
if (t == typeof(string))
@@ -232,6 +250,8 @@ private static TypeDescriptor GetTypeDescriptor(Type t)
return typeInfo;
}
+ private static readonly ConcurrentDictionary
+ _otherTypeCache = new();
private static Type GetTypeFromDescriptor(TypeDescriptor t)
{
switch (t.Type)
@@ -245,34 +265,43 @@ private static Type GetTypeFromDescriptor(TypeDescriptor t)
case ValType.ActorRef:
return typeof(IActorRef);
case ValType.Other:
+ {
+ if (_otherTypeCache.TryGetValue(t.TypeName, out var type) ==
+ false)
{
- var type = Type.GetType(t.TypeName);
- return type;
+ type= _otherTypeCache.GetOrAdd(t.TypeName,
+ static (k) => Type.GetType(k));
}
+
+ return type;
+ //var type = Type.GetType(t.TypeName);
+ //return type;
+ }
default:
throw new SerializationException($"Unknown ValType of [{t.Type}] detected");
}
}
#region ORSet
- private IORSet ORSetFromBinary(byte[] bytes)
+ private IORSet ORSetFromBinary(IMemoryOwner bytes)
{
- return FromProto(Proto.Msg.ORSet.Parser.ParseFrom(bytes));
+ using (bytes)
+ {
+ var p = Proto.Msg.ORSet.Parser.ParseFrom(bytes.Memory.Span);
+ return ORSetFromProto(p);
+ }
}
- private Proto.Msg.ORSet ToProto(IORSet orset)
+ private Proto.Msg.ORSet ORSetToProto(IORSet orset)
{
- var b = new Proto.Msg.ORSet
- {
- TypeInfo = new TypeDescriptor()
- };
+ var b = new Proto.Msg.ORSet();
switch (orset)
{
case ORSet ints:
- {
+ {
+ b.TypeInfo = GetTypeDescriptor(typeof(int));
b.Vvector = SerializationSupport.VersionVectorToProto(ints.VersionVector);
- b.TypeInfo.Type = ValType.Int;
var intElements = new List(ints.ElementsMap.Keys);
intElements.Sort();
foreach (var val in intElements)
@@ -284,8 +313,9 @@ private Proto.Msg.ORSet ToProto(IORSet orset)
}
case ORSet longs:
{
+
+ b.TypeInfo = GetTypeDescriptor(typeof(long));
b.Vvector = SerializationSupport.VersionVectorToProto(longs.VersionVector);
- b.TypeInfo.Type = ValType.Long;
var longElements = new List(longs.ElementsMap.Keys);
longElements.Sort();
foreach (var val in longElements)
@@ -297,8 +327,9 @@ private Proto.Msg.ORSet ToProto(IORSet orset)
}
case ORSet strings:
{
+
+ b.TypeInfo = GetTypeDescriptor(typeof(string));
b.Vvector = SerializationSupport.VersionVectorToProto(strings.VersionVector);
- b.TypeInfo.Type = ValType.String;
var stringElements = new List(strings.ElementsMap.Keys);
stringElements.Sort();
foreach (var val in stringElements)
@@ -310,8 +341,8 @@ private Proto.Msg.ORSet ToProto(IORSet orset)
}
case ORSet refs:
{
+ b.TypeInfo = GetTypeDescriptor(typeof(IActorRef));
b.Vvector = SerializationSupport.VersionVectorToProto(refs.VersionVector);
- b.TypeInfo.Type = ValType.ActorRef;
var actorRefElements = new List(refs.ElementsMap.Keys);
actorRefElements.Sort();
foreach (var val in actorRefElements)
@@ -322,43 +353,56 @@ private Proto.Msg.ORSet ToProto(IORSet orset)
return b;
}
default: // unknown type
- {
+ {
+ return SerDeserGenericCache.ORSetUnknownToProto(orset.SetType,
+ this, orset, b);
// runtime type - enter horrible dynamic serialization stuff
- var makeProto = ORSetUnknownMaker.MakeGenericMethod(orset.SetType);
- return (Proto.Msg.ORSet)makeProto.Invoke(this, new object[] { orset, b });
+ //var makeProto = ORSetUnknownMaker.MakeGenericMethod(orset.SetType);
+ //return (Proto.Msg.ORSet)makeProto.Invoke(this, new object[] { orset, b });
}
}
}
- private IORSet FromProto(Proto.Msg.ORSet orset)
+ private IORSet ORSetFromProto(Proto.Msg.ORSet orset)
{
- var dots = orset.Dots.Select(x => _ser.VersionVectorFromProto(x));
+ //var dots = orset.Dots.Select(x => _ser.VersionVectorFromProto(x));
var vector = _ser.VersionVectorFromProto(orset.Vvector);
if (orset.IntElements.Count > 0 || orset.TypeInfo.Type == ValType.Int)
{
- var eInt = orset.IntElements.Zip(dots, (i, versionVector) => (i, versionVector))
- .ToImmutableDictionary(x => x.i, y => y.versionVector);
+ var eInt = ImmutableDictionary.CreateRange(
+ ZipSetStatic(orset.IntElements, orset.Dots));
+ //var eInt = orset.IntElements.Zip(dots, (i, versionVector) => (i, versionVector))
+ // .ToImmutableDictionary(x => x.i, y => y.versionVector);
return new ORSet(eInt, vector);
}
if (orset.LongElements.Count > 0 || orset.TypeInfo.Type == ValType.Long)
{
- var eLong = orset.LongElements.Zip(dots, (i, versionVector) => (i, versionVector))
- .ToImmutableDictionary(x => x.i, y => y.versionVector);
+ var eLong =
+ ImmutableDictionary.CreateRange(
+ ZipSetStatic(orset.LongElements, orset.Dots));
+ //var eLong = orset.LongElements.Zip(dots, (i, versionVector) => (i, versionVector))
+ // .ToImmutableDictionary(x => x.i, y => y.versionVector);
return new ORSet(eLong, vector);
}
if (orset.StringElements.Count > 0 || orset.TypeInfo.Type == ValType.String)
{
- var eStr = orset.StringElements.Zip(dots, (i, versionVector) => (i, versionVector))
- .ToImmutableDictionary(x => x.i, y => y.versionVector);
+ var eStr =
+ ImmutableDictionary.CreateRange(
+ ZipSetStatic(orset.StringElements, orset.Dots));
+ //var eStr = orset.StringElements.Zip(dots, (i, versionVector) => (i, versionVector))
+ // .ToImmutableDictionary(x => x.i, y => y.versionVector);
return new ORSet(eStr, vector);
}
if (orset.ActorRefElements.Count > 0 || orset.TypeInfo.Type == ValType.ActorRef)
{
+ //TODO: This is the odd duck of odd ducks to optimize.
+ var dots =
+ orset.Dots.Select(x => _ser.VersionVectorFromProto(x));
var eRef = orset.ActorRefElements.Zip(dots, (i, versionVector) => (i, versionVector))
.ToImmutableDictionary(x => _ser.ResolveActorRef(x.i), y => y.versionVector);
return new ORSet(eRef, vector);
@@ -366,76 +410,148 @@ private IORSet FromProto(Proto.Msg.ORSet orset)
// runtime type - enter horrible dynamic serialization stuff
- var setContentType = Type.GetType(orset.TypeInfo.TypeName);
-
- var eOther = orset.OtherElements.Zip(dots,
- (i, versionVector) => (_ser.OtherMessageFromProto(i), versionVector))
- .ToImmutableDictionary(x => x.Item1, x => x.versionVector);
-
- var setType = ORSetMaker.MakeGenericMethod(setContentType);
- return (IORSet)setType.Invoke(this, new object[] { eOther, vector });
+ var setContentType = _otherTypeCache.GetOrAdd(orset.TypeInfo.TypeName, static tn=> Type.GetType(tn));
+ return SerDeserGenericCache.ToGenericORSet(setContentType, this,
+ orset, vector);
+ //var eOther = orset.OtherElements.Zip(dots,
+ // (i, versionVector) => (_ser.OtherMessageFromProto(i), versionVector))
+ // .ToImmutableDictionary(x => x.Item1, x => x.versionVector);
+ //var setType = ORSetMaker.MakeGenericMethod(setContentType);
+ //return (IORSet)setType.Invoke(this, new object[] { eOther, vector });
}
private static readonly MethodInfo ORSetMaker =
- typeof(ReplicatedDataSerializer).GetMethod(nameof(ToGenericORSet), BindingFlags.Static | BindingFlags.NonPublic);
-
- private static ORSet ToGenericORSet(ImmutableDictionary