From 4cc768c3468fa375ef5295c9827b48ce2449271f Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Thu, 28 Sep 2023 17:26:45 +0900 Subject: [PATCH] Refactor StreamingHub --- src/MagicOnion.Server/Hubs/StreamingHub.cs | 148 ++++++++------------- 1 file changed, 59 insertions(+), 89 deletions(-) diff --git a/src/MagicOnion.Server/Hubs/StreamingHub.cs b/src/MagicOnion.Server/Hubs/StreamingHub.cs index 027da8eac..eae10908c 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHub.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHub.cs @@ -158,113 +158,83 @@ async Task HandleMessageAsync() // Be careful to allocation and performance. while (await reader.MoveNext(ct)) // must keep SyncContext. { - (int methodId, int messageId, int offset) FetchHeader(byte[] msgData) - { - var messagePackReader = new MessagePackReader(msgData); - - var length = messagePackReader.ReadArrayHeader(); - if (length == 2) - { - // void: [methodId, [argument]] - var mid = messagePackReader.ReadInt32(); - var consumed = (int)messagePackReader.Consumed; - - return (mid, -1, consumed); - } - else if (length == 3) - { - // T: [messageId, methodId, [argument]] - var msgId = messagePackReader.ReadInt32(); - var metId = messagePackReader.ReadInt32(); - var consumed = (int)messagePackReader.Consumed; - return (metId, msgId, consumed); - } - else - { - throw new InvalidOperationException("Invalid data format."); - } - } - var data = reader.Current; var (methodId, messageId, offset) = FetchHeader(data); + var hasResponse = messageId != -1; - if (messageId == -1) + if (handlers.TryGetValue(methodId, out var handler)) { - if (handlers.TryGetValue(methodId, out var handler)) + // Create a context for each call to the hub method. + var context = new StreamingHubContext() { - var context = new StreamingHubContext() // create per invoke. - { - HubInstance = this, - ServiceContext = (IStreamingServiceContext)Context, - Request = data.AsMemory(offset, data.Length - offset), - Path = handler.ToString(), - MethodId = handler.MethodId, - MessageId = -1, - Timestamp = DateTime.UtcNow - }; - - var isErrorOrInterrupted = false; - Context.MethodHandler.Logger.BeginInvokeHubMethod(context, context.Request, handler.RequestType); - try - { - await handler.MethodBody.Invoke(context); - } - catch (Exception ex) - { - isErrorOrInterrupted = true; - Context.MethodHandler.Logger.Error(ex, context); - } - finally - { - Context.MethodHandler.Logger.EndInvokeHubMethod(context, context.responseSize, context.responseType, (DateTime.UtcNow - context.Timestamp).TotalMilliseconds, isErrorOrInterrupted); - } - } - else + HubInstance = this, + ServiceContext = (IStreamingServiceContext)Context, + Request = data.AsMemory(offset, data.Length - offset), + Path = handler.ToString(), + MethodId = handler.MethodId, + MessageId = messageId, + Timestamp = DateTime.UtcNow + }; + + var isErrorOrInterrupted = false; + Context.MethodHandler.Logger.BeginInvokeHubMethod(context, context.Request, handler.RequestType); + try { - throw new InvalidOperationException("Handler not found in received methodId, methodId:" + methodId); + await handler.MethodBody.Invoke(context); } - } - else - { - if (handlers.TryGetValue(methodId, out var handler)) + catch (ReturnStatusException ex) { - var context = new StreamingHubContext() // create per invoke. - { - HubInstance = this, - ServiceContext = (IStreamingServiceContext)Context, - Request = data.AsMemory(offset, data.Length - offset), - Path = handler.ToString(), - MethodId = handler.MethodId, - MessageId = messageId, - Timestamp = DateTime.UtcNow - }; - - var isErrorOrInterrupted = false; - Context.MethodHandler.Logger.BeginInvokeHubMethod(context, context.Request, handler.RequestType); - try - { - await handler.MethodBody.Invoke(context); - } - catch (ReturnStatusException ex) + if (hasResponse) { await context.WriteErrorMessage((int)ex.StatusCode, ex.Detail, null, false); } - catch (Exception ex) + } + catch (Exception ex) + { + isErrorOrInterrupted = true; + Context.MethodHandler.Logger.Error(ex, context); + + if (hasResponse) { - isErrorOrInterrupted = true; - Context.MethodHandler.Logger.Error(ex, context); await context.WriteErrorMessage((int)StatusCode.Internal, $"An error occurred while processing handler '{handler.ToString()}'.", ex, Context.MethodHandler.IsReturnExceptionStackTraceInErrorDetail); } - finally - { - Context.MethodHandler.Logger.EndInvokeHubMethod(context, context.responseSize, context.responseType, (DateTime.UtcNow - context.Timestamp).TotalMilliseconds, isErrorOrInterrupted); - } } - else + finally { - throw new InvalidOperationException("Handler not found in received methodId, methodId:" + methodId); + Context.MethodHandler.Logger.EndInvokeHubMethod(context, context.responseSize, context.responseType, (DateTime.UtcNow - context.Timestamp).TotalMilliseconds, isErrorOrInterrupted); } } + else + { + throw new InvalidOperationException("Handler not found in received methodId, methodId:" + methodId); + } + } + } + + static (int methodId, int messageId, int offset) FetchHeader(byte[] msgData) + { + var messagePackReader = new MessagePackReader(msgData); + + var length = messagePackReader.ReadArrayHeader(); + if (length == 2) + { + // void: [methodId, [argument]] + var mid = messagePackReader.ReadInt32(); + var consumed = (int)messagePackReader.Consumed; + + return (mid, -1, consumed); + } + else if (length == 3) + { + // T: [messageId, methodId, [argument]] + var msgId = messagePackReader.ReadInt32(); + var metId = messagePackReader.ReadInt32(); + var consumed = (int)messagePackReader.Consumed; + return (metId, msgId, consumed); + } + else + { + throw new InvalidOperationException("Invalid data format."); } }