diff --git a/src/Serilog.Sinks.RabbitMQ/Serilog.Sinks.RabbitMQ.csproj b/src/Serilog.Sinks.RabbitMQ/Serilog.Sinks.RabbitMQ.csproj index 0663f29..bf9c2c7 100644 --- a/src/Serilog.Sinks.RabbitMQ/Serilog.Sinks.RabbitMQ.csproj +++ b/src/Serilog.Sinks.RabbitMQ/Serilog.Sinks.RabbitMQ.csproj @@ -13,6 +13,7 @@ + diff --git a/src/Serilog.Sinks.RabbitMQ/Sinks/RabbitMQ/IRabbitMQClient.cs b/src/Serilog.Sinks.RabbitMQ/Sinks/RabbitMQ/IRabbitMQClient.cs index 52965a5..ba57a67 100644 --- a/src/Serilog.Sinks.RabbitMQ/Sinks/RabbitMQ/IRabbitMQClient.cs +++ b/src/Serilog.Sinks.RabbitMQ/Sinks/RabbitMQ/IRabbitMQClient.cs @@ -24,7 +24,7 @@ internal interface IRabbitMQClient : IDisposable /// /// Message text. /// Optional routing key. - void Publish(string message, string? routingKey = null); + void Publish(ReadOnlyMemory message, string? routingKey = null); /// /// Close the connection and all channels to RabbitMQ. diff --git a/src/Serilog.Sinks.RabbitMQ/Sinks/RabbitMQ/RabbitMQClient.cs b/src/Serilog.Sinks.RabbitMQ/Sinks/RabbitMQ/RabbitMQClient.cs index 1a1fad4..cda091b 100644 --- a/src/Serilog.Sinks.RabbitMQ/Sinks/RabbitMQ/RabbitMQClient.cs +++ b/src/Serilog.Sinks.RabbitMQ/Sinks/RabbitMQ/RabbitMQClient.cs @@ -70,7 +70,7 @@ internal RabbitMQClient( _publicationAddress = new PublicationAddress(configuration.ExchangeType, configuration.Exchange, configuration.RouteKey); } - public void Publish(string message, string? routingKey = null) + public void Publish(ReadOnlyMemory message, string? routingKey = null) { IRabbitMQChannel? channel = null; try @@ -79,7 +79,7 @@ public void Publish(string message, string? routingKey = null) var address = routingKey == null ? _publicationAddress : new PublicationAddress(_publicationAddress.ExchangeType, _publicationAddress.ExchangeName, routingKey); - channel.BasicPublish(address, System.Text.Encoding.UTF8.GetBytes(message)); + channel.BasicPublish(address, message); } finally { diff --git a/src/Serilog.Sinks.RabbitMQ/Sinks/RabbitMQ/RabbitMQSink.cs b/src/Serilog.Sinks.RabbitMQ/Sinks/RabbitMQ/RabbitMQSink.cs index 458eacc..9f89739 100644 --- a/src/Serilog.Sinks.RabbitMQ/Sinks/RabbitMQ/RabbitMQSink.cs +++ b/src/Serilog.Sinks.RabbitMQ/Sinks/RabbitMQ/RabbitMQSink.cs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +using System.Buffers; +using System.Text; +using Microsoft.IO; using Serilog.Core; using Serilog.Debugging; using Serilog.Events; @@ -25,6 +28,9 @@ namespace Serilog.Sinks.RabbitMQ; /// public sealed class RabbitMQSink : IBatchedLogEventSink, ILogEventSink, IDisposable { + private static readonly RecyclableMemoryStreamManager _manager = new(); + private static readonly Encoding _utf8NoBOM = new UTF8Encoding(false); + private readonly ITextFormatter _formatter; private readonly IRabbitMQClient _client; private readonly ILogEventSink? _failureSink; @@ -64,9 +70,11 @@ internal RabbitMQSink( /// public void Emit(LogEvent logEvent) { - var sw = new StringWriter(); + using var stream = _manager.GetStream(); + using var sw = new StreamWriter(stream, _utf8NoBOM); _formatter.Format(logEvent, sw); - _client.Publish(sw.ToString(), _routeKeyFunction?.Invoke(logEvent)); + sw.Flush(); + _client.Publish(new ReadOnlyMemory(stream.GetBuffer(), 0, (int)stream.Length), _routeKeyFunction?.Invoke(logEvent)); } /// @@ -79,9 +87,11 @@ public Task EmitBatchAsync(IEnumerable batch) { foreach (var logEvent in logEvents) { - var sw = new StringWriter(); + using var stream = _manager.GetStream(); + using var sw = new StreamWriter(stream, _utf8NoBOM); _formatter.Format(logEvent, sw); - _client.Publish(sw.ToString(), _routeKeyFunction?.Invoke(logEvent)); + sw.Flush(); + _client.Publish(new ReadOnlyMemory(stream.GetBuffer(), 0, (int)stream.Length), _routeKeyFunction?.Invoke(logEvent)); } } catch (Exception exception) diff --git a/tests/Serilog.Sinks.RabbitMQ.Tests.Integration/RabbitMQFixture.cs b/tests/Serilog.Sinks.RabbitMQ.Tests.Integration/RabbitMQFixture.cs index 1e42435..5c8410d 100644 --- a/tests/Serilog.Sinks.RabbitMQ.Tests.Integration/RabbitMQFixture.cs +++ b/tests/Serilog.Sinks.RabbitMQ.Tests.Integration/RabbitMQFixture.cs @@ -135,7 +135,7 @@ public void Dispose() _rabbitMQClient.Dispose(); } - public void Publish(string message) => _rabbitMQClient.Publish(message); + public void Publish(string message) => _rabbitMQClient.Publish(Encoding.UTF8.GetBytes(message)); // The IModel is not disposed automatically, so the calling member is responsible for disposing it. public async Task GetConsumingModelAsync() diff --git a/tests/Serilog.Sinks.RabbitMQ.Tests.Integration/RabbitMqClientTest.cs b/tests/Serilog.Sinks.RabbitMQ.Tests.Integration/RabbitMqClientTest.cs index 5e6146e..dcd99a1 100644 --- a/tests/Serilog.Sinks.RabbitMQ.Tests.Integration/RabbitMqClientTest.cs +++ b/tests/Serilog.Sinks.RabbitMQ.Tests.Integration/RabbitMqClientTest.cs @@ -110,7 +110,7 @@ public async Task AutoCreateExchange_WhenTrue_ThenShouldCreateExchange() }; using var rabbitMQClient = new RabbitMQClient(rabbitMQClientConfiguration); - rabbitMQClient.Publish("a message"); + rabbitMQClient.Publish(Encoding.UTF8.GetBytes("a message")); //// wait for message sent // await Task.Delay(1000); @@ -158,7 +158,7 @@ public async Task Publish_ParallelMessages_AllMessagesArePublished() { for (int i = 0; i < 1000; i++) { - rabbitMQClient.Publish(message); + rabbitMQClient.Publish(Encoding.UTF8.GetBytes(message)); } }); diff --git a/tests/Serilog.Sinks.RabbitMQ.Tests/RabbitMQ/RabbitMQClientTests.cs b/tests/Serilog.Sinks.RabbitMQ.Tests/RabbitMQ/RabbitMQClientTests.cs index b2312f0..045d440 100644 --- a/tests/Serilog.Sinks.RabbitMQ.Tests/RabbitMQ/RabbitMQClientTests.cs +++ b/tests/Serilog.Sinks.RabbitMQ.Tests/RabbitMQ/RabbitMQClientTests.cs @@ -38,7 +38,7 @@ public void Publish_ShouldCreateAndReturnChannelToPool() var sut = new RabbitMQClient(rabbitMQClientConfiguration, rabbitMQConnectionFactory, rabbitMQChannelObjectPoolPolicy); // Act - sut.Publish("some-message"); + sut.Publish(Encoding.UTF8.GetBytes("some-message")); // Assert rabbitMQChannelObjectPoolPolicy.Received(1).Create(); @@ -118,7 +118,7 @@ public void Dispose_ShouldDisposeConnectionAndChannel() var sut = new RabbitMQClient(rabbitMQClientConfiguration, rabbitMQConnectionFactory, rabbitMQChannelObjectPoolPolicy); // Need to publish a message first to create the channel in the Pool - sut.Publish("some-message"); + sut.Publish(Encoding.UTF8.GetBytes("some-message")); // Act sut.Dispose(); diff --git a/tests/Serilog.Sinks.RabbitMQ.Tests/RabbitMQ/RabbitMQSinkTests.cs b/tests/Serilog.Sinks.RabbitMQ.Tests/RabbitMQ/RabbitMQSinkTests.cs index faa2cab..d75c029 100644 --- a/tests/Serilog.Sinks.RabbitMQ.Tests/RabbitMQ/RabbitMQSinkTests.cs +++ b/tests/Serilog.Sinks.RabbitMQ.Tests/RabbitMQ/RabbitMQSinkTests.cs @@ -9,6 +9,21 @@ namespace Serilog.Sinks.RabbitMQ.Tests.RabbitMQ; public class RabbitMQSinkTests { + private sealed class StubClient : IRabbitMQClient + { + public void Close() => throw new NotImplementedException(); + + public void Dispose() => throw new NotImplementedException(); + + public void Publish(ReadOnlyMemory message, string? routingKey = null) + { + // Need to be stored as string because underlying array of ReadOnlyMemory is reused. + Messages.Add(Encoding.UTF8.GetString(message.ToArray())); + } + + public List Messages { get; set; } = []; + } + [Fact] public void Emit_ShouldPublishMessages() { @@ -20,7 +35,7 @@ public void Emit_ShouldPublishMessages() .When(x => x.Format(Arg.Any(), Arg.Any())) .Do(x => x.Arg().Write(x.Arg().MessageTemplate.Text)); - var rabbitMQClient = Substitute.For(); + var rabbitMQClient = new StubClient(); var sut = new RabbitMQSink(rabbitMQClient, textFormatter); @@ -28,7 +43,8 @@ public void Emit_ShouldPublishMessages() sut.Emit(logEvent); // Assert - rabbitMQClient.Received(1).Publish(Arg.Is("some-message")); + rabbitMQClient.Messages.Count.ShouldBe(1); + rabbitMQClient.Messages[0].ShouldBe("some-message"); } [Fact] @@ -44,7 +60,7 @@ public async Task EmitBatchAsync_ShouldPublishMessages() .When(x => x.Format(Arg.Any(), Arg.Any())) .Do(x => x.Arg().Write(x.Arg().MessageTemplate.Text)); - var rabbitMQClient = Substitute.For(); + var rabbitMQClient = new StubClient(); var sut = new RabbitMQSink(rabbitMQClient, textFormatter); @@ -52,8 +68,9 @@ public async Task EmitBatchAsync_ShouldPublishMessages() await sut.EmitBatchAsync(logEvents); // Assert - rabbitMQClient.Received(1).Publish(Arg.Is("some-message-1")); - rabbitMQClient.Received(1).Publish(Arg.Is("some-message-2")); + rabbitMQClient.Messages.Count.ShouldBe(2); + rabbitMQClient.Messages[0].ShouldBe("some-message-1"); + rabbitMQClient.Messages[1].ShouldBe("some-message-2"); } [Fact] @@ -71,7 +88,7 @@ public async Task EmitBatchAsync_ShouldDoNothing_WhenNoEventsAreEmitted() await sut.EmitBatchAsync(logEvents); // Assert - rabbitMQClient.DidNotReceive().Publish(Arg.Any()); + rabbitMQClient.DidNotReceive().Publish(Arg.Any>()); } [Fact] @@ -149,7 +166,7 @@ public async Task EmitBatchAsync_ShouldWriteAllEventsToFailureSink_WhenPublishTh // Arrange var textFormatter = Substitute.For(); var rabbitMQClient = Substitute.For(); - rabbitMQClient.When(x => x.Publish(Arg.Any())) + rabbitMQClient.When(x => x.Publish(Arg.Any>())) .Do(_ => throw new Exception("some-message")); var failureSink = Substitute.For(); @@ -175,7 +192,7 @@ public async Task EmitBatchAsync_ShouldWriteExceptionToSelfLog_WhenPublishThrows var textFormatter = Substitute.For(); var rabbitMQClient = Substitute.For(); - rabbitMQClient.When(x => x.Publish(Arg.Any())) + rabbitMQClient.When(x => x.Publish(Arg.Any>())) .Do(_ => throw new Exception("some-message")); var failureSink = Substitute.For(); @@ -201,7 +218,7 @@ public async Task EmitBatchAsync_ShouldWriteExceptionsToSelfLog_WhenFailureSinkT var textFormatter = Substitute.For(); var rabbitMQClient = Substitute.For(); - rabbitMQClient.When(x => x.Publish(Arg.Any())) + rabbitMQClient.When(x => x.Publish(Arg.Any>())) .Do(_ => throw new Exception("some-message")); var failureSink = Substitute.For(); @@ -227,7 +244,7 @@ public async Task EmitBatchAsync_ShouldThrowException_WhenPublishThrowsException // Arrange var textFormatter = Substitute.For(); var rabbitMQClient = Substitute.For(); - rabbitMQClient.When(x => x.Publish(Arg.Any())) + rabbitMQClient.When(x => x.Publish(Arg.Any>())) .Do(_ => throw new Exception("some-message")); var failureSink = Substitute.For(); @@ -249,7 +266,7 @@ public async Task EmitBatchAsync_ShouldNotThrowException_WhenPublishThrowsExcept // Arrange var textFormatter = Substitute.For(); var rabbitMQClient = Substitute.For(); - rabbitMQClient.When(x => x.Publish(Arg.Any())) + rabbitMQClient.When(x => x.Publish(Arg.Any>())) .Do(_ => throw new Exception("some-message")); var failureSink = Substitute.For();