From 9979bb8dae17bb0ac3814072e49c6da8c6f74069 Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 30 Aug 2024 15:00:01 +0900 Subject: [PATCH 01/16] Use pre-allocated buffer instead of using ArrayPool --- .../Buffers/ArrayPoolBufferWriter.cs | 40 ++++++++++--------- .../Buffers/ArrayPoolBufferWriter.cs | 40 ++++++++++--------- 2 files changed, 44 insertions(+), 36 deletions(-) diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/Buffers/ArrayPoolBufferWriter.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/Buffers/ArrayPoolBufferWriter.cs index 8c5806321..61a63c6ad 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/Buffers/ArrayPoolBufferWriter.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/Buffers/ArrayPoolBufferWriter.cs @@ -1,6 +1,8 @@ using System; using System.Buffers; using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; namespace MagicOnion.Internal.Buffers { @@ -11,11 +13,7 @@ internal sealed class ArrayPoolBufferWriter : IBufferWriter, IDisposable public static ArrayPoolBufferWriter RentThreadStaticWriter() { - if (staticInstance == null) - { - staticInstance = new ArrayPoolBufferWriter(); - } - staticInstance.Prepare(); + (staticInstance ??= new ArrayPoolBufferWriter()).Prepare(); #if DEBUG var currentInstance = staticInstance; @@ -26,17 +24,17 @@ public static ArrayPoolBufferWriter RentThreadStaticWriter() #endif } - const int MinimumBufferSize = 32767; // use 32k buffer. + const int PreAllocatedBufferSize = 8192; // use 8k buffer. + const int MinimumBufferSize = PreAllocatedBufferSize / 2; + readonly byte[] preAllocatedBuffer = new byte[PreAllocatedBufferSize]; byte[]? buffer; int index; + [MethodImpl(MethodImplOptions.AggressiveInlining)] void Prepare() { - if (buffer == null) - { - buffer = ArrayPool.Shared.Rent(MinimumBufferSize); - } + buffer = preAllocatedBuffer; index = 0; } @@ -49,24 +47,28 @@ void Prepare() public int FreeCapacity => Capacity - index; + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Advance(int count) { if (count < 0) throw new ArgumentException(nameof(count)); index += count; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public Memory GetMemory(int sizeHint = 0) { CheckAndResizeBuffer(sizeHint); return buffer.AsMemory(index); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public Span GetSpan(int sizeHint = 0) { CheckAndResizeBuffer(sizeHint); return buffer.AsSpan(index); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] void CheckAndResizeBuffer(int sizeHint) { if (buffer == null) throw new ObjectDisposedException(nameof(ArrayPoolBufferWriter)); @@ -82,32 +84,34 @@ void CheckAndResizeBuffer(int sizeHint) if (sizeHint > availableSpace) { int growBy = Math.Max(sizeHint, buffer.Length); - int newSize = checked(buffer.Length + growBy); byte[] oldBuffer = buffer; buffer = ArrayPool.Shared.Rent(newSize); + oldBuffer.AsSpan(0, index).CopyTo(buffer); - Span previousBuffer = oldBuffer.AsSpan(0, index); - previousBuffer.CopyTo(buffer); - ArrayPool.Shared.Return(oldBuffer); + if (oldBuffer != preAllocatedBuffer) + { + ArrayPool.Shared.Return(oldBuffer); + } } } public void Dispose() { - if (buffer == null) + if (buffer != preAllocatedBuffer && buffer != null) { - return; + ArrayPool.Shared.Return(buffer); } - - ArrayPool.Shared.Return(buffer); buffer = null; #if DEBUG Debug.Assert(staticInstance is null); staticInstance = this; +#if NETSTANDARD2_1 || NET6_0_OR_GREATER + Array.Fill(preAllocatedBuffer, 0xff); +#endif #endif } } diff --git a/src/MagicOnion.Internal/Buffers/ArrayPoolBufferWriter.cs b/src/MagicOnion.Internal/Buffers/ArrayPoolBufferWriter.cs index 8c5806321..61a63c6ad 100644 --- a/src/MagicOnion.Internal/Buffers/ArrayPoolBufferWriter.cs +++ b/src/MagicOnion.Internal/Buffers/ArrayPoolBufferWriter.cs @@ -1,6 +1,8 @@ using System; using System.Buffers; using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; namespace MagicOnion.Internal.Buffers { @@ -11,11 +13,7 @@ internal sealed class ArrayPoolBufferWriter : IBufferWriter, IDisposable public static ArrayPoolBufferWriter RentThreadStaticWriter() { - if (staticInstance == null) - { - staticInstance = new ArrayPoolBufferWriter(); - } - staticInstance.Prepare(); + (staticInstance ??= new ArrayPoolBufferWriter()).Prepare(); #if DEBUG var currentInstance = staticInstance; @@ -26,17 +24,17 @@ public static ArrayPoolBufferWriter RentThreadStaticWriter() #endif } - const int MinimumBufferSize = 32767; // use 32k buffer. + const int PreAllocatedBufferSize = 8192; // use 8k buffer. + const int MinimumBufferSize = PreAllocatedBufferSize / 2; + readonly byte[] preAllocatedBuffer = new byte[PreAllocatedBufferSize]; byte[]? buffer; int index; + [MethodImpl(MethodImplOptions.AggressiveInlining)] void Prepare() { - if (buffer == null) - { - buffer = ArrayPool.Shared.Rent(MinimumBufferSize); - } + buffer = preAllocatedBuffer; index = 0; } @@ -49,24 +47,28 @@ void Prepare() public int FreeCapacity => Capacity - index; + [MethodImpl(MethodImplOptions.AggressiveInlining)] public void Advance(int count) { if (count < 0) throw new ArgumentException(nameof(count)); index += count; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public Memory GetMemory(int sizeHint = 0) { CheckAndResizeBuffer(sizeHint); return buffer.AsMemory(index); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public Span GetSpan(int sizeHint = 0) { CheckAndResizeBuffer(sizeHint); return buffer.AsSpan(index); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] void CheckAndResizeBuffer(int sizeHint) { if (buffer == null) throw new ObjectDisposedException(nameof(ArrayPoolBufferWriter)); @@ -82,32 +84,34 @@ void CheckAndResizeBuffer(int sizeHint) if (sizeHint > availableSpace) { int growBy = Math.Max(sizeHint, buffer.Length); - int newSize = checked(buffer.Length + growBy); byte[] oldBuffer = buffer; buffer = ArrayPool.Shared.Rent(newSize); + oldBuffer.AsSpan(0, index).CopyTo(buffer); - Span previousBuffer = oldBuffer.AsSpan(0, index); - previousBuffer.CopyTo(buffer); - ArrayPool.Shared.Return(oldBuffer); + if (oldBuffer != preAllocatedBuffer) + { + ArrayPool.Shared.Return(oldBuffer); + } } } public void Dispose() { - if (buffer == null) + if (buffer != preAllocatedBuffer && buffer != null) { - return; + ArrayPool.Shared.Return(buffer); } - - ArrayPool.Shared.Return(buffer); buffer = null; #if DEBUG Debug.Assert(staticInstance is null); staticInstance = this; +#if NETSTANDARD2_1 || NET6_0_OR_GREATER + Array.Fill(preAllocatedBuffer, 0xff); +#endif #endif } } From 62532f1a33c320fe1d07b75d0543fd19643904a8 Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 30 Aug 2024 15:19:55 +0900 Subject: [PATCH 02/16] Use SingleWriter --- src/MagicOnion.Server/Hubs/StreamingHub.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/MagicOnion.Server/Hubs/StreamingHub.cs b/src/MagicOnion.Server/Hubs/StreamingHub.cs index ef94212ea..33bf866ce 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHub.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHub.cs @@ -41,7 +41,7 @@ public abstract class StreamingHubBase : ServiceBase Group { get; private set; } = default!; From 42d02b0901e23fe61ff71b2aef7b433c6343cb93 Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 30 Aug 2024 15:21:12 +0900 Subject: [PATCH 03/16] Use pre-allocated buffer --- .../Internal.Shared/StreamingHubPayload.cs | 36 ++++++++++++++++--- .../StreamingHubPayloadPool.ObjectPool.cs | 2 +- .../StreamingHubPayload.cs | 36 ++++++++++++++++--- .../StreamingHubPayloadPool.ObjectPool.cs | 2 +- 4 files changed, 66 insertions(+), 10 deletions(-) diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayload.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayload.cs index 35582b9dc..f83314d44 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayload.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayload.cs @@ -74,6 +74,9 @@ internal class StreamingHubPayload : StreamingHubPayloadCore internal class StreamingHubPayloadCore { + const int PreAllocatedBufferSize = 4096; + + readonly byte[] preAllocatedBuffer = new byte[PreAllocatedBufferSize]; byte[]? buffer; ReadOnlyMemory? memory; @@ -89,7 +92,14 @@ public void Initialize(ReadOnlySpan data) { ThrowIfUsing(); - buffer = ArrayPool.Shared.Rent(data.Length); + if (data.Length > preAllocatedBuffer.Length) + { + buffer = ArrayPool.Shared.Rent(data.Length); + } + else + { + buffer = preAllocatedBuffer; + } data.CopyTo(buffer); memory = buffer.AsMemory(0, (int)data.Length); } @@ -99,7 +109,15 @@ public void Initialize(ReadOnlySequence data) ThrowIfUsing(); if (data.Length > int.MaxValue) throw new InvalidOperationException("A body size of StreamingHubPayload must be less than int.MaxValue"); - buffer = ArrayPool.Shared.Rent((int)data.Length); + if (data.Length > preAllocatedBuffer.Length) + { + buffer = ArrayPool.Shared.Rent((int)data.Length); + } + else + { + buffer = preAllocatedBuffer; + } + data.CopyTo(buffer); memory = buffer.AsMemory(0, (int)data.Length); } @@ -115,7 +133,14 @@ public void Initialize(ReadOnlyMemory data, bool holdReference) } else { - buffer = ArrayPool.Shared.Rent((int)data.Length); + if (data.Length > preAllocatedBuffer.Length) + { + buffer = ArrayPool.Shared.Rent((int)data.Length); + } + else + { + buffer = preAllocatedBuffer; + } data.CopyTo(buffer); memory = buffer.AsMemory(0, (int)data.Length); } @@ -130,7 +155,10 @@ public void Uninitialize() #if DEBUG && NET6_0_OR_GREATER Array.Fill(buffer, 0xff); #endif - ArrayPool.Shared.Return(buffer); + if (buffer != preAllocatedBuffer) + { + ArrayPool.Shared.Return(buffer); + } } memory = null; diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.ObjectPool.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.ObjectPool.cs index 1a0948825..9bb807d06 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.ObjectPool.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.ObjectPool.cs @@ -6,7 +6,7 @@ namespace MagicOnion.Internal; internal class StreamingHubPayloadPool { - const int MaximumRetained = 2 << 7; + const int MaximumRetained = 2 << 10; readonly ObjectPool pool = new DefaultObjectPool(new Policy(), MaximumRetained); diff --git a/src/MagicOnion.Internal/StreamingHubPayload.cs b/src/MagicOnion.Internal/StreamingHubPayload.cs index 35582b9dc..f83314d44 100644 --- a/src/MagicOnion.Internal/StreamingHubPayload.cs +++ b/src/MagicOnion.Internal/StreamingHubPayload.cs @@ -74,6 +74,9 @@ internal class StreamingHubPayload : StreamingHubPayloadCore internal class StreamingHubPayloadCore { + const int PreAllocatedBufferSize = 4096; + + readonly byte[] preAllocatedBuffer = new byte[PreAllocatedBufferSize]; byte[]? buffer; ReadOnlyMemory? memory; @@ -89,7 +92,14 @@ public void Initialize(ReadOnlySpan data) { ThrowIfUsing(); - buffer = ArrayPool.Shared.Rent(data.Length); + if (data.Length > preAllocatedBuffer.Length) + { + buffer = ArrayPool.Shared.Rent(data.Length); + } + else + { + buffer = preAllocatedBuffer; + } data.CopyTo(buffer); memory = buffer.AsMemory(0, (int)data.Length); } @@ -99,7 +109,15 @@ public void Initialize(ReadOnlySequence data) ThrowIfUsing(); if (data.Length > int.MaxValue) throw new InvalidOperationException("A body size of StreamingHubPayload must be less than int.MaxValue"); - buffer = ArrayPool.Shared.Rent((int)data.Length); + if (data.Length > preAllocatedBuffer.Length) + { + buffer = ArrayPool.Shared.Rent((int)data.Length); + } + else + { + buffer = preAllocatedBuffer; + } + data.CopyTo(buffer); memory = buffer.AsMemory(0, (int)data.Length); } @@ -115,7 +133,14 @@ public void Initialize(ReadOnlyMemory data, bool holdReference) } else { - buffer = ArrayPool.Shared.Rent((int)data.Length); + if (data.Length > preAllocatedBuffer.Length) + { + buffer = ArrayPool.Shared.Rent((int)data.Length); + } + else + { + buffer = preAllocatedBuffer; + } data.CopyTo(buffer); memory = buffer.AsMemory(0, (int)data.Length); } @@ -130,7 +155,10 @@ public void Uninitialize() #if DEBUG && NET6_0_OR_GREATER Array.Fill(buffer, 0xff); #endif - ArrayPool.Shared.Return(buffer); + if (buffer != preAllocatedBuffer) + { + ArrayPool.Shared.Return(buffer); + } } memory = null; diff --git a/src/MagicOnion.Internal/StreamingHubPayloadPool.ObjectPool.cs b/src/MagicOnion.Internal/StreamingHubPayloadPool.ObjectPool.cs index 1a0948825..9bb807d06 100644 --- a/src/MagicOnion.Internal/StreamingHubPayloadPool.ObjectPool.cs +++ b/src/MagicOnion.Internal/StreamingHubPayloadPool.ObjectPool.cs @@ -6,7 +6,7 @@ namespace MagicOnion.Internal; internal class StreamingHubPayloadPool { - const int MaximumRetained = 2 << 7; + const int MaximumRetained = 2 << 10; readonly ObjectPool pool = new DefaultObjectPool(new Policy(), MaximumRetained); From 2ff643f96dae05017552767feb7eb946f52dc2b9 Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 30 Aug 2024 15:51:37 +0900 Subject: [PATCH 04/16] Avoid allocation while reading the hub message. --- src/MagicOnion.Server/Hubs/StreamingHub.cs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/MagicOnion.Server/Hubs/StreamingHub.cs b/src/MagicOnion.Server/Hubs/StreamingHub.cs index 33bf866ce..113796e41 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHub.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHub.cs @@ -134,6 +134,7 @@ internal async Task().GetHandlers(Context.MethodHandler); // Starts a loop that consumes the request queue. - var consumeRequestsTask = ConsumeRequestQueueAsync(ct); + var consumeRequestsTask = ConsumeRequestQueueAsync(); // Main loop of StreamingHub. // Be careful to allocation and performance. @@ -184,10 +185,11 @@ async Task HandleMessageAsync() } } - async ValueTask ConsumeRequestQueueAsync(CancellationToken cancellationToken) + async ValueTask ConsumeRequestQueueAsync() { // We need to process client requests sequentially. - await foreach (var request in requests.Reader.ReadAllAsync(cancellationToken)) + // NOTE: Do not pass a CancellationToken to avoid allocation. We call Writer.Complete when we want to stop the consumption loop. + await foreach (var request in requests.Reader.ReadAllAsync(default)) { try { From 27c7b51d3c66ad35159a0d7aff00b1295fb8ebbb Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Mon, 2 Sep 2024 12:46:10 +0900 Subject: [PATCH 05/16] Reduce async --- .../StreamingHubServerMessageReader.cs | 21 ++- src/MagicOnion.Server/Hubs/StreamingHub.cs | 135 ++++++++++++------ .../Hubs/StreamingHubContext.cs | 3 +- 3 files changed, 113 insertions(+), 46 deletions(-) diff --git a/src/MagicOnion.Internal/StreamingHubServerMessageReader.cs b/src/MagicOnion.Internal/StreamingHubServerMessageReader.cs index 307bf5016..5281cad95 100644 --- a/src/MagicOnion.Internal/StreamingHubServerMessageReader.cs +++ b/src/MagicOnion.Internal/StreamingHubServerMessageReader.cs @@ -1,4 +1,7 @@ using System; +using System.Data; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; using MessagePack; namespace MagicOnion.Internal @@ -14,6 +17,7 @@ public StreamingHubServerMessageReader(ReadOnlyMemory data) this.reader = new MessagePackReader(data); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public StreamingHubMessageType ReadMessageType() { var arrayLength = this.reader.ReadArrayHeader(); @@ -27,12 +31,20 @@ public StreamingHubMessageType ReadMessageType() 0x01 => StreamingHubMessageType.ClientResultResponseWithError, 0x7e => StreamingHubMessageType.ClientHeartbeat, 0x7f => StreamingHubMessageType.ServerHeartbeatResponse, - var subType => throw new InvalidOperationException($"Unknown client response message: {subType}"), + var subType => ThrowUnknownMessageSubType(subType), }, - _ => throw new InvalidOperationException($"Unknown message format: ArrayLength = {arrayLength}"), + _ => ThrowUnknownMessageFormat(arrayLength), }; + + [DoesNotReturn] + static StreamingHubMessageType ThrowUnknownMessageSubType(byte subType) + => throw new InvalidOperationException($"Unknown client response message: {subType}"); + [DoesNotReturn] + static StreamingHubMessageType ThrowUnknownMessageFormat(int arrayLength) + => throw new InvalidOperationException($"Unknown message format: ArrayLength = {arrayLength}"); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public (int MethodId, ReadOnlyMemory Body) ReadRequestFireAndForget() { // void: [methodId, [argument]] @@ -42,6 +54,7 @@ public StreamingHubMessageType ReadMessageType() return (methodId, data.Slice(consumed)); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public (int MessageId, int MethodId, ReadOnlyMemory Body) ReadRequest() { // T: [messageId, methodId, [argument]] @@ -52,6 +65,7 @@ public StreamingHubMessageType ReadMessageType() return (messageId, methodId, data.Slice(consumed)); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public (Guid ClientResultMessageId, int ClientMethodId, ReadOnlyMemory Body) ReadClientResultResponse() { // T: [0, clientResultMessageId, methodId, result] @@ -62,6 +76,7 @@ public StreamingHubMessageType ReadMessageType() return (clientResultMessageId, clientMethodId, data.Slice(consumed)); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public (Guid ClientResultMessageId, int ClientMethodId, int StatusCode, string Detail, string Message) ReadClientResultResponseForError() { // T: [1, clientResultMessageId, methodId, [statusCode, detail, message]] @@ -77,6 +92,7 @@ public StreamingHubMessageType ReadMessageType() return (clientResultMessageId, clientMethodId, statusCode, detail, message); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public (short Sequence, long ClientSentAt, ReadOnlyMemory Extra) ReadClientHeartbeat() { // [Sequence(int16), ClientSentAt(long), ] @@ -87,6 +103,7 @@ public StreamingHubMessageType ReadMessageType() return (sequence, clientSentAt, data.Slice((int)reader.Consumed)); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public short ReadServerHeartbeatResponse() { // [Sequence(int16), Nil, Nil] diff --git a/src/MagicOnion.Server/Hubs/StreamingHub.cs b/src/MagicOnion.Server/Hubs/StreamingHub.cs index 113796e41..ac199f36d 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHub.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHub.cs @@ -1,4 +1,6 @@ using System.Buffers; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using System.Threading.Channels; using Cysharp.Runtime.Multicast.Remoting; @@ -255,66 +257,115 @@ ValueTask ProcessMessageAsync(StreamingHubPayload payload, UniqueHashDictionary< return default; } default: - throw new InvalidOperationException($"Unknown MessageType: {messageType}"); + ThrowUnknownMessageType(messageType); + return default; } + + [DoesNotReturn] + static void ThrowUnknownMessageType(StreamingHubMessageType messageType) + => throw new InvalidOperationException($"Unknown MessageType: {messageType}"); } - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] - async ValueTask ProcessRequestAsync(UniqueHashDictionary handlers, int methodId, int messageId, ReadOnlyMemory body, bool hasResponse) + ValueTask ProcessRequestAsync(UniqueHashDictionary handlers, int methodId, int messageId, ReadOnlyMemory body, bool hasResponse) { - if (handlers.TryGetValue(methodId, out var handler)) + var handler = GetOrThrowHandler(handlers, methodId); + + // Create a context for each call to the hub method. + var context = StreamingHubContextPool.Shared.Get(); + context.Initialize( + handler: handler, + streamingServiceContext: (IStreamingServiceContext)Context, + hubInstance: this, + request: body, + messageId: messageId, + timestamp: timeProvider.GetUtcNow().UtcDateTime + ); + + var methodStartingTimestamp = timeProvider.GetTimestamp(); + var isErrorOrInterrupted = false; + var hasCompletedSynchronously = true; + MagicOnionServerLog.BeginInvokeHubMethod(Context.MethodHandler.Logger, context, context.Request, handler.RequestType); + try { - // Create a context for each call to the hub method. - var context = StreamingHubContextPool.Shared.Get(); - context.Initialize( - handler: handler, - streamingServiceContext: (IStreamingServiceContext)Context, - hubInstance: this, - request: body, - messageId: messageId, - timestamp: timeProvider.GetUtcNow().UtcDateTime - ); - - var methodStartingTimestamp = timeProvider.GetTimestamp(); - var isErrorOrInterrupted = false; - MagicOnionServerLog.BeginInvokeHubMethod(Context.MethodHandler.Logger, context, context.Request, handler.RequestType); - try + var resultTask = handler.MethodBody.Invoke(context); + if (!resultTask.IsCompletedSuccessfully) { - await handler.MethodBody.Invoke(context); + hasCompletedSynchronously = false; + return ProcessRequestAsyncAwait(resultTask, context, handler, methodStartingTimestamp, hasResponse); } - catch (ReturnStatusException ex) + } + catch (Exception ex) + { + HandleException(context, handler, ex, hasResponse); + } + finally + { + if (hasCompletedSynchronously) { - if (hasResponse) - { - await context.WriteErrorMessage((int)ex.StatusCode, ex.Detail, null, false); - } + CleanupRequest(context, handler, methodStartingTimestamp, isErrorOrInterrupted); } - catch (Exception ex) - { - isErrorOrInterrupted = true; - MagicOnionServerLog.Error(Context.MethodHandler.Logger, ex, context); - Metrics.StreamingHubException(Context.Metrics, handler, ex); + } - if (hasResponse) - { - await context.WriteErrorMessage((int)StatusCode.Internal, $"An error occurred while processing handler '{handler.ToString()}'.", ex, Context.MethodHandler.IsReturnExceptionStackTraceInErrorDetail); - } - } - finally - { - var methodEndingTimestamp = timeProvider.GetTimestamp(); - MagicOnionServerLog.EndInvokeHubMethod(Context.MethodHandler.Logger, context, context.ResponseSize, context.ResponseType, timeProvider.GetElapsedTime(methodStartingTimestamp, methodEndingTimestamp).TotalMilliseconds, isErrorOrInterrupted); - Metrics.StreamingHubMethodCompleted(Context.Metrics, handler, methodStartingTimestamp, methodEndingTimestamp, isErrorOrInterrupted); + return default; + } + + StreamingHubHandler GetOrThrowHandler(UniqueHashDictionary handlers, int methodId) + { + if (!handlers.TryGetValue(methodId, out var handler)) + { + throw new InvalidOperationException("Handler not found in received methodId, methodId:" + methodId); + } - StreamingHubContextPool.Shared.Return(context); + return handler; + } + + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] + async ValueTask ProcessRequestAsyncAwait(ValueTask resultTask, StreamingHubContext context, StreamingHubHandler handler, long methodStartingTimestamp, bool hasResponse) + { + var isErrorOrInterrupted = false; + try + { + await resultTask; + } + catch (Exception ex) + { + HandleException(context, handler, ex, hasResponse); + } + finally + { + CleanupRequest(context, handler, methodStartingTimestamp, isErrorOrInterrupted); + } + } + + void HandleException(StreamingHubContext context, StreamingHubHandler handler, Exception ex, bool hasResponse) + { + if (ex is ReturnStatusException rse) + { + if (hasResponse) + { + context.WriteErrorMessage((int)rse.StatusCode, rse.Detail, null, false); } } else { - throw new InvalidOperationException("Handler not found in received methodId, methodId:" + methodId); + MagicOnionServerLog.Error(Context.MethodHandler.Logger, ex, context); + Metrics.StreamingHubException(Context.Metrics, handler, ex); + + if (hasResponse) + { + context.WriteErrorMessage((int)StatusCode.Internal, $"An error occurred while processing handler '{handler.ToString()}'.", ex, Context.MethodHandler.IsReturnExceptionStackTraceInErrorDetail); + } } } + void CleanupRequest(StreamingHubContext context, StreamingHubHandler handler, long methodStartingTimestamp, bool isErrorOrInterrupted) + { + var methodEndingTimestamp = timeProvider.GetTimestamp(); + var elapsed = timeProvider.GetElapsedTime(methodStartingTimestamp, methodEndingTimestamp); + MagicOnionServerLog.EndInvokeHubMethod(Context.MethodHandler.Logger, context, context.ResponseSize, context.ResponseType, elapsed.TotalMilliseconds, isErrorOrInterrupted); + Metrics.StreamingHubMethodCompleted(Context.Metrics, handler, methodStartingTimestamp, methodEndingTimestamp, isErrorOrInterrupted); + StreamingHubContextPool.Shared.Return(context); + } // Interface methods for Client diff --git a/src/MagicOnion.Server/Hubs/StreamingHubContext.cs b/src/MagicOnion.Server/Hubs/StreamingHubContext.cs index 11deb5963..9465fb3ad 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHubContext.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHubContext.cs @@ -135,10 +135,9 @@ static async ValueTask Await(StreamingHubContext ctx, ValueTask value) } } - internal ValueTask WriteErrorMessage(int statusCode, string detail, Exception? ex, bool isReturnExceptionStackTraceInErrorDetail) + internal void WriteErrorMessage(int statusCode, string detail, Exception? ex, bool isReturnExceptionStackTraceInErrorDetail) { WriteMessageCore(BuildMessageForError(statusCode, detail, ex, isReturnExceptionStackTraceInErrorDetail)); - return default; } void WriteMessageCore(StreamingHubPayload payload) From 337065571fa83fa2135c0bc54db335c0475df78a Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Mon, 2 Sep 2024 15:26:03 +0900 Subject: [PATCH 06/16] Sync --- .../StreamingHubServerMessageReader.cs | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubServerMessageReader.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubServerMessageReader.cs index 307bf5016..5281cad95 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubServerMessageReader.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubServerMessageReader.cs @@ -1,4 +1,7 @@ using System; +using System.Data; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; using MessagePack; namespace MagicOnion.Internal @@ -14,6 +17,7 @@ public StreamingHubServerMessageReader(ReadOnlyMemory data) this.reader = new MessagePackReader(data); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public StreamingHubMessageType ReadMessageType() { var arrayLength = this.reader.ReadArrayHeader(); @@ -27,12 +31,20 @@ public StreamingHubMessageType ReadMessageType() 0x01 => StreamingHubMessageType.ClientResultResponseWithError, 0x7e => StreamingHubMessageType.ClientHeartbeat, 0x7f => StreamingHubMessageType.ServerHeartbeatResponse, - var subType => throw new InvalidOperationException($"Unknown client response message: {subType}"), + var subType => ThrowUnknownMessageSubType(subType), }, - _ => throw new InvalidOperationException($"Unknown message format: ArrayLength = {arrayLength}"), + _ => ThrowUnknownMessageFormat(arrayLength), }; + + [DoesNotReturn] + static StreamingHubMessageType ThrowUnknownMessageSubType(byte subType) + => throw new InvalidOperationException($"Unknown client response message: {subType}"); + [DoesNotReturn] + static StreamingHubMessageType ThrowUnknownMessageFormat(int arrayLength) + => throw new InvalidOperationException($"Unknown message format: ArrayLength = {arrayLength}"); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public (int MethodId, ReadOnlyMemory Body) ReadRequestFireAndForget() { // void: [methodId, [argument]] @@ -42,6 +54,7 @@ public StreamingHubMessageType ReadMessageType() return (methodId, data.Slice(consumed)); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public (int MessageId, int MethodId, ReadOnlyMemory Body) ReadRequest() { // T: [messageId, methodId, [argument]] @@ -52,6 +65,7 @@ public StreamingHubMessageType ReadMessageType() return (messageId, methodId, data.Slice(consumed)); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public (Guid ClientResultMessageId, int ClientMethodId, ReadOnlyMemory Body) ReadClientResultResponse() { // T: [0, clientResultMessageId, methodId, result] @@ -62,6 +76,7 @@ public StreamingHubMessageType ReadMessageType() return (clientResultMessageId, clientMethodId, data.Slice(consumed)); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public (Guid ClientResultMessageId, int ClientMethodId, int StatusCode, string Detail, string Message) ReadClientResultResponseForError() { // T: [1, clientResultMessageId, methodId, [statusCode, detail, message]] @@ -77,6 +92,7 @@ public StreamingHubMessageType ReadMessageType() return (clientResultMessageId, clientMethodId, statusCode, detail, message); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public (short Sequence, long ClientSentAt, ReadOnlyMemory Extra) ReadClientHeartbeat() { // [Sequence(int16), ClientSentAt(long), ] @@ -87,6 +103,7 @@ public StreamingHubMessageType ReadMessageType() return (sequence, clientSentAt, data.Slice((int)reader.Consumed)); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public short ReadServerHeartbeatResponse() { // [Sequence(int16), Nil, Nil] From 0b0765bca82be7e9699c05ae5b6a7767b077116a Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Mon, 2 Sep 2024 15:39:05 +0900 Subject: [PATCH 07/16] Fix --- src/MagicOnion.Server/Hubs/StreamingHub.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/MagicOnion.Server/Hubs/StreamingHub.cs b/src/MagicOnion.Server/Hubs/StreamingHub.cs index ac199f36d..177a6ee7b 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHub.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHub.cs @@ -296,6 +296,7 @@ ValueTask ProcessRequestAsync(UniqueHashDictionary handlers } catch (Exception ex) { + isErrorOrInterrupted = true; HandleException(context, handler, ex, hasResponse); } finally @@ -329,6 +330,7 @@ async ValueTask ProcessRequestAsyncAwait(ValueTask resultTask, StreamingHubConte } catch (Exception ex) { + isErrorOrInterrupted = true; HandleException(context, handler, ex, hasResponse); } finally From 2a52dbfc26a7b79585c0911386b1955df3cd95f2 Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Tue, 3 Sep 2024 10:31:51 +0900 Subject: [PATCH 08/16] Remove StreamingHubContextPool --- src/MagicOnion.Server/Hubs/StreamingHub.cs | 11 +++--- .../Hubs/StreamingHubContext.cs | 38 ++++++------------- 2 files changed, 18 insertions(+), 31 deletions(-) diff --git a/src/MagicOnion.Server/Hubs/StreamingHub.cs b/src/MagicOnion.Server/Hubs/StreamingHub.cs index 177a6ee7b..1c915cdb7 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHub.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHub.cs @@ -189,13 +189,16 @@ async Task HandleMessageAsync() async ValueTask ConsumeRequestQueueAsync() { + // Create and reuse a single StreamingHubContext for each hub connection. + var context = new StreamingHubContext(); + // We need to process client requests sequentially. // NOTE: Do not pass a CancellationToken to avoid allocation. We call Writer.Complete when we want to stop the consumption loop. await foreach (var request in requests.Reader.ReadAllAsync(default)) { try { - await ProcessRequestAsync(request.Handlers, request.MethodId, request.MessageId, request.Body, request.HasResponse); + await ProcessRequestAsync(context, request.Handlers, request.MethodId, request.MessageId, request.Body, request.HasResponse); } finally { @@ -266,12 +269,10 @@ static void ThrowUnknownMessageType(StreamingHubMessageType messageType) => throw new InvalidOperationException($"Unknown MessageType: {messageType}"); } - ValueTask ProcessRequestAsync(UniqueHashDictionary handlers, int methodId, int messageId, ReadOnlyMemory body, bool hasResponse) + ValueTask ProcessRequestAsync(StreamingHubContext context, UniqueHashDictionary handlers, int methodId, int messageId, ReadOnlyMemory body, bool hasResponse) { var handler = GetOrThrowHandler(handlers, methodId); - // Create a context for each call to the hub method. - var context = StreamingHubContextPool.Shared.Get(); context.Initialize( handler: handler, streamingServiceContext: (IStreamingServiceContext)Context, @@ -366,7 +367,7 @@ void CleanupRequest(StreamingHubContext context, StreamingHubHandler handler, lo var elapsed = timeProvider.GetElapsedTime(methodStartingTimestamp, methodEndingTimestamp); MagicOnionServerLog.EndInvokeHubMethod(Context.MethodHandler.Logger, context, context.ResponseSize, context.ResponseType, elapsed.TotalMilliseconds, isErrorOrInterrupted); Metrics.StreamingHubMethodCompleted(Context.Metrics, handler, methodStartingTimestamp, methodEndingTimestamp, isErrorOrInterrupted); - StreamingHubContextPool.Shared.Return(context); + context.Uninitialize(); } // Interface methods for Client diff --git a/src/MagicOnion.Server/Hubs/StreamingHubContext.cs b/src/MagicOnion.Server/Hubs/StreamingHubContext.cs index 9465fb3ad..1716033f7 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHubContext.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHubContext.cs @@ -1,36 +1,11 @@ using MagicOnion.Internal.Buffers; using MessagePack; using System.Collections.Concurrent; +using System.Diagnostics; using MagicOnion.Internal; -using Microsoft.Extensions.ObjectPool; namespace MagicOnion.Server.Hubs; -internal class StreamingHubContextPool -{ - const int MaxRetainedCount = 16; - readonly ObjectPool pool = new DefaultObjectPool(new Policy(), MaxRetainedCount); - - public static StreamingHubContextPool Shared { get; } = new(); - - public StreamingHubContext Get() => pool.Get(); - public void Return(StreamingHubContext ctx) => pool.Return(ctx); - - class Policy : IPooledObjectPolicy - { - public StreamingHubContext Create() - { - return new StreamingHubContext(); - } - - public bool Return(StreamingHubContext obj) - { - obj.Uninitialize(); - return true; - } - } -} - public class StreamingHubContext { IStreamingServiceContext streamingServiceContext = default!; @@ -70,6 +45,11 @@ public ConcurrentDictionary Items internal void Initialize(StreamingHubHandler handler, IStreamingServiceContext streamingServiceContext, object hubInstance, ReadOnlyMemory request, DateTime timestamp, int messageId) { +#if DEBUG + Debug.Assert(this.handler is null); + Debug.Assert(this.streamingServiceContext is null); + Debug.Assert(this.HubInstance is null); +#endif this.handler = handler; this.streamingServiceContext = streamingServiceContext; HubInstance = hubInstance; @@ -80,6 +60,12 @@ internal void Initialize(StreamingHubHandler handler, IStreamingServiceContext Date: Fri, 6 Sep 2024 16:16:07 +0900 Subject: [PATCH 09/16] Use record struct instead of ValueTuple --- src/MagicOnion.Server/Hubs/StreamingHub.cs | 23 +++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/MagicOnion.Server/Hubs/StreamingHub.cs b/src/MagicOnion.Server/Hubs/StreamingHub.cs index 1c915cdb7..ab090b911 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHub.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHub.cs @@ -37,8 +37,9 @@ public abstract class StreamingHubBase : ServiceBase MarkerResponseBytes => [0x93, 0xff, 0x00, 0x0c]; // MsgPack: [-1, 0, nil] - readonly Channel<(StreamingHubPayload Payload, UniqueHashDictionary Handlers, int MethodId, int MessageId, ReadOnlyMemory Body, bool HasResponse)> requests - = Channel.CreateBounded<(StreamingHubPayload, UniqueHashDictionary, int, int, ReadOnlyMemory, bool)>(new BoundedChannelOptions(capacity: 10) + readonly record struct StreamingHubMethodRequest(StreamingHubPayload Payload, UniqueHashDictionary Handlers, int MethodId, int MessageId, ReadOnlyMemory Body, bool HasResponse); + + readonly Channel requests = Channel.CreateBounded(new BoundedChannelOptions(capacity: 10) { AllowSynchronousContinuations = false, FullMode = BoundedChannelFullMode.Wait, @@ -198,7 +199,7 @@ async ValueTask ConsumeRequestQueueAsync() { try { - await ProcessRequestAsync(context, request.Handlers, request.MethodId, request.MessageId, request.Body, request.HasResponse); + await ProcessRequestAsync(context, request); } finally { @@ -217,12 +218,12 @@ ValueTask ProcessMessageAsync(StreamingHubPayload payload, UniqueHashDictionary< case StreamingHubMessageType.Request: { var requestMessage = reader.ReadRequest(); - return requests.Writer.WriteAsync((payload, handlers, requestMessage.MethodId, requestMessage.MessageId, requestMessage.Body, true), cancellationToken); + return requests.Writer.WriteAsync(new (payload, handlers, requestMessage.MethodId, requestMessage.MessageId, requestMessage.Body, true), cancellationToken); } case StreamingHubMessageType.RequestFireAndForget: { var requestMessage = reader.ReadRequestFireAndForget(); - return requests.Writer.WriteAsync((payload, handlers, requestMessage.MethodId, -1, requestMessage.Body, false), cancellationToken); + return requests.Writer.WriteAsync(new (payload, handlers, requestMessage.MethodId, -1, requestMessage.Body, false), cancellationToken); } case StreamingHubMessageType.ClientResultResponse: { @@ -269,16 +270,16 @@ static void ThrowUnknownMessageType(StreamingHubMessageType messageType) => throw new InvalidOperationException($"Unknown MessageType: {messageType}"); } - ValueTask ProcessRequestAsync(StreamingHubContext context, UniqueHashDictionary handlers, int methodId, int messageId, ReadOnlyMemory body, bool hasResponse) + ValueTask ProcessRequestAsync(StreamingHubContext context, in StreamingHubMethodRequest request) { - var handler = GetOrThrowHandler(handlers, methodId); + var handler = GetOrThrowHandler(request.Handlers, request.MethodId); context.Initialize( handler: handler, streamingServiceContext: (IStreamingServiceContext)Context, hubInstance: this, - request: body, - messageId: messageId, + request: request.Body, + messageId: request.MessageId, timestamp: timeProvider.GetUtcNow().UtcDateTime ); @@ -292,13 +293,13 @@ ValueTask ProcessRequestAsync(StreamingHubContext context, UniqueHashDictionary< if (!resultTask.IsCompletedSuccessfully) { hasCompletedSynchronously = false; - return ProcessRequestAsyncAwait(resultTask, context, handler, methodStartingTimestamp, hasResponse); + return ProcessRequestAsyncAwait(resultTask, context, handler, methodStartingTimestamp, request.HasResponse); } } catch (Exception ex) { isErrorOrInterrupted = true; - HandleException(context, handler, ex, hasResponse); + HandleException(context, handler, ex, request.HasResponse); } finally { From 8521a76fefe7dece2e8cf7e97b153abf69f9edc0 Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 6 Sep 2024 16:50:37 +0900 Subject: [PATCH 10/16] Refactor StreamingHub --- src/MagicOnion.Server/Hubs/StreamingHub.cs | 58 +++++++++---------- .../Hubs/StreamingHubContext.cs | 1 + 2 files changed, 30 insertions(+), 29 deletions(-) diff --git a/src/MagicOnion.Server/Hubs/StreamingHub.cs b/src/MagicOnion.Server/Hubs/StreamingHub.cs index ab090b911..5f3bdc3e6 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHub.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHub.cs @@ -1,5 +1,4 @@ using System.Buffers; -using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using System.Threading.Channels; @@ -24,6 +23,7 @@ public abstract class StreamingHubBase : ServiceBase handlers = default!; protected static readonly Task NilTask = Task.FromResult(Nil.Default); protected static readonly ValueTask CompletedTask = new ValueTask(); @@ -37,7 +37,7 @@ public abstract class StreamingHubBase : ServiceBase MarkerResponseBytes => [0x93, 0xff, 0x00, 0x0c]; // MsgPack: [-1, 0, nil] - readonly record struct StreamingHubMethodRequest(StreamingHubPayload Payload, UniqueHashDictionary Handlers, int MethodId, int MessageId, ReadOnlyMemory Body, bool HasResponse); + readonly record struct StreamingHubMethodRequest(StreamingHubPayload Payload, int MethodId, int MessageId, ReadOnlyMemory Body, bool HasResponse); readonly Channel requests = Channel.CreateBounded(new BoundedChannelOptions(capacity: 10) { @@ -98,6 +98,8 @@ internal async Task(new MagicOnionRemoteReceiverWriter(StreamingServiceContext), remoteSerializer, remoteClientResultPendingTasks); var handlerRepository = serviceProvider.GetRequiredService(); + this.handlers = handlerRepository.GetHandlers(Context.MethodHandler); + var groupProvider = handlerRepository.GetGroupProvider(Context.MethodHandler); this.Group = new HubGroupRepository(Client, StreamingServiceContext, groupProvider); @@ -170,8 +172,6 @@ async Task HandleMessageAsync() // eg: Send the current game state to the client. await OnConnected(); - var handlers = Context.ServiceProvider.GetRequiredService().GetHandlers(Context.MethodHandler); - // Starts a loop that consumes the request queue. var consumeRequestsTask = ConsumeRequestQueueAsync(); @@ -181,7 +181,7 @@ async Task HandleMessageAsync() { var payload = reader.Current; - await ProcessMessageAsync(payload, handlers, ct); + await ProcessMessageAsync(payload, ct); // NOTE: DO NOT return the StreamingHubPayload to the pool here. // Client requests may be pending at this point. @@ -208,7 +208,7 @@ async ValueTask ConsumeRequestQueueAsync() } } - ValueTask ProcessMessageAsync(StreamingHubPayload payload, UniqueHashDictionary handlers, CancellationToken cancellationToken) + ValueTask ProcessMessageAsync(StreamingHubPayload payload, CancellationToken cancellationToken) { var reader = new StreamingHubServerMessageReader(payload.Memory); var messageType = reader.ReadMessageType(); @@ -218,12 +218,12 @@ ValueTask ProcessMessageAsync(StreamingHubPayload payload, UniqueHashDictionary< case StreamingHubMessageType.Request: { var requestMessage = reader.ReadRequest(); - return requests.Writer.WriteAsync(new (payload, handlers, requestMessage.MethodId, requestMessage.MessageId, requestMessage.Body, true), cancellationToken); + return requests.Writer.WriteAsync(new (payload, requestMessage.MethodId, requestMessage.MessageId, requestMessage.Body, true), cancellationToken); } case StreamingHubMessageType.RequestFireAndForget: { var requestMessage = reader.ReadRequestFireAndForget(); - return requests.Writer.WriteAsync(new (payload, handlers, requestMessage.MethodId, -1, requestMessage.Body, false), cancellationToken); + return requests.Writer.WriteAsync(new (payload, requestMessage.MethodId, -1, requestMessage.Body, false), cancellationToken); } case StreamingHubMessageType.ClientResultResponse: { @@ -270,11 +270,11 @@ static void ThrowUnknownMessageType(StreamingHubMessageType messageType) => throw new InvalidOperationException($"Unknown MessageType: {messageType}"); } - ValueTask ProcessRequestAsync(StreamingHubContext context, in StreamingHubMethodRequest request) + ValueTask ProcessRequestAsync(StreamingHubContext hubContext, in StreamingHubMethodRequest request) { - var handler = GetOrThrowHandler(request.Handlers, request.MethodId); + var handler = GetOrThrowHandler(request.MethodId); - context.Initialize( + hubContext.Initialize( handler: handler, streamingServiceContext: (IStreamingServiceContext)Context, hubInstance: this, @@ -286,33 +286,33 @@ ValueTask ProcessRequestAsync(StreamingHubContext context, in StreamingHubMethod var methodStartingTimestamp = timeProvider.GetTimestamp(); var isErrorOrInterrupted = false; var hasCompletedSynchronously = true; - MagicOnionServerLog.BeginInvokeHubMethod(Context.MethodHandler.Logger, context, context.Request, handler.RequestType); + MagicOnionServerLog.BeginInvokeHubMethod(Context.MethodHandler.Logger, hubContext, hubContext.Request, handler.RequestType); try { - var resultTask = handler.MethodBody.Invoke(context); + var resultTask = handler.MethodBody.Invoke(hubContext); if (!resultTask.IsCompletedSuccessfully) { hasCompletedSynchronously = false; - return ProcessRequestAsyncAwait(resultTask, context, handler, methodStartingTimestamp, request.HasResponse); + return ProcessRequestAsyncAwait(resultTask, hubContext, methodStartingTimestamp, request.HasResponse); } } catch (Exception ex) { isErrorOrInterrupted = true; - HandleException(context, handler, ex, request.HasResponse); + HandleException(hubContext, ex, request.HasResponse); } finally { if (hasCompletedSynchronously) { - CleanupRequest(context, handler, methodStartingTimestamp, isErrorOrInterrupted); + CleanupRequest(hubContext, methodStartingTimestamp, isErrorOrInterrupted); } } return default; } - StreamingHubHandler GetOrThrowHandler(UniqueHashDictionary handlers, int methodId) + StreamingHubHandler GetOrThrowHandler(int methodId) { if (!handlers.TryGetValue(methodId, out var handler)) { @@ -323,7 +323,7 @@ StreamingHubHandler GetOrThrowHandler(UniqueHashDictionary } [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] - async ValueTask ProcessRequestAsyncAwait(ValueTask resultTask, StreamingHubContext context, StreamingHubHandler handler, long methodStartingTimestamp, bool hasResponse) + async ValueTask ProcessRequestAsyncAwait(ValueTask resultTask, StreamingHubContext hubContext, long methodStartingTimestamp, bool hasResponse) { var isErrorOrInterrupted = false; try @@ -333,42 +333,42 @@ async ValueTask ProcessRequestAsyncAwait(ValueTask resultTask, StreamingHubConte catch (Exception ex) { isErrorOrInterrupted = true; - HandleException(context, handler, ex, hasResponse); + HandleException(hubContext, ex, hasResponse); } finally { - CleanupRequest(context, handler, methodStartingTimestamp, isErrorOrInterrupted); + CleanupRequest(hubContext, methodStartingTimestamp, isErrorOrInterrupted); } } - void HandleException(StreamingHubContext context, StreamingHubHandler handler, Exception ex, bool hasResponse) + void HandleException(StreamingHubContext hubContext, Exception ex, bool hasResponse) { if (ex is ReturnStatusException rse) { if (hasResponse) { - context.WriteErrorMessage((int)rse.StatusCode, rse.Detail, null, false); + hubContext.WriteErrorMessage((int)rse.StatusCode, rse.Detail, null, false); } } else { - MagicOnionServerLog.Error(Context.MethodHandler.Logger, ex, context); - Metrics.StreamingHubException(Context.Metrics, handler, ex); + MagicOnionServerLog.Error(Context.MethodHandler.Logger, ex, hubContext); + Metrics.StreamingHubException(Context.Metrics, hubContext.Handler, ex); if (hasResponse) { - context.WriteErrorMessage((int)StatusCode.Internal, $"An error occurred while processing handler '{handler.ToString()}'.", ex, Context.MethodHandler.IsReturnExceptionStackTraceInErrorDetail); + hubContext.WriteErrorMessage((int)StatusCode.Internal, $"An error occurred while processing handler '{hubContext.Handler}'.", ex, Context.MethodHandler.IsReturnExceptionStackTraceInErrorDetail); } } } - void CleanupRequest(StreamingHubContext context, StreamingHubHandler handler, long methodStartingTimestamp, bool isErrorOrInterrupted) + void CleanupRequest(StreamingHubContext hubContext, long methodStartingTimestamp, bool isErrorOrInterrupted) { var methodEndingTimestamp = timeProvider.GetTimestamp(); var elapsed = timeProvider.GetElapsedTime(methodStartingTimestamp, methodEndingTimestamp); - MagicOnionServerLog.EndInvokeHubMethod(Context.MethodHandler.Logger, context, context.ResponseSize, context.ResponseType, elapsed.TotalMilliseconds, isErrorOrInterrupted); - Metrics.StreamingHubMethodCompleted(Context.Metrics, handler, methodStartingTimestamp, methodEndingTimestamp, isErrorOrInterrupted); - context.Uninitialize(); + MagicOnionServerLog.EndInvokeHubMethod(Context.MethodHandler.Logger, hubContext, hubContext.ResponseSize, hubContext.ResponseType, elapsed.TotalMilliseconds, isErrorOrInterrupted); + Metrics.StreamingHubMethodCompleted(Context.Metrics, hubContext.Handler, methodStartingTimestamp, methodEndingTimestamp, isErrorOrInterrupted); + hubContext.Uninitialize(); } // Interface methods for Client diff --git a/src/MagicOnion.Server/Hubs/StreamingHubContext.cs b/src/MagicOnion.Server/Hubs/StreamingHubContext.cs index 1716033f7..71552bc0c 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHubContext.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHubContext.cs @@ -37,6 +37,7 @@ public ConcurrentDictionary Items public IServiceContext ServiceContext => streamingServiceContext; + internal StreamingHubHandler Handler => handler; internal int MessageId { get; private set; } internal int MethodId => handler.MethodId; From 20a4caad220b136b385f687ad9a7515885b84ecb Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 6 Sep 2024 17:05:51 +0900 Subject: [PATCH 11/16] Refactor StreamingHub --- src/MagicOnion.Server/Hubs/StreamingHub.cs | 110 +++++++-------------- 1 file changed, 38 insertions(+), 72 deletions(-) diff --git a/src/MagicOnion.Server/Hubs/StreamingHub.cs b/src/MagicOnion.Server/Hubs/StreamingHub.cs index 5f3bdc3e6..668b306fe 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHub.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHub.cs @@ -1,6 +1,5 @@ using System.Buffers; using System.Diagnostics.CodeAnalysis; -using System.Runtime.CompilerServices; using System.Threading.Channels; using Cysharp.Runtime.Multicast.Remoting; using Grpc.Core; @@ -188,25 +187,6 @@ async Task HandleMessageAsync() } } - async ValueTask ConsumeRequestQueueAsync() - { - // Create and reuse a single StreamingHubContext for each hub connection. - var context = new StreamingHubContext(); - - // We need to process client requests sequentially. - // NOTE: Do not pass a CancellationToken to avoid allocation. We call Writer.Complete when we want to stop the consumption loop. - await foreach (var request in requests.Reader.ReadAllAsync(default)) - { - try - { - await ProcessRequestAsync(context, request); - } - finally - { - StreamingHubPayloadPool.Shared.Return(request.Payload); - } - } - } ValueTask ProcessMessageAsync(StreamingHubPayload payload, CancellationToken cancellationToken) { @@ -270,46 +250,51 @@ static void ThrowUnknownMessageType(StreamingHubMessageType messageType) => throw new InvalidOperationException($"Unknown MessageType: {messageType}"); } - ValueTask ProcessRequestAsync(StreamingHubContext hubContext, in StreamingHubMethodRequest request) + async ValueTask ConsumeRequestQueueAsync() { - var handler = GetOrThrowHandler(request.MethodId); - - hubContext.Initialize( - handler: handler, - streamingServiceContext: (IStreamingServiceContext)Context, - hubInstance: this, - request: request.Body, - messageId: request.MessageId, - timestamp: timeProvider.GetUtcNow().UtcDateTime - ); - - var methodStartingTimestamp = timeProvider.GetTimestamp(); - var isErrorOrInterrupted = false; - var hasCompletedSynchronously = true; - MagicOnionServerLog.BeginInvokeHubMethod(Context.MethodHandler.Logger, hubContext, hubContext.Request, handler.RequestType); - try + // Create and reuse a single StreamingHubContext for each hub connection. + var hubContext = new StreamingHubContext(); + + // We need to process client requests sequentially. + // NOTE: Do not pass a CancellationToken to avoid allocation. We call Writer.Complete when we want to stop the consumption loop. + await foreach (var request in requests.Reader.ReadAllAsync(default)) { - var resultTask = handler.MethodBody.Invoke(hubContext); - if (!resultTask.IsCompletedSuccessfully) + try { - hasCompletedSynchronously = false; - return ProcessRequestAsyncAwait(resultTask, hubContext, methodStartingTimestamp, request.HasResponse); + var handler = GetOrThrowHandler(request.MethodId); + + hubContext.Initialize( + handler: handler, + streamingServiceContext: (IStreamingServiceContext)Context, + hubInstance: this, + request: request.Body, + messageId: request.MessageId, + timestamp: timeProvider.GetUtcNow().UtcDateTime + ); + + var isErrorOrInterrupted = false; + var methodStartingTimestamp = timeProvider.GetTimestamp(); + MagicOnionServerLog.BeginInvokeHubMethod(Context.MethodHandler.Logger, hubContext, hubContext.Request, handler.RequestType); + + try + { + await handler.MethodBody.Invoke(hubContext); + } + catch (Exception ex) + { + isErrorOrInterrupted = true; + HandleException(hubContext, ex, request.HasResponse); + } + finally + { + CleanupRequest(hubContext, methodStartingTimestamp, isErrorOrInterrupted); + } } - } - catch (Exception ex) - { - isErrorOrInterrupted = true; - HandleException(hubContext, ex, request.HasResponse); - } - finally - { - if (hasCompletedSynchronously) + finally { - CleanupRequest(hubContext, methodStartingTimestamp, isErrorOrInterrupted); + StreamingHubPayloadPool.Shared.Return(request.Payload); } } - - return default; } StreamingHubHandler GetOrThrowHandler(int methodId) @@ -322,25 +307,6 @@ StreamingHubHandler GetOrThrowHandler(int methodId) return handler; } - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] - async ValueTask ProcessRequestAsyncAwait(ValueTask resultTask, StreamingHubContext hubContext, long methodStartingTimestamp, bool hasResponse) - { - var isErrorOrInterrupted = false; - try - { - await resultTask; - } - catch (Exception ex) - { - isErrorOrInterrupted = true; - HandleException(hubContext, ex, hasResponse); - } - finally - { - CleanupRequest(hubContext, methodStartingTimestamp, isErrorOrInterrupted); - } - } - void HandleException(StreamingHubContext hubContext, Exception ex, bool hasResponse) { if (ex is ReturnStatusException rse) From 66cf8801715c414fcc065776f605602f2659338b Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 6 Sep 2024 18:00:57 +0900 Subject: [PATCH 12/16] Use StackTraceHidden --- .../Filters/Internal/FilterHelper.cs | 17 +++-- .../Filters/Internal/InvokeHelper.cs | 68 ------------------- .../Filter/FilterHelperTest.cs | 66 ++++++++++++++++++ 3 files changed, 76 insertions(+), 75 deletions(-) delete mode 100644 src/MagicOnion.Server/Filters/Internal/InvokeHelper.cs diff --git a/src/MagicOnion.Server/Filters/Internal/FilterHelper.cs b/src/MagicOnion.Server/Filters/Internal/FilterHelper.cs index 7d96714d8..185bb5e1f 100644 --- a/src/MagicOnion.Server/Filters/Internal/FilterHelper.cs +++ b/src/MagicOnion.Server/Filters/Internal/FilterHelper.cs @@ -1,3 +1,4 @@ +using System.Diagnostics; using System.Reflection; using MagicOnion.Server.Hubs; @@ -63,28 +64,30 @@ IMagicOnionFilterFactory filterFactory public static Func WrapMethodBodyWithFilter(IServiceProvider serviceProvider, IEnumerable filters, Func methodBody) { - Func next = methodBody; + Func prev = methodBody; foreach (var filterDescriptor in filters.Reverse()) { var newFilter = CreateOrGetInstance(serviceProvider, filterDescriptor); - next = new InvokeHelper>(newFilter.Invoke, next).GetDelegate(); + var next = prev; + prev = [StackTraceHidden] (ctx) => newFilter.Invoke(ctx, next); } - return next; + return prev; } public static Func WrapMethodBodyWithFilter(IServiceProvider serviceProvider, IEnumerable filters, Func methodBody) { - Func next = methodBody; + Func prev = methodBody; foreach (var filterDescriptor in filters.Reverse()) { var newFilter = CreateOrGetInstance(serviceProvider, filterDescriptor); - next = new InvokeHelper>(newFilter.Invoke, next).GetDelegate(); + var next = prev; + prev = [StackTraceHidden] (ctx) => newFilter.Invoke(ctx, next); } - return next; + return prev; } public static TFilter CreateOrGetInstance(IServiceProvider serviceProvider, MagicOnionFilterDescriptor descriptor) @@ -100,4 +103,4 @@ public static TFilter CreateOrGetInstance(IServiceProvider serviceProvi throw new InvalidOperationException($"MagicOnionFilterDescriptor requires instance or factory. but the descriptor has '{descriptor.Filter.GetType()}'"); } } -} \ No newline at end of file +} diff --git a/src/MagicOnion.Server/Filters/Internal/InvokeHelper.cs b/src/MagicOnion.Server/Filters/Internal/InvokeHelper.cs deleted file mode 100644 index 6cdab210b..000000000 --- a/src/MagicOnion.Server/Filters/Internal/InvokeHelper.cs +++ /dev/null @@ -1,68 +0,0 @@ -using System.Diagnostics; -using System.Reflection.Emit; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; - -namespace MagicOnion.Server.Filters.Internal; - -internal class InvokeHelper - where TDelegate : Delegate -{ - public Func Invoke; - public TDelegate Next; - - static readonly Func, TDelegate> InvokeNextFactory; - - static InvokeHelper() - { - var fieldInvoke = typeof(InvokeHelper).GetField("Invoke")!; - var fieldNext = typeof(InvokeHelper).GetField("Next")!; - var methodInvoke = typeof(Func).GetMethod("Invoke")!; - - if (RuntimeInformation.FrameworkDescription.StartsWith(".NET Core 4")) /* .NET Core 2.x returns ".NET Core 4.x.y.z" */ - { - // HACK: If the app is running on .NET Core 2.2 or earlier, the runtime hides dynamic method in the stack trace. - var method = new DynamicMethod("InvokeNext", typeof(ValueTask), new[] { typeof(InvokeHelper), typeof(TArg1) }, restrictedSkipVisibility: true); - { - var il = method.GetILGenerator(); - - // invoke = arg0.Invoke; - il.Emit(OpCodes.Ldarg_0); - il.Emit(OpCodes.Ldfld, fieldInvoke); - - // next = arg0.Next; - // return invoke(arg1, next); - il.Emit(OpCodes.Ldarg_1); - il.Emit(OpCodes.Ldarg_0); - il.Emit(OpCodes.Ldfld, fieldNext); - il.Emit(OpCodes.Callvirt, methodInvoke); - il.Emit(OpCodes.Ret); - } - - InvokeNextFactory = (helper) => (TDelegate)method.CreateDelegate(typeof(TDelegate), helper); - - } - else - { - // HACK: If the app is running on .NET Core 3.0 or later, the runtime hides `AggressiveInlining` method in the stack trace. (If the app is running on .NET Framework 4.x, This hack does not affect.) - // https://github.com/dotnet/coreclr/blob/release/3.0/src/System.Private.CoreLib/shared/System/Diagnostics/StackTrace.cs#L343-L350 - InvokeNextFactory = (helper) => - { - var invokeNext = new Func(helper.InvokeNext); - return (TDelegate)Delegate.CreateDelegate(typeof(TDelegate), invokeNext.Target, invokeNext.Method); - }; - } - } - - public InvokeHelper(Func invoke, TDelegate next) - { - Invoke = invoke; - Next = next; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - [DebuggerHidden] - ValueTask InvokeNext(TArg1 arg1) => Invoke(arg1, Next); - - public TDelegate GetDelegate() => InvokeNextFactory(this); -} diff --git a/tests/MagicOnion.Server.Tests/Filter/FilterHelperTest.cs b/tests/MagicOnion.Server.Tests/Filter/FilterHelperTest.cs index 91ffc6372..aab56acf8 100644 --- a/tests/MagicOnion.Server.Tests/Filter/FilterHelperTest.cs +++ b/tests/MagicOnion.Server.Tests/Filter/FilterHelperTest.cs @@ -745,6 +745,39 @@ public async Task WrapMethodBodyWithFilter_Surround_Service() results.Should().Equal(1, 2, 0, 200, 100); } + [Fact] + public async Task WrapMethodBodyWithFilter_Surround_Service_ThrowStackTrace() + { + // Arrange + var services = new ServiceCollection(); + var serviceProvider = services.BuildServiceProvider(); + var callStack = default(string); + var filters = new[] + { + new MagicOnionServiceFilterDescriptor(new DelegateServiceFilter(async (context, next) => + { + await next(context); + })), + new MagicOnionServiceFilterDescriptor(new DelegateServiceFilter(async (context, next) => + { + await next(context); + })), + }; + + // Act + var body = FilterHelper.WrapMethodBodyWithFilter(serviceProvider, filters, (context) => + { + callStack = Environment.StackTrace; + throw new InvalidOperationException(); + }); + var ex = await Record.ExceptionAsync(async () => await body(default)); + + // Assert + ex.Should().NotBeNull(); + ex.StackTrace.Should().NotContain("MagicOnion.Server.Filters.Internal.FilterHelper.<>c__DisplayClass"); + callStack.Should().NotContain("MagicOnion.Server.Filters.Internal.FilterHelper.<>c__DisplayClass"); + } + class DelegateServiceFilter : IMagicOnionServiceFilter { readonly Func, ValueTask> func; @@ -793,6 +826,39 @@ public async Task WrapMethodBodyWithFilter_Surround_StreamingHub() results.Should().Equal(1, 2, 0, 200, 100); } + [Fact] + public async Task WrapMethodBodyWithFilter_Surround_StreamingHub_ThrowStackTrace() + { + // Arrange + var services = new ServiceCollection(); + var serviceProvider = services.BuildServiceProvider(); + var callStack = default(string); + var filters = new[] + { + new StreamingHubFilterDescriptor(new DelegateHubFilter(async (context, next) => + { + await next(context); + })), + new StreamingHubFilterDescriptor(new DelegateHubFilter(async (context, next) => + { + await next(context); + })), + }; + + // Act + var body = FilterHelper.WrapMethodBodyWithFilter(serviceProvider, filters, (context) => + { + callStack = Environment.StackTrace; + throw new InvalidOperationException(); + }); + var ex = await Record.ExceptionAsync(async () => await body(default)); + + // Assert + ex.Should().NotBeNull(); + ex.StackTrace.Should().NotContain("MagicOnion.Server.Filters.Internal.FilterHelper.<>c__DisplayClass"); + callStack.Should().NotContain("MagicOnion.Server.Filters.Internal.FilterHelper.<>c__DisplayClass"); + } + class DelegateHubFilter : IStreamingHubFilter { readonly Func, ValueTask> func; From 4d2a00e5e5c56e0b68a9f2847163013e23d96592 Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 13 Sep 2024 10:41:27 +0900 Subject: [PATCH 13/16] prev/next -> outer/inner --- .../Filters/Internal/FilterHelper.cs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/MagicOnion.Server/Filters/Internal/FilterHelper.cs b/src/MagicOnion.Server/Filters/Internal/FilterHelper.cs index 185bb5e1f..08021f7fb 100644 --- a/src/MagicOnion.Server/Filters/Internal/FilterHelper.cs +++ b/src/MagicOnion.Server/Filters/Internal/FilterHelper.cs @@ -64,30 +64,30 @@ IMagicOnionFilterFactory filterFactory public static Func WrapMethodBodyWithFilter(IServiceProvider serviceProvider, IEnumerable filters, Func methodBody) { - Func prev = methodBody; + Func outer = methodBody; foreach (var filterDescriptor in filters.Reverse()) { var newFilter = CreateOrGetInstance(serviceProvider, filterDescriptor); - var next = prev; - prev = [StackTraceHidden] (ctx) => newFilter.Invoke(ctx, next); + var inner = outer; + outer = [StackTraceHidden] (ctx) => newFilter.Invoke(ctx, inner); } - return prev; + return outer; } public static Func WrapMethodBodyWithFilter(IServiceProvider serviceProvider, IEnumerable filters, Func methodBody) { - Func prev = methodBody; + Func outer = methodBody; foreach (var filterDescriptor in filters.Reverse()) { var newFilter = CreateOrGetInstance(serviceProvider, filterDescriptor); - var next = prev; - prev = [StackTraceHidden] (ctx) => newFilter.Invoke(ctx, next); + var inner = outer; + outer = [StackTraceHidden] (ctx) => newFilter.Invoke(ctx, inner); } - return prev; + return outer; } public static TFilter CreateOrGetInstance(IServiceProvider serviceProvider, MagicOnionFilterDescriptor descriptor) From 691901f79ee21e98b098adac58a8a15836e723ec Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 13 Sep 2024 18:31:01 +0900 Subject: [PATCH 14/16] Use ArrayPool --- .../Internal.Shared/StreamingHubPayload.cs | 72 ++++--------------- .../StreamingHubPayloadPool.BuiltIn.cs | 11 --- .../StreamingHubPayloadPool.ObjectPool.cs | 11 --- .../StreamingHubPayload.cs | 72 ++++--------------- .../StreamingHubPayloadPool.BuiltIn.cs | 11 --- .../StreamingHubPayloadPool.ObjectPool.cs | 11 --- 6 files changed, 24 insertions(+), 164 deletions(-) diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayload.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayload.cs index f83314d44..96e9cfc9c 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayload.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayload.cs @@ -74,34 +74,24 @@ internal class StreamingHubPayload : StreamingHubPayloadCore internal class StreamingHubPayloadCore { - const int PreAllocatedBufferSize = 4096; - - readonly byte[] preAllocatedBuffer = new byte[PreAllocatedBufferSize]; byte[]? buffer; - ReadOnlyMemory? memory; + int length = -1; #if DEBUG public short Version { get; private set; } #endif - public int Length => memory!.Value.Length; - public ReadOnlySpan Span => memory!.Value.Span; - public ReadOnlyMemory Memory => memory!.Value; + public int Length => length; + public ReadOnlySpan Span => buffer!.AsSpan(0, length); + public ReadOnlyMemory Memory => buffer!.AsMemory(0, length); public void Initialize(ReadOnlySpan data) { ThrowIfUsing(); - if (data.Length > preAllocatedBuffer.Length) - { - buffer = ArrayPool.Shared.Rent(data.Length); - } - else - { - buffer = preAllocatedBuffer; - } + buffer = ArrayPool.Shared.Rent(data.Length); + length = data.Length; data.CopyTo(buffer); - memory = buffer.AsMemory(0, (int)data.Length); } public void Initialize(ReadOnlySequence data) @@ -109,41 +99,9 @@ public void Initialize(ReadOnlySequence data) ThrowIfUsing(); if (data.Length > int.MaxValue) throw new InvalidOperationException("A body size of StreamingHubPayload must be less than int.MaxValue"); - if (data.Length > preAllocatedBuffer.Length) - { - buffer = ArrayPool.Shared.Rent((int)data.Length); - } - else - { - buffer = preAllocatedBuffer; - } - + buffer = ArrayPool.Shared.Rent((int)data.Length); + length = (int)data.Length; data.CopyTo(buffer); - memory = buffer.AsMemory(0, (int)data.Length); - } - - public void Initialize(ReadOnlyMemory data, bool holdReference) - { - ThrowIfUsing(); - - if (holdReference) - { - buffer = null; - memory = data; - } - else - { - if (data.Length > preAllocatedBuffer.Length) - { - buffer = ArrayPool.Shared.Rent((int)data.Length); - } - else - { - buffer = preAllocatedBuffer; - } - data.CopyTo(buffer); - memory = buffer.AsMemory(0, (int)data.Length); - } } public void Uninitialize() @@ -155,13 +113,10 @@ public void Uninitialize() #if DEBUG && NET6_0_OR_GREATER Array.Fill(buffer, 0xff); #endif - if (buffer != preAllocatedBuffer) - { - ArrayPool.Shared.Return(buffer); - } + ArrayPool.Shared.Return(buffer); } - memory = null; + length = -1; buffer = null; #if DEBUG @@ -169,13 +124,10 @@ public void Uninitialize() #endif } -#if !UNITY_2021_1_OR_NEWER && !NETSTANDARD2_0 && !NETSTANDARD2_1 - [MemberNotNull(nameof(memory))] -#endif void ThrowIfUninitialized() { //Debug.Assert(memory is not null); - if (memory is null) + if (length == -1) { throw new InvalidOperationException("A StreamingHubPayload has been already uninitialized."); } @@ -184,7 +136,7 @@ void ThrowIfUninitialized() void ThrowIfUsing() { //Debug.Assert(memory is null); - if (memory is not null) + if (length != -1) { throw new InvalidOperationException("A StreamingHubPayload is currently used by other caller."); } diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.BuiltIn.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.BuiltIn.cs index 2245d3e88..3843ce2a3 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.BuiltIn.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.BuiltIn.cs @@ -102,17 +102,6 @@ public StreamingHubPayload RentOrCreate(ReadOnlySpan data) return new StreamingHubPayload(payload); #else return (StreamingHubPayload)payload; -#endif - } - - public StreamingHubPayload RentOrCreate(ReadOnlyMemory data, bool holdReference) - { - var payload = pool.RentOrCreateCore(); - payload.Initialize(data, holdReference); -#if DEBUG - return new StreamingHubPayload(payload); -#else - return (StreamingHubPayload)payload; #endif } } diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.ObjectPool.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.ObjectPool.cs index 9bb807d06..6bad7faaa 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.ObjectPool.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.ObjectPool.cs @@ -34,17 +34,6 @@ public StreamingHubPayload RentOrCreate(ReadOnlySpan data) #endif } - public StreamingHubPayload RentOrCreate(ReadOnlyMemory data, bool holdReference) - { - var payload = pool.Get(); - payload.Initialize(data, holdReference); -#if DEBUG - return new StreamingHubPayload(payload); -#else - return (StreamingHubPayload)payload; -#endif - } - public void Return(StreamingHubPayload payload) { #if DEBUG diff --git a/src/MagicOnion.Internal/StreamingHubPayload.cs b/src/MagicOnion.Internal/StreamingHubPayload.cs index f83314d44..96e9cfc9c 100644 --- a/src/MagicOnion.Internal/StreamingHubPayload.cs +++ b/src/MagicOnion.Internal/StreamingHubPayload.cs @@ -74,34 +74,24 @@ internal class StreamingHubPayload : StreamingHubPayloadCore internal class StreamingHubPayloadCore { - const int PreAllocatedBufferSize = 4096; - - readonly byte[] preAllocatedBuffer = new byte[PreAllocatedBufferSize]; byte[]? buffer; - ReadOnlyMemory? memory; + int length = -1; #if DEBUG public short Version { get; private set; } #endif - public int Length => memory!.Value.Length; - public ReadOnlySpan Span => memory!.Value.Span; - public ReadOnlyMemory Memory => memory!.Value; + public int Length => length; + public ReadOnlySpan Span => buffer!.AsSpan(0, length); + public ReadOnlyMemory Memory => buffer!.AsMemory(0, length); public void Initialize(ReadOnlySpan data) { ThrowIfUsing(); - if (data.Length > preAllocatedBuffer.Length) - { - buffer = ArrayPool.Shared.Rent(data.Length); - } - else - { - buffer = preAllocatedBuffer; - } + buffer = ArrayPool.Shared.Rent(data.Length); + length = data.Length; data.CopyTo(buffer); - memory = buffer.AsMemory(0, (int)data.Length); } public void Initialize(ReadOnlySequence data) @@ -109,41 +99,9 @@ public void Initialize(ReadOnlySequence data) ThrowIfUsing(); if (data.Length > int.MaxValue) throw new InvalidOperationException("A body size of StreamingHubPayload must be less than int.MaxValue"); - if (data.Length > preAllocatedBuffer.Length) - { - buffer = ArrayPool.Shared.Rent((int)data.Length); - } - else - { - buffer = preAllocatedBuffer; - } - + buffer = ArrayPool.Shared.Rent((int)data.Length); + length = (int)data.Length; data.CopyTo(buffer); - memory = buffer.AsMemory(0, (int)data.Length); - } - - public void Initialize(ReadOnlyMemory data, bool holdReference) - { - ThrowIfUsing(); - - if (holdReference) - { - buffer = null; - memory = data; - } - else - { - if (data.Length > preAllocatedBuffer.Length) - { - buffer = ArrayPool.Shared.Rent((int)data.Length); - } - else - { - buffer = preAllocatedBuffer; - } - data.CopyTo(buffer); - memory = buffer.AsMemory(0, (int)data.Length); - } } public void Uninitialize() @@ -155,13 +113,10 @@ public void Uninitialize() #if DEBUG && NET6_0_OR_GREATER Array.Fill(buffer, 0xff); #endif - if (buffer != preAllocatedBuffer) - { - ArrayPool.Shared.Return(buffer); - } + ArrayPool.Shared.Return(buffer); } - memory = null; + length = -1; buffer = null; #if DEBUG @@ -169,13 +124,10 @@ public void Uninitialize() #endif } -#if !UNITY_2021_1_OR_NEWER && !NETSTANDARD2_0 && !NETSTANDARD2_1 - [MemberNotNull(nameof(memory))] -#endif void ThrowIfUninitialized() { //Debug.Assert(memory is not null); - if (memory is null) + if (length == -1) { throw new InvalidOperationException("A StreamingHubPayload has been already uninitialized."); } @@ -184,7 +136,7 @@ void ThrowIfUninitialized() void ThrowIfUsing() { //Debug.Assert(memory is null); - if (memory is not null) + if (length != -1) { throw new InvalidOperationException("A StreamingHubPayload is currently used by other caller."); } diff --git a/src/MagicOnion.Internal/StreamingHubPayloadPool.BuiltIn.cs b/src/MagicOnion.Internal/StreamingHubPayloadPool.BuiltIn.cs index 2245d3e88..3843ce2a3 100644 --- a/src/MagicOnion.Internal/StreamingHubPayloadPool.BuiltIn.cs +++ b/src/MagicOnion.Internal/StreamingHubPayloadPool.BuiltIn.cs @@ -102,17 +102,6 @@ public StreamingHubPayload RentOrCreate(ReadOnlySpan data) return new StreamingHubPayload(payload); #else return (StreamingHubPayload)payload; -#endif - } - - public StreamingHubPayload RentOrCreate(ReadOnlyMemory data, bool holdReference) - { - var payload = pool.RentOrCreateCore(); - payload.Initialize(data, holdReference); -#if DEBUG - return new StreamingHubPayload(payload); -#else - return (StreamingHubPayload)payload; #endif } } diff --git a/src/MagicOnion.Internal/StreamingHubPayloadPool.ObjectPool.cs b/src/MagicOnion.Internal/StreamingHubPayloadPool.ObjectPool.cs index 9bb807d06..6bad7faaa 100644 --- a/src/MagicOnion.Internal/StreamingHubPayloadPool.ObjectPool.cs +++ b/src/MagicOnion.Internal/StreamingHubPayloadPool.ObjectPool.cs @@ -34,17 +34,6 @@ public StreamingHubPayload RentOrCreate(ReadOnlySpan data) #endif } - public StreamingHubPayload RentOrCreate(ReadOnlyMemory data, bool holdReference) - { - var payload = pool.Get(); - payload.Initialize(data, holdReference); -#if DEBUG - return new StreamingHubPayload(payload); -#else - return (StreamingHubPayload)payload; -#endif - } - public void Return(StreamingHubPayload payload) { #if DEBUG From a9c4a3c3affbc5739af548bc36ec8a9fde8381ed Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Thu, 24 Oct 2024 16:49:49 +0900 Subject: [PATCH 15/16] Fix build errors --- src/MagicOnion.Server/Hubs/StreamingHub.cs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/MagicOnion.Server/Hubs/StreamingHub.cs b/src/MagicOnion.Server/Hubs/StreamingHub.cs index 3ba75db51..bb8c2ad39 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHub.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHub.cs @@ -25,6 +25,7 @@ public abstract class StreamingHubBase : ServiceBase handlers = default!; protected static readonly Task NilTask = Task.FromResult(Nil.Default); protected static readonly ValueTask CompletedTask = new ValueTask(); @@ -90,11 +91,13 @@ async Task> IStr var serviceProvider = streamingContext.ServiceContext.ServiceProvider; var features = this.Context.CallContext.GetHttpContext().Features; - streamingHubFeature = features.Get()!; // TODO: GetRequiredFeature + streamingHubFeature = features.GetRequiredFeature(); var magicOnionOptions = serviceProvider.GetRequiredService>().Value; timeProvider = magicOnionOptions.TimeProvider ?? TimeProvider.System; isReturnExceptionStackTraceInErrorDetail = magicOnionOptions.IsReturnExceptionStackTraceInErrorDetail; + handlers = streamingHubFeature.Handlers; + var remoteProxyFactory = serviceProvider.GetRequiredService(); var remoteSerializer = serviceProvider.GetRequiredService(); this.remoteClientResultPendingTasks = new RemoteClientResultPendingTaskRegistry(magicOnionOptions.ClientResultsDefaultTimeout, timeProvider); @@ -170,8 +173,6 @@ async Task HandleMessageAsync() // eg: Send the current game state to the client. await OnConnected(); - var handlers = streamingHubFeature.Handlers; - // Starts a loop that consumes the request queue. var consumeRequestsTask = ConsumeRequestQueueAsync(); @@ -268,14 +269,14 @@ async ValueTask ConsumeRequestQueueAsync() handler: handler, streamingServiceContext: (IStreamingServiceContext)Context, hubInstance: this, - request: body, - messageId: messageId, + request: request.Body, + messageId: request.MessageId, timestamp: timeProvider.GetUtcNow().UtcDateTime ); var isErrorOrInterrupted = false; var methodStartingTimestamp = timeProvider.GetTimestamp(); - MagicOnionServerLog.BeginInvokeHubMethod(Context.Logger, context, context.Request, handler.RequestType); + MagicOnionServerLog.BeginInvokeHubMethod(Context.Logger, hubContext, hubContext.Request, handler.RequestType); try { @@ -319,12 +320,12 @@ void HandleException(StreamingHubContext hubContext, Exception ex, bool hasRespo } else { - MagicOnionServerLog.Error(Context.MethodHandler.Logger, ex, hubContext); + MagicOnionServerLog.Error(Context.Logger, ex, hubContext); Metrics.StreamingHubException(Context.Metrics, hubContext.Handler, ex); if (hasResponse) { - hubContext.WriteErrorMessage((int)StatusCode.Internal, $"An error occurred while processing handler '{hubContext.Handler}'.", ex, Context.MethodHandler.IsReturnExceptionStackTraceInErrorDetail); + hubContext.WriteErrorMessage((int)StatusCode.Internal, $"An error occurred while processing handler '{hubContext.Handler}'.", ex, isReturnExceptionStackTraceInErrorDetail); } } } From 8b2206917dbb3c0d6fab14abb21f4ec07fe5794d Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Thu, 26 Dec 2024 18:32:16 +0900 Subject: [PATCH 16/16] Fix build --- .../StreamingHubServerMessageReader.cs | 13 ++-- src/MagicOnion.Server/Hubs/StreamingHub.cs | 70 ++++++++++--------- .../Hubs/StreamingHubContext.cs | 1 + 3 files changed, 44 insertions(+), 40 deletions(-) diff --git a/src/MagicOnion.Internal/StreamingHubServerMessageReader.cs b/src/MagicOnion.Internal/StreamingHubServerMessageReader.cs index 68f5e8bad..93d9c3330 100644 --- a/src/MagicOnion.Internal/StreamingHubServerMessageReader.cs +++ b/src/MagicOnion.Internal/StreamingHubServerMessageReader.cs @@ -35,14 +35,11 @@ public StreamingHubMessageType ReadMessageType() 2 => StreamingHubMessageType.RequestFireAndForget, 3 => StreamingHubMessageType.Request, 4 => ReadMessageSubType(), - _ => throw new InvalidOperationException($"Unknown message format: ArrayLength = {arrayLength}"), + _ => ThrowUnknownMessageFormat(arrayLength), }; [DoesNotReturn] - static StreamingHubMessageType ThrowUnknownMessageSubType(byte subType) - => throw new InvalidOperationException($"Unknown client response message: {subType}"); - [DoesNotReturn] - static StreamingHubMessageType ThrowUnknownMessageFormat(int arrayLength) + static StreamingHubMessageType ThrowUnknownMessageFormat(uint arrayLength) => throw new InvalidOperationException($"Unknown message format: ArrayLength = {arrayLength}"); } StreamingHubMessageType ReadMessageSubType() @@ -55,8 +52,12 @@ StreamingHubMessageType ReadMessageSubType() 0x01 => StreamingHubMessageType.ClientResultResponseWithError, 0x7e => StreamingHubMessageType.ClientHeartbeat, 0x7f => StreamingHubMessageType.ServerHeartbeatResponse, - _ => throw new InvalidOperationException($"Unknown client response message: {subType}"), + _ => ThrowUnknownMessageSubType(subType), }; + + [DoesNotReturn] + static StreamingHubMessageType ThrowUnknownMessageSubType(byte subType) + => throw new InvalidOperationException($"Unknown client response message: {subType}"); } public (int MethodId, ReadOnlyMemory Body) ReadRequestFireAndForget() diff --git a/src/MagicOnion.Server/Hubs/StreamingHub.cs b/src/MagicOnion.Server/Hubs/StreamingHub.cs index 123c674ed..81ee9f8df 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHub.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHub.cs @@ -264,33 +264,38 @@ async ValueTask ConsumeRequestQueueAsync() { try { - var handler = GetOrThrowHandler(request.MethodId); - - hubContext.Initialize( - handler: handler, - streamingServiceContext: (IStreamingServiceContext)Context, - hubInstance: this, - request: request.Body, - messageId: request.MessageId, - timestamp: timeProvider.GetUtcNow().UtcDateTime - ); - - var isErrorOrInterrupted = false; - var methodStartingTimestamp = timeProvider.GetTimestamp(); - MagicOnionServerLog.BeginInvokeHubMethod(Context.Logger, hubContext, hubContext.Request, handler.RequestType); - - try + if (handlers.TryGetValue(request.MethodId, out var handler)) { - await handler.MethodBody.Invoke(hubContext); - } - catch (Exception ex) - { - isErrorOrInterrupted = true; - HandleException(hubContext, ex, request.HasResponse); + hubContext.Initialize( + handler: handler, + streamingServiceContext: (IStreamingServiceContext)Context, + hubInstance: this, + request: request.Body, + messageId: request.MessageId, + timestamp: timeProvider.GetUtcNow().UtcDateTime + ); + + var isErrorOrInterrupted = false; + var methodStartingTimestamp = timeProvider.GetTimestamp(); + MagicOnionServerLog.BeginInvokeHubMethod(Context.Logger, hubContext, hubContext.Request, handler.RequestType); + + try + { + await handler.MethodBody.Invoke(hubContext); + } + catch (Exception ex) + { + isErrorOrInterrupted = true; + HandleException(hubContext, ex, request.HasResponse); + } + finally + { + CleanupRequest(hubContext, methodStartingTimestamp, isErrorOrInterrupted); + } } - finally + else { - CleanupRequest(hubContext, methodStartingTimestamp, isErrorOrInterrupted); + RespondMethodNotFound(request.MethodId, request.MessageId); } } finally @@ -300,16 +305,6 @@ async ValueTask ConsumeRequestQueueAsync() } } - StreamingHubHandler GetOrThrowHandler(int methodId) - { - if (!handlers.TryGetValue(methodId, out var handler)) - { - throw new InvalidOperationException("Handler not found in received methodId, methodId:" + methodId); - } - - return handler; - } - void HandleException(StreamingHubContext hubContext, Exception ex, bool hasResponse) { if (ex is ReturnStatusException rse) @@ -340,6 +335,13 @@ void CleanupRequest(StreamingHubContext hubContext, long methodStartingTimestamp hubContext.Uninitialize(); } + void RespondMethodNotFound(int methodId, int messageId) + { + MagicOnionServerLog.HubMethodNotFound(Context.Logger, Context.ServiceName, methodId); + var payload = StreamingHubPayloadBuilder.BuildError(messageId, (int)StatusCode.Unimplemented, $"StreamingHub method '{methodId}' is not found in StreamingHub.", null, isReturnExceptionStackTraceInErrorDetail); + StreamingServiceContext.QueueResponseStreamWrite(payload); + } + // Interface methods for Client THubInterface IStreamingHub.FireAndForget() diff --git a/src/MagicOnion.Server/Hubs/StreamingHubContext.cs b/src/MagicOnion.Server/Hubs/StreamingHubContext.cs index 9c17d709f..70c1b285a 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHubContext.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHubContext.cs @@ -2,6 +2,7 @@ using System.Collections.Concurrent; using System.Diagnostics; using MagicOnion.Internal; +using MagicOnion.Server.Hubs.Internal; using Microsoft.Extensions.ObjectPool; namespace MagicOnion.Server.Hubs;