Skip to content

Commit

Permalink
Eliminate byte array allocation (#185)
Browse files Browse the repository at this point in the history
* Eliminate byte array allocation

* fixes
  • Loading branch information
sungam3r authored Mar 13, 2024
1 parent 7e822e8 commit e3fa4d1
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 23 deletions.
1 change: 1 addition & 0 deletions src/Serilog.Sinks.RabbitMQ/Serilog.Sinks.RabbitMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="8.0.3" />
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="3.0.0" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" AllowedVersions="[6.8.1,7.0)" />
<PackageReference Include="Serilog" Version="3.1.1" />
<PackageReference Include="Serilog.Formatting.Compact" Version="2.0.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ internal interface IRabbitMQClient : IDisposable
/// </summary>
/// <param name="message">Message text.</param>
/// <param name="routingKey">Optional routing key.</param>
void Publish(string message, string? routingKey = null);
void Publish(ReadOnlyMemory<byte> message, string? routingKey = null);

/// <summary>
/// Close the connection and all channels to RabbitMQ.
Expand Down
4 changes: 2 additions & 2 deletions src/Serilog.Sinks.RabbitMQ/Sinks/RabbitMQ/RabbitMQClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ internal RabbitMQClient(
_publicationAddress = new PublicationAddress(configuration.ExchangeType, configuration.Exchange, configuration.RouteKey);
}

public void Publish(string message, string? routingKey = null)
public void Publish(ReadOnlyMemory<byte> message, string? routingKey = null)
{
IRabbitMQChannel? channel = null;
try
Expand All @@ -79,7 +79,7 @@ public void Publish(string message, string? routingKey = null)
var address = routingKey == null
? _publicationAddress
: new PublicationAddress(_publicationAddress.ExchangeType, _publicationAddress.ExchangeName, routingKey);
channel.BasicPublish(address, System.Text.Encoding.UTF8.GetBytes(message));
channel.BasicPublish(address, message);
}
finally
{
Expand Down
18 changes: 14 additions & 4 deletions src/Serilog.Sinks.RabbitMQ/Sinks/RabbitMQ/RabbitMQSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Buffers;
using System.Text;
using Microsoft.IO;
using Serilog.Core;
using Serilog.Debugging;
using Serilog.Events;
Expand All @@ -25,6 +28,9 @@ namespace Serilog.Sinks.RabbitMQ;
/// </summary>
public sealed class RabbitMQSink : IBatchedLogEventSink, ILogEventSink, IDisposable
{
private static readonly RecyclableMemoryStreamManager _manager = new();
private static readonly Encoding _utf8NoBOM = new UTF8Encoding(false);

private readonly ITextFormatter _formatter;
private readonly IRabbitMQClient _client;
private readonly ILogEventSink? _failureSink;
Expand Down Expand Up @@ -64,9 +70,11 @@ internal RabbitMQSink(
/// <inheritdoc cref="ILogEventSink.Emit" />
public void Emit(LogEvent logEvent)
{
var sw = new StringWriter();
using var stream = _manager.GetStream();
using var sw = new StreamWriter(stream, _utf8NoBOM);
_formatter.Format(logEvent, sw);
_client.Publish(sw.ToString(), _routeKeyFunction?.Invoke(logEvent));
sw.Flush();
_client.Publish(new ReadOnlyMemory<byte>(stream.GetBuffer(), 0, (int)stream.Length), _routeKeyFunction?.Invoke(logEvent));
}

/// <inheritdoc cref="IBatchedLogEventSink.EmitBatchAsync" />
Expand All @@ -79,9 +87,11 @@ public Task EmitBatchAsync(IEnumerable<LogEvent> batch)
{
foreach (var logEvent in logEvents)
{
var sw = new StringWriter();
using var stream = _manager.GetStream();
using var sw = new StreamWriter(stream, _utf8NoBOM);
_formatter.Format(logEvent, sw);
_client.Publish(sw.ToString(), _routeKeyFunction?.Invoke(logEvent));
sw.Flush();
_client.Publish(new ReadOnlyMemory<byte>(stream.GetBuffer(), 0, (int)stream.Length), _routeKeyFunction?.Invoke(logEvent));
}
}
catch (Exception exception)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void Dispose()
_rabbitMQClient.Dispose();
}

public void Publish(string message) => _rabbitMQClient.Publish(message);
public void Publish(string message) => _rabbitMQClient.Publish(Encoding.UTF8.GetBytes(message));

// The IModel is not disposed automatically, so the calling member is responsible for disposing it.
public async Task<IModel> GetConsumingModelAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public async Task AutoCreateExchange_WhenTrue_ThenShouldCreateExchange()
};

using var rabbitMQClient = new RabbitMQClient(rabbitMQClientConfiguration);
rabbitMQClient.Publish("a message");
rabbitMQClient.Publish(Encoding.UTF8.GetBytes("a message"));

//// wait for message sent
// await Task.Delay(1000);
Expand Down Expand Up @@ -158,7 +158,7 @@ public async Task Publish_ParallelMessages_AllMessagesArePublished()
{
for (int i = 0; i < 1000; i++)
{
rabbitMQClient.Publish(message);
rabbitMQClient.Publish(Encoding.UTF8.GetBytes(message));
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void Publish_ShouldCreateAndReturnChannelToPool()
var sut = new RabbitMQClient(rabbitMQClientConfiguration, rabbitMQConnectionFactory, rabbitMQChannelObjectPoolPolicy);

// Act
sut.Publish("some-message");
sut.Publish(Encoding.UTF8.GetBytes("some-message"));

// Assert
rabbitMQChannelObjectPoolPolicy.Received(1).Create();
Expand Down Expand Up @@ -118,7 +118,7 @@ public void Dispose_ShouldDisposeConnectionAndChannel()
var sut = new RabbitMQClient(rabbitMQClientConfiguration, rabbitMQConnectionFactory, rabbitMQChannelObjectPoolPolicy);

// Need to publish a message first to create the channel in the Pool
sut.Publish("some-message");
sut.Publish(Encoding.UTF8.GetBytes("some-message"));

// Act
sut.Dispose();
Expand Down
39 changes: 28 additions & 11 deletions tests/Serilog.Sinks.RabbitMQ.Tests/RabbitMQ/RabbitMQSinkTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,21 @@ namespace Serilog.Sinks.RabbitMQ.Tests.RabbitMQ;

public class RabbitMQSinkTests
{
private sealed class StubClient : IRabbitMQClient
{
public void Close() => throw new NotImplementedException();

public void Dispose() => throw new NotImplementedException();

public void Publish(ReadOnlyMemory<byte> message, string? routingKey = null)
{
// Need to be stored as string because underlying array of ReadOnlyMemory is reused.
Messages.Add(Encoding.UTF8.GetString(message.ToArray()));
}

public List<string> Messages { get; set; } = [];
}

[Fact]
public void Emit_ShouldPublishMessages()
{
Expand All @@ -20,15 +35,16 @@ public void Emit_ShouldPublishMessages()
.When(x => x.Format(Arg.Any<LogEvent>(), Arg.Any<TextWriter>()))
.Do(x => x.Arg<TextWriter>().Write(x.Arg<LogEvent>().MessageTemplate.Text));

var rabbitMQClient = Substitute.For<IRabbitMQClient>();
var rabbitMQClient = new StubClient();

var sut = new RabbitMQSink(rabbitMQClient, textFormatter);

// Act
sut.Emit(logEvent);

// Assert
rabbitMQClient.Received(1).Publish(Arg.Is("some-message"));
rabbitMQClient.Messages.Count.ShouldBe(1);
rabbitMQClient.Messages[0].ShouldBe("some-message");
}

[Fact]
Expand All @@ -44,16 +60,17 @@ public async Task EmitBatchAsync_ShouldPublishMessages()
.When(x => x.Format(Arg.Any<LogEvent>(), Arg.Any<TextWriter>()))
.Do(x => x.Arg<TextWriter>().Write(x.Arg<LogEvent>().MessageTemplate.Text));

var rabbitMQClient = Substitute.For<IRabbitMQClient>();
var rabbitMQClient = new StubClient();

var sut = new RabbitMQSink(rabbitMQClient, textFormatter);

// Act
await sut.EmitBatchAsync(logEvents);

// Assert
rabbitMQClient.Received(1).Publish(Arg.Is("some-message-1"));
rabbitMQClient.Received(1).Publish(Arg.Is("some-message-2"));
rabbitMQClient.Messages.Count.ShouldBe(2);
rabbitMQClient.Messages[0].ShouldBe("some-message-1");
rabbitMQClient.Messages[1].ShouldBe("some-message-2");
}

[Fact]
Expand All @@ -71,7 +88,7 @@ public async Task EmitBatchAsync_ShouldDoNothing_WhenNoEventsAreEmitted()
await sut.EmitBatchAsync(logEvents);

// Assert
rabbitMQClient.DidNotReceive().Publish(Arg.Any<string>());
rabbitMQClient.DidNotReceive().Publish(Arg.Any<ReadOnlyMemory<byte>>());
}

[Fact]
Expand Down Expand Up @@ -149,7 +166,7 @@ public async Task EmitBatchAsync_ShouldWriteAllEventsToFailureSink_WhenPublishTh
// Arrange
var textFormatter = Substitute.For<ITextFormatter>();
var rabbitMQClient = Substitute.For<IRabbitMQClient>();
rabbitMQClient.When(x => x.Publish(Arg.Any<string>()))
rabbitMQClient.When(x => x.Publish(Arg.Any<ReadOnlyMemory<byte>>()))
.Do(_ => throw new Exception("some-message"));

var failureSink = Substitute.For<ILogEventSink>();
Expand All @@ -175,7 +192,7 @@ public async Task EmitBatchAsync_ShouldWriteExceptionToSelfLog_WhenPublishThrows

var textFormatter = Substitute.For<ITextFormatter>();
var rabbitMQClient = Substitute.For<IRabbitMQClient>();
rabbitMQClient.When(x => x.Publish(Arg.Any<string>()))
rabbitMQClient.When(x => x.Publish(Arg.Any<ReadOnlyMemory<byte>>()))
.Do(_ => throw new Exception("some-message"));

var failureSink = Substitute.For<ILogEventSink>();
Expand All @@ -201,7 +218,7 @@ public async Task EmitBatchAsync_ShouldWriteExceptionsToSelfLog_WhenFailureSinkT

var textFormatter = Substitute.For<ITextFormatter>();
var rabbitMQClient = Substitute.For<IRabbitMQClient>();
rabbitMQClient.When(x => x.Publish(Arg.Any<string>()))
rabbitMQClient.When(x => x.Publish(Arg.Any<ReadOnlyMemory<byte>>()))
.Do(_ => throw new Exception("some-message"));

var failureSink = Substitute.For<ILogEventSink>();
Expand All @@ -227,7 +244,7 @@ public async Task EmitBatchAsync_ShouldThrowException_WhenPublishThrowsException
// Arrange
var textFormatter = Substitute.For<ITextFormatter>();
var rabbitMQClient = Substitute.For<IRabbitMQClient>();
rabbitMQClient.When(x => x.Publish(Arg.Any<string>()))
rabbitMQClient.When(x => x.Publish(Arg.Any<ReadOnlyMemory<byte>>()))
.Do(_ => throw new Exception("some-message"));

var failureSink = Substitute.For<ILogEventSink>();
Expand All @@ -249,7 +266,7 @@ public async Task EmitBatchAsync_ShouldNotThrowException_WhenPublishThrowsExcept
// Arrange
var textFormatter = Substitute.For<ITextFormatter>();
var rabbitMQClient = Substitute.For<IRabbitMQClient>();
rabbitMQClient.When(x => x.Publish(Arg.Any<string>()))
rabbitMQClient.When(x => x.Publish(Arg.Any<ReadOnlyMemory<byte>>()))
.Do(_ => throw new Exception("some-message"));

var failureSink = Substitute.For<ILogEventSink>();
Expand Down

0 comments on commit e3fa4d1

Please sign in to comment.