Skip to content

Commit

Permalink
Fix ByteString.Copy when requested length is zero (#6749)
Browse files Browse the repository at this point in the history
* added reproduction for #6748

* fixed zero-copy issues
  • Loading branch information
Aaronontheweb authored May 10, 2023
1 parent 15ab3b1 commit d783933
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 1 deletion.
26 changes: 25 additions & 1 deletion src/core/Akka.Tests/Delivery/ConsumerControllerSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class ConsumerControllerSpecs : TestKit.Xunit2.TestKit
}";

public ConsumerControllerSpecs(ITestOutputHelper outputHelper) : base(
Config.WithFallback(TestSerializer.Config), output: outputHelper)
Config.WithFallback(TestSerializer.Config).WithFallback(ZeroLengthSerializer.Config), output: outputHelper)
{
}

Expand Down Expand Up @@ -648,4 +648,28 @@ public async Task ConsumerController_without_resending_must_accept_lost_message(
(await consumerProbe.ExpectMsgAsync<ConsumerController.Delivery<Job>>()).SeqNr.Should().Be(35);
consumerController.Tell(ConsumerController.Confirmed.Instance);
}

/// <summary>
/// Reproduction for https://github.com/akkadotnet/akka.net/issues/6748
/// </summary>
[Fact]
public async Task ConsumerController_can_process_zero_length_Chunk()
{
NextId();
var consumerController = Sys.ActorOf(ConsumerController.Create<ZeroLengthSerializer.TestMsg>(Sys, Option<IActorRef>.None),
$"consumerController-{_idCount}");
var producerControllerProbe = CreateTestProbe();

var consumerProbe = CreateTestProbe();
consumerController.Tell(new ConsumerController.Start<ZeroLengthSerializer.TestMsg>(consumerProbe));

// one chunk for each letter, "123" is 3 chunks
var chunks1 =
ProducerController<ZeroLengthSerializer.TestMsg>.CreateChunks(ZeroLengthSerializer.TestMsg.Instance, chunkSize: 1, Sys.Serialization);
var seqMessages1 = chunks1.Select((c, i) =>
ConsumerController.SequencedMessage<ZeroLengthSerializer.TestMsg>.FromChunkedMessage(ProducerId, 1 + i, c, i == 0, false,
producerControllerProbe)).ToList();
consumerController.Tell(seqMessages1.First());
(await consumerProbe.ExpectMsgAsync<ConsumerController.Delivery<ZeroLengthSerializer.TestMsg>>()).Message.Should().Be(ZeroLengthSerializer.TestMsg.Instance);
}
}
64 changes: 64 additions & 0 deletions src/core/Akka.Tests/Delivery/TestConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,70 @@ public static Props PropsFor(TimeSpan delay, Func<SomeAsyncJob, bool> endConditi
public ITimerScheduler Timers { get; set; } = null!;
}

/// <summary>
/// For testing purposes
/// </summary>
public sealed class ZeroLengthSerializer : SerializerWithStringManifest
{
public static readonly Config Config = ConfigurationFactory.ParseString(@"
akka.actor {
serializers {
delivery-zero-length = ""Akka.Tests.Delivery.ZeroLengthSerializer, Akka.Tests""
}
serialization-bindings {
""Akka.Tests.Delivery.ZeroLengthSerializer+TestMsg, Akka.Tests"" = delivery-zero-length
}
}");

public class TestMsg
{
private TestMsg()
{
}
public static readonly TestMsg Instance = new();
}

public ZeroLengthSerializer(ExtendedActorSystem system) : base(system)
{
}

public override byte[] ToBinary(object obj)
{
switch (obj)
{
case TestMsg _:
return Array.Empty<byte>();
default:
throw new ArgumentException($"Can't serialize object of type [{obj.GetType()}]");
}
}

public override object FromBinary(byte[] bytes, string manifest)
{
switch (manifest)
{
case "A":
return TestMsg.Instance;
default:
throw new ArgumentException($"Unimplemented deserialization of message with manifest [{manifest}] in [{GetType()}]");
}

}

public override string Manifest(object obj)
{
switch (obj)
{
case TestMsg _:
return "A";
default:
throw new ArgumentException($"Can't serialize object of type [{obj.GetType()}]");
}
}

public override int Identifier => 919191;
}

/// <summary>
/// INTERNAL API
/// </summary>
Expand Down
3 changes: 3 additions & 0 deletions src/core/Akka/Util/ByteString.cs
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ public ByteString Concat(ByteString other)
/// <returns>TBD</returns>
public int CopyTo(byte[] buffer, int index, int count)
{
if(buffer.Length == 0 && count == 0) return 0; // edge case for no-copy
if (buffer == null) throw new ArgumentNullException(nameof(buffer));
if (index < 0 || index >= buffer.Length) throw new ArgumentOutOfRangeException(nameof(index), "Provided index is outside the bounds of the buffer to copy to.");
if (count > buffer.Length - index) throw new ArgumentException("Provided number of bytes to copy won't fit into provided buffer", nameof(count));
Expand Down Expand Up @@ -575,6 +576,7 @@ public int CopyTo(ref Memory<byte> buffer)
/// <returns>The number of bytes copied</returns>
public int CopyTo(ref Memory<byte> buffer, int index, int count)
{
if(buffer.Length == 0 && count == 0) return 0; // edge case for no-copy
if (index < 0 || index >= buffer.Length) throw new ArgumentOutOfRangeException(nameof(index), "Provided index is outside the bounds of the buffer to copy to.");
if (count > buffer.Length - index) throw new ArgumentException("Provided number of bytes to copy won't fit into provided buffer", nameof(count));

Expand Down Expand Up @@ -613,6 +615,7 @@ public int CopyTo(ref Span<byte> buffer)
/// <returns>The number of bytes copied</returns>
public int CopyTo(ref Span<byte> buffer, int index, int count)
{
if(buffer.Length == 0 && count == 0) return 0; // edge case for no-copy
if (index < 0 || index >= buffer.Length) throw new ArgumentOutOfRangeException(nameof(index), "Provided index is outside the bounds of the buffer to copy to.");
if (count > buffer.Length - index) throw new ArgumentException("Provided number of bytes to copy won't fit into provided buffer", nameof(count));

Expand Down

0 comments on commit d783933

Please sign in to comment.