Skip to content

Commit

Permalink
Fix: ProducerControllerImpl now respects bounds when chunking large…
Browse files Browse the repository at this point in the history
… messages (#6755)

* close #6754 - fix chunking bounds check

Error was caused by requesting a chunkSize greater than the remaining bytes. Also - changed methods so we no longer copy memory during chunking.

* avoid more copying

* Update ProducerControllerSpec.cs
  • Loading branch information
Aaronontheweb authored May 10, 2023
1 parent b68ba56 commit bf9c1ce
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 3 deletions.
44 changes: 43 additions & 1 deletion src/core/Akka.Tests/Delivery/ProducerControllerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// -----------------------------------------------------------------------

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
Expand Down Expand Up @@ -453,4 +454,45 @@ public async Task
producerController.Tell(new ProducerController.Request(5L, 15L, false, false));
await replyTo.ExpectMsgAsync(5L);
}
}

/// <summary>
/// Reproduction for https://github.com/akkadotnet/akka.net/issues/6754
/// </summary>
[Fact]
public async Task Repro6754()
{
NextId();
var consumerControllerProbe = CreateTestProbe();

var msg = new string('*', 10_000); // 10k char length string
var producerController =
Sys.ActorOf(
ProducerController.Create<Job>(Sys, ProducerId, Option<Props>.None,
ProducerController.Settings.Create(Sys) with { ChunkLargeMessagesBytes = 1024}),
$"producerController-{_idCount}");

var producerProbe = CreateTestProbe();
producerController.Tell(new ProducerController.Start<Job>(producerProbe.Ref));

producerController.Tell(new ProducerController.RegisterConsumer<Job>(consumerControllerProbe.Ref));

(await producerProbe.ExpectMsgAsync<ProducerController.RequestNext<Job>>())
.SendNextTo.Tell(new Job(msg));
var seqMsg1 = await consumerControllerProbe.ExpectMsgAsync<ConsumerController.SequencedMessage<Job>>();
seqMsg1.Message.IsMessage.Should().BeFalse();
seqMsg1.Message.Chunk.HasValue.Should().BeTrue();
seqMsg1.IsFirstChunk.Should().BeTrue();
seqMsg1.IsLastChunk.Should().BeFalse();
seqMsg1.SeqNr.Should().Be(1);

producerController.Tell(new ProducerController.Request(0L, 10L, true, false));

var seqMsg2 = await consumerControllerProbe.ExpectMsgAsync<ConsumerController.SequencedMessage<Job>>();
seqMsg2.Message.IsMessage.Should().BeFalse();
seqMsg2.Message.Chunk.HasValue.Should().BeTrue();
seqMsg2.IsFirstChunk.Should().BeFalse();
seqMsg2.IsLastChunk.Should().BeFalse();
seqMsg2.SeqNr.Should().Be(2);

}
}
5 changes: 3 additions & 2 deletions src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ internal static IEnumerable<ChunkedMessage> CreateChunks(T msg, int chunkSize,
var bytes = serialization.Serialize(msg);
if (bytes.Length <= chunkSize)
{
var chunkedMessage = new ChunkedMessage(ByteString.CopyFrom(bytes), true, true, serializerId, manifest);
var chunkedMessage = new ChunkedMessage(ByteString.FromBytes(bytes), true, true, serializerId, manifest);
yield return chunkedMessage;
}
else
Expand All @@ -757,7 +757,8 @@ internal static IEnumerable<ChunkedMessage> CreateChunks(T msg, int chunkSize,
for (var i = 0; i < chunkCount; i++)
{
var isLast = i == chunkCount - 1;
var chunkedMessage = new ChunkedMessage(ByteString.CopyFrom(bytes, i * chunkSize, chunkSize), first,
var nextChunk = Math.Min(chunkSize, bytes.Length - i * chunkSize); // needs to be the next chunkSize or remaining bytes, whichever is smaller.
var chunkedMessage = new ChunkedMessage(ByteString.FromBytes(bytes, i * chunkSize, nextChunk), first,
isLast, serializerId, manifest);

first = false;
Expand Down

0 comments on commit bf9c1ce

Please sign in to comment.