Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add StreamingHub OnConnected support #745

Merged
merged 1 commit into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/MagicOnion.Server/Hubs/StreamingHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ protected virtual ValueTask OnConnecting()
return CompletedTask;
}

/// <summary>
/// Called after connect (headers and marker have been sent).
/// Allow the server send message to the client or broadcast to group.
/// </summary>
protected virtual ValueTask OnConnected()
{
return CompletedTask;
}

/// <summary>
/// Called after disconnect.
/// </summary>
Expand Down Expand Up @@ -158,6 +167,11 @@ async Task HandleMessageAsync()
// NOTE: To prevent buffering by AWS ALB or reverse-proxy.
await writer.WriteAsync(MarkerResponseBytes);

// Call OnConnected after sending the headers and marker.
// The server can send messages or broadcast to client after OnConnected.
// eg: Send the current game state to the client.
await OnConnected();

var handlers = StreamingHubHandlerRepository.GetHandlers(Context.MethodHandler);

// Main loop of StreamingHub.
Expand Down
26 changes: 24 additions & 2 deletions tests/MagicOnion.Server.Tests/StreamingHubTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace MagicOnion.Server.Tests;

public interface IMessageReceiver
{
void VoidOnConnected(int x, string y, double z);
//Task ZeroArgument();
//Task OneArgument(int x);
//Task MoreArgument(int x, string y, double z);
Expand Down Expand Up @@ -60,6 +61,11 @@ protected override async ValueTask OnConnecting()
group = await Group.AddAsync("global");
}

protected override async ValueTask OnConnected()
{
BroadcastToSelf(group).VoidOnConnected(123, "foo", 12.3f);
}

protected override async ValueTask OnDisconnected()
{
if (group != null) await group.RemoveAsync(Context);
Expand Down Expand Up @@ -134,6 +140,15 @@ public BasicStreamingHubTest(ITestOutputHelper logger, ServerFixture<TestHub> se
this.channel = server.DefaultChannel;
}

[Fact]
public async Task OnConnected()
{
client = await StreamingHubClient.ConnectAsync<ITestHub, IMessageReceiver>(channel, this);
var x = await voidOnConnectedTask.Task;
x.Should().Be((123, "foo", 12.3f));
await client.DisposeAsync();
}

[Fact]
public async Task ZeroArgument()
{
Expand Down Expand Up @@ -327,6 +342,13 @@ public async Task RetrunOneArgument3()
// one3Task.TrySetResult(x);
//}

TaskCompletionSource<(int, string, double)> voidOnConnectedTask = new TaskCompletionSource<(int, string, double)>();
void IMessageReceiver.VoidOnConnected(int x, string y, double z)
{
voidOnConnectedTask.TrySetResult((x, y, z));
}


TaskCompletionSource<(int, string, double)> voidmoreTask = new TaskCompletionSource<(int, string, double)>();
void IMessageReceiver.VoidMoreArgument(int x, string y, double z)
{
Expand Down Expand Up @@ -399,7 +421,7 @@ public async Task StatusCodeAsync()
[StreamingHubTestFilter]
public async Task FilterCheckAsync()
{

}
}

Expand All @@ -415,7 +437,7 @@ public override async ValueTask Invoke(StreamingHubContext context, Func<Streami
context.Items["HubFilter1_BF"] = "AfterOK";
}
}

public class MoreCheckHubTest : IEmptyReceiver, IDisposable, IClassFixture<ServerFixture<MoreCheckHub>>
{
ITestOutputHelper logger;
Expand Down
Loading