diff --git a/samples/JwtAuthentication/JwtAuthApp.Client/Program.cs b/samples/JwtAuthentication/JwtAuthApp.Client/Program.cs index 3133ba225..827c672db 100644 --- a/samples/JwtAuthentication/JwtAuthApp.Client/Program.cs +++ b/samples/JwtAuthentication/JwtAuthApp.Client/Program.cs @@ -63,7 +63,6 @@ private async Task MainCore(string[] args) { "Authorization", "Bearer " + AuthenticationTokenStorage.Current.Token } })); await timerHubClient.SetAsync(TimeSpan.FromSeconds(5)); - await Task.Yield(); // NOTE: Release the gRPC's worker thread here. } // 6. Insufficient privilege (The current user is not in administrators role). diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/MagicOnion.Shared/Utils/TaskCompletionSourceEx.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/MagicOnion.Shared/Utils/TaskCompletionSourceEx.cs index c8677ad05..f33b93b88 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/MagicOnion.Shared/Utils/TaskCompletionSourceEx.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/MagicOnion.Shared/Utils/TaskCompletionSourceEx.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Threading.Tasks; namespace MagicOnion.Utils @@ -11,6 +11,11 @@ internal interface ITaskCompletion internal class TaskCompletionSourceEx : TaskCompletionSource, ITaskCompletion { + public TaskCompletionSourceEx() + { } + public TaskCompletionSourceEx(TaskCreationOptions options) : base(options) + { } + bool ITaskCompletion.TrySetCanceled() { return this.TrySetCanceled(); @@ -21,4 +26,4 @@ bool ITaskCompletion.TrySetException(Exception ex) return this.TrySetException(ex); } } -} \ No newline at end of file +} diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs index f432aab0b..1a27a431d 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs @@ -36,7 +36,7 @@ public abstract class StreamingHubClientBase TaskCompletionSource waitForDisconnect = new TaskCompletionSource(); // {messageId, TaskCompletionSource} - ConcurrentDictionary responseFutures = new ConcurrentDictionary(); + ConcurrentDictionary responseFutures = new ConcurrentDictionary(); protected CancellationTokenSource cts = new CancellationTokenSource(); int messageId = 0; bool disposed; @@ -196,8 +196,7 @@ void ConsumeData(SynchronizationContext syncContext, byte[] data) if (arrayLength == 3) { var messageId = messagePackReader.ReadInt32(); - object future; - if (responseFutures.TryRemove(messageId, out future)) + if (responseFutures.TryRemove(messageId, out var future)) { var methodId = messagePackReader.ReadInt32(); try @@ -208,7 +207,7 @@ void ConsumeData(SynchronizationContext syncContext, byte[] data) } catch (Exception ex) { - if (!(future as ITaskCompletion).TrySetException(ex)) + if (!future.TrySetException(ex)) { throw; } @@ -218,8 +217,7 @@ void ConsumeData(SynchronizationContext syncContext, byte[] data) else if (arrayLength == 4) { var messageId = messagePackReader.ReadInt32(); - object future; - if (responseFutures.TryRemove(messageId, out future)) + if (responseFutures.TryRemove(messageId, out var future)) { var statusCode = messagePackReader.ReadInt32(); var detail = messagePackReader.ReadString(); @@ -236,7 +234,7 @@ void ConsumeData(SynchronizationContext syncContext, byte[] data) ex = new RpcException(new Status((StatusCode)statusCode, detail), detail + Environment.NewLine + error); } - (future as ITaskCompletion).TrySetException(ex); + future.TrySetException(ex); } } else @@ -290,8 +288,11 @@ protected async Task WriteMessageWithResponseAsync(); // use Ex - responseFutures[mid] = (object)tcs; + // NOTE: The continuations (user code) should be executed asynchronously. + // This is because the continuation may block the thread, for example, Console.ReadLine(). + // If the thread is blocked, it will no longer return to the message consuming loop. + var tcs = new TaskCompletionSourceEx(TaskCreationOptions.RunContinuationsAsynchronously); + responseFutures[mid] = tcs; byte[] BuildMessage() { diff --git a/src/MagicOnion.Client/StreamingHubClientBase.cs b/src/MagicOnion.Client/StreamingHubClientBase.cs index f432aab0b..1a27a431d 100644 --- a/src/MagicOnion.Client/StreamingHubClientBase.cs +++ b/src/MagicOnion.Client/StreamingHubClientBase.cs @@ -36,7 +36,7 @@ public abstract class StreamingHubClientBase TaskCompletionSource waitForDisconnect = new TaskCompletionSource(); // {messageId, TaskCompletionSource} - ConcurrentDictionary responseFutures = new ConcurrentDictionary(); + ConcurrentDictionary responseFutures = new ConcurrentDictionary(); protected CancellationTokenSource cts = new CancellationTokenSource(); int messageId = 0; bool disposed; @@ -196,8 +196,7 @@ void ConsumeData(SynchronizationContext syncContext, byte[] data) if (arrayLength == 3) { var messageId = messagePackReader.ReadInt32(); - object future; - if (responseFutures.TryRemove(messageId, out future)) + if (responseFutures.TryRemove(messageId, out var future)) { var methodId = messagePackReader.ReadInt32(); try @@ -208,7 +207,7 @@ void ConsumeData(SynchronizationContext syncContext, byte[] data) } catch (Exception ex) { - if (!(future as ITaskCompletion).TrySetException(ex)) + if (!future.TrySetException(ex)) { throw; } @@ -218,8 +217,7 @@ void ConsumeData(SynchronizationContext syncContext, byte[] data) else if (arrayLength == 4) { var messageId = messagePackReader.ReadInt32(); - object future; - if (responseFutures.TryRemove(messageId, out future)) + if (responseFutures.TryRemove(messageId, out var future)) { var statusCode = messagePackReader.ReadInt32(); var detail = messagePackReader.ReadString(); @@ -236,7 +234,7 @@ void ConsumeData(SynchronizationContext syncContext, byte[] data) ex = new RpcException(new Status((StatusCode)statusCode, detail), detail + Environment.NewLine + error); } - (future as ITaskCompletion).TrySetException(ex); + future.TrySetException(ex); } } else @@ -290,8 +288,11 @@ protected async Task WriteMessageWithResponseAsync(); // use Ex - responseFutures[mid] = (object)tcs; + // NOTE: The continuations (user code) should be executed asynchronously. + // This is because the continuation may block the thread, for example, Console.ReadLine(). + // If the thread is blocked, it will no longer return to the message consuming loop. + var tcs = new TaskCompletionSourceEx(TaskCreationOptions.RunContinuationsAsynchronously); + responseFutures[mid] = tcs; byte[] BuildMessage() { diff --git a/src/MagicOnion.Shared/Utils/TaskCompletionSourceEx.cs b/src/MagicOnion.Shared/Utils/TaskCompletionSourceEx.cs index c8677ad05..f33b93b88 100644 --- a/src/MagicOnion.Shared/Utils/TaskCompletionSourceEx.cs +++ b/src/MagicOnion.Shared/Utils/TaskCompletionSourceEx.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Threading.Tasks; namespace MagicOnion.Utils @@ -11,6 +11,11 @@ internal interface ITaskCompletion internal class TaskCompletionSourceEx : TaskCompletionSource, ITaskCompletion { + public TaskCompletionSourceEx() + { } + public TaskCompletionSourceEx(TaskCreationOptions options) : base(options) + { } + bool ITaskCompletion.TrySetCanceled() { return this.TrySetCanceled(); @@ -21,4 +26,4 @@ bool ITaskCompletion.TrySetException(Exception ex) return this.TrySetException(ex); } } -} \ No newline at end of file +} diff --git a/tests/MagicOnion.Integration.Tests/StreamingHubTest.cs b/tests/MagicOnion.Integration.Tests/StreamingHubTest.cs index bb152f888..919ed6dc3 100644 --- a/tests/MagicOnion.Integration.Tests/StreamingHubTest.cs +++ b/tests/MagicOnion.Integration.Tests/StreamingHubTest.cs @@ -427,6 +427,30 @@ public async Task Receiver_RefType_Null(TestStreamingHubClientFactory clientFact receiver.Verify(x => x.Receiver_RefType_Null(default)); } + [Theory] + [MemberData(nameof(EnumerateStreamingHubClientFactory))] + public async Task ContinuationBlocking(TestStreamingHubClientFactory clientFactory) + { + // Arrange + var httpClient = factory.CreateDefaultClient(); + var channel = GrpcChannel.ForAddress("http://localhost", new GrpcChannelOptions() { HttpClient = httpClient }); + + var receiver = new Mock(); + var client = await clientFactory.CreateAndConnectAsync(channel, receiver.Object); + + // Act + // NOTE: Runs on another thread. + _ = Task.Run(async () => + { + await client.CallReceiver_Delay(500); // The receiver will be called after 500ms. + Thread.Sleep(60 * 1000); // Block the continuation. + }); + await Task.Delay(1000); // Wait for broadcast queue to be consumed. + + // Assert + receiver.Verify(x => x.Receiver_Delay()); + } + } public class StreamingHubTestHub : StreamingHubBase, IStreamingHubTestHub @@ -574,6 +598,17 @@ public Task CallReceiver_RefType_Null() Broadcast(group).Receiver_RefType_Null(default); return Task.CompletedTask; } + + public Task CallReceiver_Delay(int milliseconds) + { + _ = Task.Run(async () => + { + await Task.Delay(milliseconds); + Broadcast(group).Receiver_Delay(); + }); + + return Task.CompletedTask; + } } public interface IStreamingHubTestHubReceiver @@ -583,6 +618,7 @@ public interface IStreamingHubTestHubReceiver void Receiver_Parameter_Many(int arg0, string arg1, bool arg2); void Receiver_RefType(MyStreamingResponse request); void Receiver_RefType_Null(MyStreamingResponse? request); + void Receiver_Delay(); } public interface IStreamingHubTestHub : IStreamingHub @@ -617,4 +653,6 @@ public interface IStreamingHubTestHub : IStreamingHub RefType_Null(MyStreamingRequest? request); Task CallReceiver_RefType(MyStreamingRequest request); Task CallReceiver_RefType_Null(); + + Task CallReceiver_Delay(int milliseconds); } diff --git a/tests/MagicOnion.Integration.Tests/_GeneratedClient.cs b/tests/MagicOnion.Integration.Tests/_GeneratedClient.cs index 1b85a784b..612feca38 100644 --- a/tests/MagicOnion.Integration.Tests/_GeneratedClient.cs +++ b/tests/MagicOnion.Integration.Tests/_GeneratedClient.cs @@ -766,6 +766,8 @@ public StreamingHubTestHubClient(global::Grpc.Core.CallInvoker callInvoker, glob => base.WriteMessageWithResponseAsync(1503747814, request); public global::System.Threading.Tasks.Task CallReceiver_RefType_Null() => base.WriteMessageWithResponseAsync(-1093215042, global::MessagePack.Nil.Default); + public global::System.Threading.Tasks.Task CallReceiver_Delay(global::System.Int32 milliseconds) + => base.WriteMessageWithResponseAsync(1865731236, milliseconds); public global::MagicOnion.Integration.Tests.IStreamingHubTestHub FireAndForget() => new FireAndForgetClient(this); @@ -828,6 +830,8 @@ public FireAndForgetClient(StreamingHubTestHubClient parent) => parent.WriteMessageFireAndForgetAsync(1503747814, request); public global::System.Threading.Tasks.Task CallReceiver_RefType_Null() => parent.WriteMessageFireAndForgetAsync(-1093215042, global::MessagePack.Nil.Default); + public global::System.Threading.Tasks.Task CallReceiver_Delay(global::System.Int32 milliseconds) + => parent.WriteMessageFireAndForgetAsync(1865731236, milliseconds); } @@ -865,6 +869,12 @@ protected override void OnBroadcastEvent(global::System.Int32 methodId, global:: receiver.Receiver_RefType_Null(value); } break; + case -5486432: // Void Receiver_Delay() + { + var value = base.Deserialize(data); + receiver.Receiver_Delay(); + } + break; } } @@ -941,6 +951,9 @@ protected override void OnResponseEvent(global::System.Int32 methodId, global::S case -1093215042: // Task CallReceiver_RefType_Null() base.SetResultForResponse(taskCompletionSource, data); break; + case 1865731236: // Task CallReceiver_Delay(global::System.Int32 milliseconds) + base.SetResultForResponse(taskCompletionSource, data); + break; } }