Skip to content

Commit

Permalink
Merge pull request #682 from Cysharp/feature/RefactorStreamingHub
Browse files Browse the repository at this point in the history
Refactor StreamingHub
  • Loading branch information
mayuki authored Sep 28, 2023
2 parents dfaded1 + 4cc768c commit 7988cc8
Showing 1 changed file with 59 additions and 89 deletions.
148 changes: 59 additions & 89 deletions src/MagicOnion.Server/Hubs/StreamingHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,113 +158,83 @@ async Task HandleMessageAsync()
// Be careful to allocation and performance.
while (await reader.MoveNext(ct)) // must keep SyncContext.
{
(int methodId, int messageId, int offset) FetchHeader(byte[] msgData)
{
var messagePackReader = new MessagePackReader(msgData);

var length = messagePackReader.ReadArrayHeader();
if (length == 2)
{
// void: [methodId, [argument]]
var mid = messagePackReader.ReadInt32();
var consumed = (int)messagePackReader.Consumed;

return (mid, -1, consumed);
}
else if (length == 3)
{
// T: [messageId, methodId, [argument]]
var msgId = messagePackReader.ReadInt32();
var metId = messagePackReader.ReadInt32();
var consumed = (int)messagePackReader.Consumed;
return (metId, msgId, consumed);
}
else
{
throw new InvalidOperationException("Invalid data format.");
}
}

var data = reader.Current;
var (methodId, messageId, offset) = FetchHeader(data);
var hasResponse = messageId != -1;

if (messageId == -1)
if (handlers.TryGetValue(methodId, out var handler))
{
if (handlers.TryGetValue(methodId, out var handler))
// Create a context for each call to the hub method.
var context = new StreamingHubContext()
{
var context = new StreamingHubContext() // create per invoke.
{
HubInstance = this,
ServiceContext = (IStreamingServiceContext<byte[], byte[]>)Context,
Request = data.AsMemory(offset, data.Length - offset),
Path = handler.ToString(),
MethodId = handler.MethodId,
MessageId = -1,
Timestamp = DateTime.UtcNow
};

var isErrorOrInterrupted = false;
Context.MethodHandler.Logger.BeginInvokeHubMethod(context, context.Request, handler.RequestType);
try
{
await handler.MethodBody.Invoke(context);
}
catch (Exception ex)
{
isErrorOrInterrupted = true;
Context.MethodHandler.Logger.Error(ex, context);
}
finally
{
Context.MethodHandler.Logger.EndInvokeHubMethod(context, context.responseSize, context.responseType, (DateTime.UtcNow - context.Timestamp).TotalMilliseconds, isErrorOrInterrupted);
}
}
else
HubInstance = this,
ServiceContext = (IStreamingServiceContext<byte[], byte[]>)Context,
Request = data.AsMemory(offset, data.Length - offset),
Path = handler.ToString(),
MethodId = handler.MethodId,
MessageId = messageId,
Timestamp = DateTime.UtcNow
};

var isErrorOrInterrupted = false;
Context.MethodHandler.Logger.BeginInvokeHubMethod(context, context.Request, handler.RequestType);
try
{
throw new InvalidOperationException("Handler not found in received methodId, methodId:" + methodId);
await handler.MethodBody.Invoke(context);
}
}
else
{
if (handlers.TryGetValue(methodId, out var handler))
catch (ReturnStatusException ex)
{
var context = new StreamingHubContext() // create per invoke.
{
HubInstance = this,
ServiceContext = (IStreamingServiceContext<byte[], byte[]>)Context,
Request = data.AsMemory(offset, data.Length - offset),
Path = handler.ToString(),
MethodId = handler.MethodId,
MessageId = messageId,
Timestamp = DateTime.UtcNow
};

var isErrorOrInterrupted = false;
Context.MethodHandler.Logger.BeginInvokeHubMethod(context, context.Request, handler.RequestType);
try
{
await handler.MethodBody.Invoke(context);
}
catch (ReturnStatusException ex)
if (hasResponse)
{
await context.WriteErrorMessage((int)ex.StatusCode, ex.Detail, null, false);
}
catch (Exception ex)
}
catch (Exception ex)
{
isErrorOrInterrupted = true;
Context.MethodHandler.Logger.Error(ex, context);

if (hasResponse)
{
isErrorOrInterrupted = true;
Context.MethodHandler.Logger.Error(ex, context);
await context.WriteErrorMessage((int)StatusCode.Internal, $"An error occurred while processing handler '{handler.ToString()}'.", ex, Context.MethodHandler.IsReturnExceptionStackTraceInErrorDetail);
}
finally
{
Context.MethodHandler.Logger.EndInvokeHubMethod(context, context.responseSize, context.responseType, (DateTime.UtcNow - context.Timestamp).TotalMilliseconds, isErrorOrInterrupted);
}
}
else
finally
{
throw new InvalidOperationException("Handler not found in received methodId, methodId:" + methodId);
Context.MethodHandler.Logger.EndInvokeHubMethod(context, context.responseSize, context.responseType, (DateTime.UtcNow - context.Timestamp).TotalMilliseconds, isErrorOrInterrupted);
}
}
else
{
throw new InvalidOperationException("Handler not found in received methodId, methodId:" + methodId);
}
}
}

static (int methodId, int messageId, int offset) FetchHeader(byte[] msgData)
{
var messagePackReader = new MessagePackReader(msgData);

var length = messagePackReader.ReadArrayHeader();
if (length == 2)
{
// void: [methodId, [argument]]
var mid = messagePackReader.ReadInt32();
var consumed = (int)messagePackReader.Consumed;

return (mid, -1, consumed);
}
else if (length == 3)
{
// T: [messageId, methodId, [argument]]
var msgId = messagePackReader.ReadInt32();
var metId = messagePackReader.ReadInt32();
var consumed = (int)messagePackReader.Consumed;
return (metId, msgId, consumed);
}
else
{
throw new InvalidOperationException("Invalid data format.");
}
}

Expand Down

0 comments on commit 7988cc8

Please sign in to comment.