diff --git a/src/core/Akka.Tests/Delivery/ProducerControllerSpec.cs b/src/core/Akka.Tests/Delivery/ProducerControllerSpec.cs index 4146658cf66..c39a1a79ac0 100644 --- a/src/core/Akka.Tests/Delivery/ProducerControllerSpec.cs +++ b/src/core/Akka.Tests/Delivery/ProducerControllerSpec.cs @@ -6,6 +6,7 @@ // ----------------------------------------------------------------------- using System; +using System.Linq; using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; @@ -453,4 +454,45 @@ public async Task producerController.Tell(new ProducerController.Request(5L, 15L, false, false)); await replyTo.ExpectMsgAsync(5L); } -} \ No newline at end of file + + /// + /// Reproduction for https://github.com/akkadotnet/akka.net/issues/6754 + /// + [Fact] + public async Task Repro6754() + { + NextId(); + var consumerControllerProbe = CreateTestProbe(); + + var msg = new string('*', 10_000); // 10k char length string + var producerController = + Sys.ActorOf( + ProducerController.Create(Sys, ProducerId, Option.None, + ProducerController.Settings.Create(Sys) with { ChunkLargeMessagesBytes = 1024}), + $"producerController-{_idCount}"); + + var producerProbe = CreateTestProbe(); + producerController.Tell(new ProducerController.Start(producerProbe.Ref)); + + producerController.Tell(new ProducerController.RegisterConsumer(consumerControllerProbe.Ref)); + + (await producerProbe.ExpectMsgAsync>()) + .SendNextTo.Tell(new Job(msg)); + var seqMsg1 = await consumerControllerProbe.ExpectMsgAsync>(); + seqMsg1.Message.IsMessage.Should().BeFalse(); + seqMsg1.Message.Chunk.HasValue.Should().BeTrue(); + seqMsg1.IsFirstChunk.Should().BeTrue(); + seqMsg1.IsLastChunk.Should().BeFalse(); + seqMsg1.SeqNr.Should().Be(1); + + producerController.Tell(new ProducerController.Request(0L, 10L, true, false)); + + var seqMsg2 = await consumerControllerProbe.ExpectMsgAsync>(); + seqMsg2.Message.IsMessage.Should().BeFalse(); + seqMsg2.Message.Chunk.HasValue.Should().BeTrue(); + seqMsg2.IsFirstChunk.Should().BeFalse(); + seqMsg2.IsLastChunk.Should().BeFalse(); + seqMsg2.SeqNr.Should().Be(2); + + } +} diff --git a/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs b/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs index 12a5a904da7..9403e2a5cc6 100644 --- a/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs +++ b/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs @@ -747,7 +747,7 @@ internal static IEnumerable CreateChunks(T msg, int chunkSize, var bytes = serialization.Serialize(msg); if (bytes.Length <= chunkSize) { - var chunkedMessage = new ChunkedMessage(ByteString.CopyFrom(bytes), true, true, serializerId, manifest); + var chunkedMessage = new ChunkedMessage(ByteString.FromBytes(bytes), true, true, serializerId, manifest); yield return chunkedMessage; } else @@ -757,7 +757,8 @@ internal static IEnumerable CreateChunks(T msg, int chunkSize, for (var i = 0; i < chunkCount; i++) { var isLast = i == chunkCount - 1; - var chunkedMessage = new ChunkedMessage(ByteString.CopyFrom(bytes, i * chunkSize, chunkSize), first, + var nextChunk = Math.Min(chunkSize, bytes.Length - i * chunkSize); // needs to be the next chunkSize or remaining bytes, whichever is smaller. + var chunkedMessage = new ChunkedMessage(ByteString.FromBytes(bytes, i * chunkSize, nextChunk), first, isLast, serializerId, manifest); first = false;