diff --git a/.vscode/cspell.json b/.vscode/cspell.json index ff2ae4b39d12d..3f3632a5f1dfd 100644 --- a/.vscode/cspell.json +++ b/.vscode/cspell.json @@ -917,7 +917,15 @@ "words": [ "Orgs" ] - } + }, + { + "filename": "**/sdk/webpubsub/**/*.cs", + "words": [ + "protobuf", + "Ackable", + "awps" + ] + } ], "allowCompoundWords": true } diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/Azure.Messaging.WebPubSub.Client.sln b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/Azure.Messaging.WebPubSub.Client.sln new file mode 100644 index 0000000000000..7ceadccfdaad2 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/Azure.Messaging.WebPubSub.Client.sln @@ -0,0 +1,56 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.2.32616.157 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Messaging.WebPubSub.Client", "src\Azure.Messaging.WebPubSub.Client.csproj", "{845299CD-399C-4296-9F61-D4E02679363E}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Messaging.WebPubSub.Client.Tests", "tests\Azure.Messaging.WebPubSub.Client.Tests.csproj", "{0B6AAD06-F541-4931-A661-16BE3BE24EEF}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{E5726134-6AA7-4AED-A654-53C92F09485A}" + ProjectSection(SolutionItems) = preProject + tests\test.runsettings = tests\test.runsettings + EndProjectSection +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Debug|x64 = Debug|x64 + Debug|x86 = Debug|x86 + Release|Any CPU = Release|Any CPU + Release|x64 = Release|x64 + Release|x86 = Release|x86 + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {845299CD-399C-4296-9F61-D4E02679363E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {845299CD-399C-4296-9F61-D4E02679363E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {845299CD-399C-4296-9F61-D4E02679363E}.Debug|x64.ActiveCfg = Debug|Any CPU + {845299CD-399C-4296-9F61-D4E02679363E}.Debug|x64.Build.0 = Debug|Any CPU + {845299CD-399C-4296-9F61-D4E02679363E}.Debug|x86.ActiveCfg = Debug|Any CPU + {845299CD-399C-4296-9F61-D4E02679363E}.Debug|x86.Build.0 = Debug|Any CPU + {845299CD-399C-4296-9F61-D4E02679363E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {845299CD-399C-4296-9F61-D4E02679363E}.Release|Any CPU.Build.0 = Release|Any CPU + {845299CD-399C-4296-9F61-D4E02679363E}.Release|x64.ActiveCfg = Release|Any CPU + {845299CD-399C-4296-9F61-D4E02679363E}.Release|x64.Build.0 = Release|Any CPU + {845299CD-399C-4296-9F61-D4E02679363E}.Release|x86.ActiveCfg = Release|Any CPU + {845299CD-399C-4296-9F61-D4E02679363E}.Release|x86.Build.0 = Release|Any CPU + {0B6AAD06-F541-4931-A661-16BE3BE24EEF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0B6AAD06-F541-4931-A661-16BE3BE24EEF}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0B6AAD06-F541-4931-A661-16BE3BE24EEF}.Debug|x64.ActiveCfg = Debug|Any CPU + {0B6AAD06-F541-4931-A661-16BE3BE24EEF}.Debug|x64.Build.0 = Debug|Any CPU + {0B6AAD06-F541-4931-A661-16BE3BE24EEF}.Debug|x86.ActiveCfg = Debug|Any CPU + {0B6AAD06-F541-4931-A661-16BE3BE24EEF}.Debug|x86.Build.0 = Debug|Any CPU + {0B6AAD06-F541-4931-A661-16BE3BE24EEF}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0B6AAD06-F541-4931-A661-16BE3BE24EEF}.Release|Any CPU.Build.0 = Release|Any CPU + {0B6AAD06-F541-4931-A661-16BE3BE24EEF}.Release|x64.ActiveCfg = Release|Any CPU + {0B6AAD06-F541-4931-A661-16BE3BE24EEF}.Release|x64.Build.0 = Release|Any CPU + {0B6AAD06-F541-4931-A661-16BE3BE24EEF}.Release|x86.ActiveCfg = Release|Any CPU + {0B6AAD06-F541-4931-A661-16BE3BE24EEF}.Release|x86.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {72FA37A6-5914-405F-9A01-605ED24270D3} + EndGlobalSection +EndGlobal diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/CHANGELOG.md b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/CHANGELOG.md new file mode 100644 index 0000000000000..a41b4906de3b4 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/CHANGELOG.md @@ -0,0 +1,4 @@ +# Release History + +## 1.0.0-beta.1 (Unreleased) +- Initial beta release diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/Directory.Build.props b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/Directory.Build.props new file mode 100644 index 0000000000000..1a9611bd49242 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/Directory.Build.props @@ -0,0 +1,6 @@ + + + + diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/README.md b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/README.md new file mode 100644 index 0000000000000..c56e1724c527c --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/README.md @@ -0,0 +1,202 @@ +# Azure Web PubSub client library for .NET + +[Azure Web PubSub Service](https://aka.ms/awps/doc) is an Azure-managed service that helps developers easily build web applications with real-time features and publish-subscribe pattern. Any scenario that requires real-time publish-subscribe messaging between server and clients or among clients can use Azure Web PubSub service. Traditional real-time features that often require polling from server or submitting HTTP requests can also use Azure Web PubSub service. + +You can use this library in your client side to manage the WebSocket client connections, as shown in below diagram: + +![overflow](https://user-images.githubusercontent.com/668244/140014067-25a00959-04dc-47e8-ac25-6957bd0a71ce.png) + +Use this library to: + +- Send messages to groups +- Send event to event handlers +- Join and leave groups +- Listen messages from groups and servers + +Details about the terms used here are described in [Key concepts](#key-concepts) section. + +[Source code](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src) | +[API reference documentation](https://aka.ms/awps/sdk/csharp) | +[Product documentation](https://aka.ms/awps/doc) | + +## Getting started + +### Install the package + +Install the client library from [NuGet](https://www.nuget.org/): + +```dotnetcli +dotnet add package Azure.Messaging.WebPubSub.Client +``` + +### Prerequisites + +- An [Azure subscription][azure_sub]. +- An existing Azure Web PubSub service instance. [Create Azure Web PubSub service instance](https://learn.microsoft.com/azure/azure-web-pubsub/howto-develop-create-instance) + +### Authenticate the client + +In order to interact with the service, you'll need to create an instance of the `WebPubSubClient` class. To make this possible, you'll need an access token. You can copy and paste an access token from Azure portal. + +```C# Snippet:WebPubSubClient_Construct +var client = new WebPubSubClient(new Uri("")); +``` + +And in production, you usually get `ClientAccessUri` from a negotiate server. + +```C# Snippet:WebPubSubClient_Construct2 +var client = new WebPubSubClient(new WebPubSubClientCredential(token => +{ + // In common practice, you will have a negotiation server for generating token. Client should fetch token from it. + return FetchClientAccessTokenFromServerAsync(token); +})); +``` + +The `FetchClientAccessTokenFromServerAsync` usually fetch the token from server via Http request. And in the server side, you can use `WebPubSubServiceClient` to generate token and response to the fetch request. + +```C# Snippet:WebPubSubClient_GenerateClientAccessUri +var serviceClient = new WebPubSubServiceClient("<< Connection String >>", "hub"); +return await serviceClient.GetClientAccessUriAsync(); +``` + +## Key concepts + +### Connection + +A connection, also known as a client connection, represents an individual WebSocket connection connected to the Web PubSub service. When successfully connected, the Web PubSub service assigns the connection a unique connection ID. Each `WebPubSubClient` creates its own exclusive connection. + +### Recovery + +If using reliable protocols, a new WebSocket will try to reconnect using the connection ID of the lost connection. If the WebSocket connection is successfully connected, the connection is recovered. And all group contexts will be recovered, and unreceived messages will be resent. + +### Hub + +A hub is a logical concept representing a collection of client connections. Usually, you use one hub for one purpose: for example, a chat hub, or a notification hub. When a client connection is created, it connects to a hub and, and during its lifetime, it is bound to that hub. Different applications can share one Azure Web PubSub service by using different hub names. + +### Group + +A group is a subset of connections to the hub. You can add and remove connections from a group at any time. For example, a chat room can be considered a group. When clients join and leave the room, they are added and removed from the group. A connection can belong to multiple groups, and a group can contain multiple connections. + +### User + +Connections to Web PubSub can belong to one user. A user might have multiple connections, for example when a single user is connected across multiple devices or browser tabs. + +### Message + +When a client is connected, it can send messages to the upstream application, or receive messages from the upstream application, through the WebSocket connection. Also, it can send messages to groups and receive message from joined groups. + +## Examples + +### Send to groups + +```C# Snippet:WebPubSubClient_SendToGroup +// Send message to group "testGroup" +await client.SendToGroupAsync("testGroup", BinaryData.FromString("hello world"), WebPubSubDataType.Text); +``` + +### Send events to event handler + +```C# Snippet:WebPubSubClient_SendEvent +// Send custom event to server +await client.SendEventAsync("testEvent", BinaryData.FromString("hello world"), WebPubSubDataType.Text); +``` + +### Handle the Connected event + +The `Connected` event is called after the client receives connected message. The event will be triggered every reconnection. + +```C# Snippet:WebPubSubClient_Subscribe_Connected +client.Connected += eventArgs => +{ + Console.WriteLine($"Connection {eventArgs.ConnectionId} is connected"); + return Task.CompletedTask; +}; +``` + +### Handle the Disconnected event + +The `Disconnected` event is triggered every time the connection closed and could not be recovered + +```C# Snippet:WebPubSubClient_Subscribe_Disconnected +client.Disconnected += eventArgs => +{ + Console.WriteLine($"Connection is disconnected"); + return Task.CompletedTask; +}; +``` + +### Handle the Stopped event + +The `Stopped` event is triggered when the client is stopped. The client won't try to reconnect after being stopped. This event is typically the result of calling `StopAsync` or disabling the `AutoReconnect` + +```C# Snippet:WebPubSubClient_Subscribe_Stopped +client.Stopped += eventArgs => +{ + Console.WriteLine($"Client is stopped"); + return Task.CompletedTask; +}; +``` + +### Handle the Server message event + +The `ServerMessageReceived` event is triggered when there's a message from server. + +```C# Snippet:WebPubSubClient_Subscribe_ServerMessage +client.ServerMessageReceived += eventArgs => +{ + Console.WriteLine($"Receive message: {eventArgs.Message.Data}"); + return Task.CompletedTask; +}; +``` + +### Handle the Group message event + +The `GroupMessageReceived` event is triggered when there's a message from a group. You must join a group before you can receive messages from it. + +```C# Snippet:WebPubSubClient_Subscribe_GroupMessage +client.GroupMessageReceived += eventArgs => +{ + Console.WriteLine($"Receive group message from {eventArgs.Message.Group}: {eventArgs.Message.Data}"); + return Task.CompletedTask; +}; +``` + +### Handle the restore failure event + +The `RestoreGroupFailed` event is triggered when the `AutoRejoinGroups` is enabled and rejoining a group fails after reconnection. + +```C# Snippet:WebPubSubClient_Subscribe_RestoreFailed +client.RejoinGroupFailed += eventArgs => +{ + Console.WriteLine($"Restore group failed"); + return Task.CompletedTask; +}; +``` + +## Troubleshooting + +### Setting up console logging + +You can also [enable console logging](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/core/Azure.Core/samples/Diagnostics.md#logging) if you want to dig deeper into the requests you're making against the service. + +## Next steps + +You can also find [more samples here][awps_sample]. + +## Contributing + +This project welcomes contributions and suggestions. +Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. +For details, visit + +When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). +Simply follow the instructions provided by the bot. +You will only need to do this once across all repos using our CLA. + +This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). +For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. + +![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-net%2Fsdk%2Ftemplate%2FAzure.Template%2FREADME.png) + +[azure_sub]: https://azure.microsoft.com/free/dotnet/ +[awps_sample]: https://github.com/Azure/azure-webpubsub/tree/main/samples/csharp diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/api/Azure.Messaging.WebPubSub.Client.netstandard2.1.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/api/Azure.Messaging.WebPubSub.Client.netstandard2.1.cs new file mode 100644 index 0000000000000..8709209158087 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/api/Azure.Messaging.WebPubSub.Client.netstandard2.1.cs @@ -0,0 +1,210 @@ +namespace Azure.Messaging.WebPubSub.Clients +{ + public partial class AckMessage : Azure.Messaging.WebPubSub.Clients.WebPubSubMessage + { + public AckMessage(ulong ackId, bool success, Azure.Messaging.WebPubSub.Clients.AckMessageError error) { } + public ulong AckId { get { throw null; } } + public Azure.Messaging.WebPubSub.Clients.AckMessageError Error { get { throw null; } } + public bool Success { get { throw null; } } + } + public partial class AckMessageError + { + public AckMessageError(string name, string message) { } + public string Message { get { throw null; } } + public string Name { get { throw null; } } + } + public partial class ConnectedMessage : Azure.Messaging.WebPubSub.Clients.WebPubSubMessage + { + public ConnectedMessage(string userId, string connectionId, string reconnectionToken) { } + public string ConnectionId { get { throw null; } } + public string ReconnectionToken { get { throw null; } } + public string UserId { get { throw null; } } + } + public partial class DisconnectedMessage : Azure.Messaging.WebPubSub.Clients.WebPubSubMessage + { + public DisconnectedMessage(string reason) { } + public string Reason { get { throw null; } } + } + public partial class GroupDataMessage : Azure.Messaging.WebPubSub.Clients.WebPubSubMessage + { + public GroupDataMessage(string group, Azure.Messaging.WebPubSub.Clients.WebPubSubDataType dataType, System.BinaryData data, ulong? sequenceId, string fromUserId) { } + public System.BinaryData Data { get { throw null; } } + public Azure.Messaging.WebPubSub.Clients.WebPubSubDataType DataType { get { throw null; } } + public string FromUserId { get { throw null; } } + public string Group { get { throw null; } } + public ulong? SequenceId { get { throw null; } } + } + public partial class JoinGroupMessage : Azure.Messaging.WebPubSub.Clients.WebPubSubMessage + { + public JoinGroupMessage(string group, ulong? ackId) { } + public ulong? AckId { get { throw null; } } + public string Group { get { throw null; } } + } + public partial class LeaveGroupMessage : Azure.Messaging.WebPubSub.Clients.WebPubSubMessage + { + public LeaveGroupMessage(string group, ulong? ackId) { } + public ulong? AckId { get { throw null; } } + public string Group { get { throw null; } } + } + public partial class SendEventMessage : Azure.Messaging.WebPubSub.Clients.WebPubSubMessage + { + public SendEventMessage(string eventName, System.BinaryData data, Azure.Messaging.WebPubSub.Clients.WebPubSubDataType dataType, ulong? ackId) { } + public ulong? AckId { get { throw null; } } + public System.BinaryData Data { get { throw null; } } + public Azure.Messaging.WebPubSub.Clients.WebPubSubDataType DataType { get { throw null; } } + public string EventName { get { throw null; } } + } + public partial class SendMessageFailedException : System.Exception + { + internal SendMessageFailedException() { } + public ulong? AckId { get { throw null; } } + public Azure.Messaging.WebPubSub.Clients.AckMessageError Error { get { throw null; } } + } + public partial class SendToGroupMessage : Azure.Messaging.WebPubSub.Clients.WebPubSubMessage + { + public SendToGroupMessage(string group, System.BinaryData data, Azure.Messaging.WebPubSub.Clients.WebPubSubDataType dataType, ulong? ackId, bool noEcho) { } + public ulong? AckId { get { throw null; } } + public System.BinaryData Data { get { throw null; } } + public Azure.Messaging.WebPubSub.Clients.WebPubSubDataType DataType { get { throw null; } } + public string Group { get { throw null; } } + public bool NoEcho { get { throw null; } } + } + public partial class SequenceAckMessage : Azure.Messaging.WebPubSub.Clients.WebPubSubMessage + { + public SequenceAckMessage(ulong sequenceId) { } + public ulong SequenceId { get { throw null; } } + } + public partial class ServerDataMessage : Azure.Messaging.WebPubSub.Clients.WebPubSubMessage + { + public ServerDataMessage(Azure.Messaging.WebPubSub.Clients.WebPubSubDataType dataType, System.BinaryData data, ulong? sequenceId) { } + public System.BinaryData Data { get { throw null; } } + public Azure.Messaging.WebPubSub.Clients.WebPubSubDataType DataType { get { throw null; } } + public ulong? SequenceId { get { throw null; } } + } + public partial class WebPubSubClient + { + protected WebPubSubClient() { } + public WebPubSubClient(Azure.Messaging.WebPubSub.Clients.WebPubSubClientCredential credential, Azure.Messaging.WebPubSub.Clients.WebPubSubClientOptions options = null) { } + public WebPubSubClient(System.Uri clientAccessUri) { } + public WebPubSubClient(System.Uri clientAccessUri, Azure.Messaging.WebPubSub.Clients.WebPubSubClientOptions options) { } + public string ConnectionId { get { throw null; } } + public event System.Func Connected { add { } remove { } } + public event System.Func Disconnected { add { } remove { } } + public event System.Func GroupMessageReceived { add { } remove { } } + public event System.Func RejoinGroupFailed { add { } remove { } } + public event System.Func ServerMessageReceived { add { } remove { } } + public event System.Func Stopped { add { } remove { } } + public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; } + protected virtual System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; } + public virtual System.Threading.Tasks.Task JoinGroupAsync(string group, ulong? ackId = default(ulong?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual System.Threading.Tasks.Task LeaveGroupAsync(string group, ulong? ackId = default(ulong?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual System.Threading.Tasks.Task SendEventAsync(string eventName, System.BinaryData content, Azure.Messaging.WebPubSub.Clients.WebPubSubDataType dataType, ulong? ackId = default(ulong?), bool fireAndForget = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual System.Threading.Tasks.Task SendToGroupAsync(string group, System.BinaryData content, Azure.Messaging.WebPubSub.Clients.WebPubSubDataType dataType, ulong? ackId = default(ulong?), bool noEcho = false, bool fireAndForget = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual System.Threading.Tasks.Task StartAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual System.Threading.Tasks.Task StopAsync() { throw null; } + } + public partial class WebPubSubClientCredential + { + public WebPubSubClientCredential(System.Func> clientAccessUriProvider) { } + public WebPubSubClientCredential(System.Uri clientAccessUri) { } + public System.Threading.Tasks.ValueTask GetClientAccessUriAsync(System.Threading.CancellationToken token = default(System.Threading.CancellationToken)) { throw null; } + } + public partial class WebPubSubClientOptions + { + public WebPubSubClientOptions() { } + public bool AutoReconnect { get { throw null; } set { } } + public bool AutoRejoinGroups { get { throw null; } set { } } + public Azure.Core.RetryOptions MessageRetryOptions { get { throw null; } } + public Azure.Messaging.WebPubSub.Clients.WebPubSubProtocol Protocol { get { throw null; } set { } } + } + public partial class WebPubSubConnectedEventArgs + { + internal WebPubSubConnectedEventArgs() { } + public System.Threading.CancellationToken CancellationToken { get { throw null; } } + public string ConnectionId { get { throw null; } } + public string UserId { get { throw null; } } + } + public enum WebPubSubDataType + { + Json = 0, + Text = 1, + Binary = 2, + Protobuf = 3, + } + public partial class WebPubSubDisconnectedEventArgs + { + internal WebPubSubDisconnectedEventArgs() { } + public System.Threading.CancellationToken CancellationToken { get { throw null; } } + public string ConnectionId { get { throw null; } } + public Azure.Messaging.WebPubSub.Clients.DisconnectedMessage DisconnectedMessage { get { throw null; } } + } + public partial class WebPubSubGroupMessageEventArgs + { + internal WebPubSubGroupMessageEventArgs() { } + public System.Threading.CancellationToken CancellationToken { get { throw null; } } + public Azure.Messaging.WebPubSub.Clients.GroupDataMessage Message { get { throw null; } } + } + public partial class WebPubSubJsonProtocol : Azure.Messaging.WebPubSub.Clients.WebPubSubProtocol + { + public WebPubSubJsonProtocol() { } + public override bool IsReliable { get { throw null; } } + public override string Name { get { throw null; } } + public override Azure.Messaging.WebPubSub.Clients.WebPubSubProtocolMessageType WebSocketMessageType { get { throw null; } } + public override System.ReadOnlyMemory GetMessageBytes(Azure.Messaging.WebPubSub.Clients.WebPubSubMessage message) { throw null; } + public override Azure.Messaging.WebPubSub.Clients.WebPubSubMessage ParseMessage(System.Buffers.ReadOnlySequence input) { throw null; } + public override void WriteMessage(Azure.Messaging.WebPubSub.Clients.WebPubSubMessage message, System.Buffers.IBufferWriter output) { } + } + public partial class WebPubSubJsonReliableProtocol : Azure.Messaging.WebPubSub.Clients.WebPubSubProtocol + { + public WebPubSubJsonReliableProtocol() { } + public override bool IsReliable { get { throw null; } } + public override string Name { get { throw null; } } + public override Azure.Messaging.WebPubSub.Clients.WebPubSubProtocolMessageType WebSocketMessageType { get { throw null; } } + public override System.ReadOnlyMemory GetMessageBytes(Azure.Messaging.WebPubSub.Clients.WebPubSubMessage message) { throw null; } + public override Azure.Messaging.WebPubSub.Clients.WebPubSubMessage ParseMessage(System.Buffers.ReadOnlySequence input) { throw null; } + public override void WriteMessage(Azure.Messaging.WebPubSub.Clients.WebPubSubMessage message, System.Buffers.IBufferWriter output) { } + } + public abstract partial class WebPubSubMessage + { + protected WebPubSubMessage() { } + } + public abstract partial class WebPubSubProtocol + { + protected WebPubSubProtocol() { } + public abstract bool IsReliable { get; } + public abstract string Name { get; } + public abstract Azure.Messaging.WebPubSub.Clients.WebPubSubProtocolMessageType WebSocketMessageType { get; } + public abstract System.ReadOnlyMemory GetMessageBytes(Azure.Messaging.WebPubSub.Clients.WebPubSubMessage message); + public abstract Azure.Messaging.WebPubSub.Clients.WebPubSubMessage ParseMessage(System.Buffers.ReadOnlySequence input); + public abstract void WriteMessage(Azure.Messaging.WebPubSub.Clients.WebPubSubMessage message, System.Buffers.IBufferWriter output); + } + public enum WebPubSubProtocolMessageType + { + Text = 0, + Binary = 1, + } + public partial class WebPubSubRejoinGroupFailedEventArgs + { + internal WebPubSubRejoinGroupFailedEventArgs() { } + public System.Threading.CancellationToken CancellationToken { get { throw null; } } + public System.Exception Exception { get { throw null; } } + public string Group { get { throw null; } } + } + public partial class WebPubSubResult + { + internal WebPubSubResult() { } + public ulong? AckId { get { throw null; } } + public bool IsDuplicated { get { throw null; } } + } + public partial class WebPubSubServerMessageEventArgs + { + internal WebPubSubServerMessageEventArgs() { } + public System.Threading.CancellationToken CancellationToken { get { throw null; } } + public Azure.Messaging.WebPubSub.Clients.ServerDataMessage Message { get { throw null; } } + } + public partial class WebPubSubStoppedEventArgs + { + internal WebPubSubStoppedEventArgs() { } + public System.Threading.CancellationToken CancellationToken { get { throw null; } } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/samples/README.md b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/samples/README.md new file mode 100644 index 0000000000000..72ea8c67af023 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/samples/README.md @@ -0,0 +1,13 @@ +--- +page_type: sample +languages: +- csharp +products: +- azure +name: Azure.Messaging.WebPubSub.Client samples for .NET +description: Samples for the Azure.Messaging.WebPubSub.Client client library +--- + +# Web PubSub client SDK samples + +- Create client, publish and subscribe group messages diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/samples/Sample1_HelloWorld.md b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/samples/Sample1_HelloWorld.md new file mode 100644 index 0000000000000..65fb36f0be7a9 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/samples/Sample1_HelloWorld.md @@ -0,0 +1,57 @@ +# Create client, publish and subscribe group messages + +This sample demonstrates the basic usage of Azure Web PubSub service, with the goal of quickly allowing you to publish to groups and subscribe to messages from groups. To accomplish this, the `WebPubSubClient` will be introduced, along with some of the core concepts of Azure Web PubSub service. + +## Create the clients + +To interact with Azure WebPubSub service, a client is needed for each area of functionality. For each hub, a client must be created and copy and paste the `clientAcccessUri` from Azure Portal + +```C# Snippet:WebPubSubClient_Construct +var client = new WebPubSubClient(new Uri("")); +``` + +The `clientAcccessUri` also can be generated by `WebPubSubServiceClient` + +```C# Snippet:WebPubSubClient_Construct2 +var client = new WebPubSubClient(new WebPubSubClientCredential(token => +{ + // In common practice, you will have a negotiation server for generating token. Client should fetch token from it. + return FetchClientAccessTokenFromServerAsync(token); +})); +``` + +The client is responsible for making a connection to the service and efficiently publishing messages or subscribing messages from groups. By default, `WebPubSubClient` uses `json.reliable.webpubsub.azure.v1` subprotocol. + +## Subscribe to messages + +Messages can come from groups or from servers. The client can register an event handler to be notified of messages and system events. The event handler share the same lifetime with the client, so you can subscribe them before you start the client. If the client has a permission to join group, it can join group to subscribe messages from that group. But note joining group operation must be called after the client has been started. + +```C# Snippet:WebPubSubClient_Subscribe_ServerMessage +client.ServerMessageReceived += eventArgs => +{ + Console.WriteLine($"Receive message: {eventArgs.Message.Data}"); + return Task.CompletedTask; +}; +``` + +```C# Snippet:WebPubSubClient_Subscribe_GroupMessage +client.GroupMessageReceived += eventArgs => +{ + Console.WriteLine($"Receive group message from {eventArgs.Message.Group}: {eventArgs.Message.Data}"); + return Task.CompletedTask; +}; +``` + +## Publish messages to group or server + +To publish messages, a `WebPubSubClient` need to be created and started. The client can send event to server or send messages to groups. + +```C# Snippet:WebPubSubClient_SendToGroup +// Send message to group "testGroup" +await client.SendToGroupAsync("testGroup", BinaryData.FromString("hello world"), WebPubSubDataType.Text); +``` + +```C# Snippet:WebPubSubClient_SendEvent +// Send custom event to server +await client.SendEventAsync("testEvent", BinaryData.FromString("hello world"), WebPubSubDataType.Text); +``` diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Azure.Messaging.WebPubSub.Client.csproj b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Azure.Messaging.WebPubSub.Client.csproj new file mode 100644 index 0000000000000..8df258e7b6493 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Azure.Messaging.WebPubSub.Client.csproj @@ -0,0 +1,20 @@ + + + 1.0.0-beta.1 + Azure.Messaging.WebPubSub.Client + Client for Azure WebPubSub + azure;webpubsub.client + $(NoWarn);0067 + true + netstandard2.1 + $(RequiredTargetFrameworks) + + + + + + + + + + diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/EventArgs/WebPubSubConnectedEventArgs.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/EventArgs/WebPubSubConnectedEventArgs.cs new file mode 100644 index 0000000000000..73c902ab89557 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/EventArgs/WebPubSubConnectedEventArgs.cs @@ -0,0 +1,38 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The event args for connected + /// + public class WebPubSubConnectedEventArgs + { + /// + /// The user-id + /// + public string UserId { get; } + + /// + /// The connection ID of the client. The ID is assigned when the client connects. + /// + public string ConnectionId { get; } + + /// + /// Gets a cancellation token related to the original operation that raised the event. + /// + public CancellationToken CancellationToken { get; } + + internal WebPubSubConnectedEventArgs(ConnectedMessage connectedMessage, CancellationToken cancellationToken = default) + { + UserId = connectedMessage.UserId; + ConnectionId = connectedMessage.ConnectionId; + CancellationToken = cancellationToken; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/EventArgs/WebPubSubDisconnectedEventArgs.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/EventArgs/WebPubSubDisconnectedEventArgs.cs new file mode 100644 index 0000000000000..cef8c74d0bae7 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/EventArgs/WebPubSubDisconnectedEventArgs.cs @@ -0,0 +1,35 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Threading; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The event args for disconnected + /// + public class WebPubSubDisconnectedEventArgs + { + /// + /// The connection ID of disconnected connection. Could be null if the connection hasn't received a connected message. + /// + public string ConnectionId { get; } + + /// + /// The disconnected message + /// + public DisconnectedMessage DisconnectedMessage { get; } + + /// + /// Gets a cancellation token related to the original operation that raised the event. + /// + public CancellationToken CancellationToken { get; } + + internal WebPubSubDisconnectedEventArgs(string connectionId, DisconnectedMessage disconnectedMessage, CancellationToken cancellationToken = default) + { + ConnectionId = connectionId; + DisconnectedMessage = disconnectedMessage; + CancellationToken = cancellationToken; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/EventArgs/WebPubSubGroupMessageEventArgs.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/EventArgs/WebPubSubGroupMessageEventArgs.cs new file mode 100644 index 0000000000000..193725fa0e3f6 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/EventArgs/WebPubSubGroupMessageEventArgs.cs @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Threading; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The event args for message from groups + /// + public class WebPubSubGroupMessageEventArgs + { + /// + /// The group data message. + /// + public GroupDataMessage Message { get; } + + /// + /// Gets a cancellation token related to the original operation that raised the event. + /// + public CancellationToken CancellationToken { get; } + + internal WebPubSubGroupMessageEventArgs(GroupDataMessage groupResponseMessage, CancellationToken cancellationToken = default) + { + Message = groupResponseMessage; + CancellationToken = cancellationToken; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/EventArgs/WebPubSubRejoinGroupFailedEventArgs.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/EventArgs/WebPubSubRejoinGroupFailedEventArgs.cs new file mode 100644 index 0000000000000..6a35c1903fd0c --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/EventArgs/WebPubSubRejoinGroupFailedEventArgs.cs @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Threading; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The EventArgs of RestoreGroupFailed event. The event is triggered when the `AutoRejoinGroups` is enabled and rejoining a group fails after reconnection. + /// + /// + /// Groups that have joined by client will be restore after reconnection. Groups that joined or leaved from server won't be taken into consideration. + /// E.g. Client A: Join Group A ----------------> Leave Group A ------------> Join Group B ----------------> Reconnect + /// Server: Leave Group B + /// Then rejoin operation will contains Group B. Because client can't recognize the operation from server. + /// + public class WebPubSubRejoinGroupFailedEventArgs + { + /// + /// Gets a cancellation token related to the original operation that raised the event. + /// + public CancellationToken CancellationToken { get; } + + /// + /// The exceptions throws when restore group fails + /// + public Exception Exception { get; } + + /// + /// The group name + /// + public string Group { get; } + + internal WebPubSubRejoinGroupFailedEventArgs(string group, Exception ex, CancellationToken cancellationToken = default) + { + Group = group; + Exception = ex; + CancellationToken = cancellationToken; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/EventArgs/WebPubSubServerMessageEventArgs.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/EventArgs/WebPubSubServerMessageEventArgs.cs new file mode 100644 index 0000000000000..c94756225d71b --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/EventArgs/WebPubSubServerMessageEventArgs.cs @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Threading; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The event args for message from server + /// + public class WebPubSubServerMessageEventArgs + { + /// + /// The server data message + /// + public ServerDataMessage Message { get; } + + /// + /// Gets a cancellation token related to the original operation that raised the event. + /// + public CancellationToken CancellationToken { get; } + + internal WebPubSubServerMessageEventArgs(ServerDataMessage message, CancellationToken cancellationToken = default) + { + Message = message; + CancellationToken = cancellationToken; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/EventArgs/WebPubSubStoppedEventArgs.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/EventArgs/WebPubSubStoppedEventArgs.cs new file mode 100644 index 0000000000000..035c00af1fb0f --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/EventArgs/WebPubSubStoppedEventArgs.cs @@ -0,0 +1,23 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Threading; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The event args for stopped + /// + public class WebPubSubStoppedEventArgs + { + /// + /// Gets a cancellation token related to the original operation that raised the event. + /// + public CancellationToken CancellationToken { get; } + + internal WebPubSubStoppedEventArgs(CancellationToken cancellationToken = default) + { + CancellationToken = cancellationToken; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Exceptions/SendMessageFailedException.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Exceptions/SendMessageFailedException.cs new file mode 100644 index 0000000000000..bf2befffb2275 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Exceptions/SendMessageFailedException.cs @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Text; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// Exception for sending message failed + /// + public class SendMessageFailedException : Exception + { + /// + /// The ackId + /// + public ulong? AckId { get; } + + /// + /// The error + /// + public AckMessageError Error { get; } + + internal SendMessageFailedException(string message, ulong? ackId, AckMessageError error = null) : base(message) + { + AckId = ackId; + Error = error; + } + + internal SendMessageFailedException(string message, ulong? ackId, Exception innerException) : base(message, innerException) + { + AckId = ackId; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/GlobalSuppressions.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/GlobalSuppressions.cs new file mode 100644 index 0000000000000..cb4f52ab2e07a --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/GlobalSuppressions.cs @@ -0,0 +1,4 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Diagnostics.CodeAnalysis; diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Internals/MemoryBufferWriter.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Internals/MemoryBufferWriter.cs new file mode 100644 index 0000000000000..a2e9a4c99d9c8 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Internals/MemoryBufferWriter.cs @@ -0,0 +1,411 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Azure.Messaging.WebPubSub.Clients +{ + internal sealed class MemoryBufferWriter : Stream, IBufferWriter + { + [ThreadStatic] + private static MemoryBufferWriter _cachedInstance; + +#if DEBUG + private bool _inUse; +#endif + + private readonly int _minimumSegmentSize; + private int _bytesWritten; + + private List _completedSegments; + private byte[] _currentSegment; + private int _position; + + public MemoryBufferWriter(int minimumSegmentSize = 4096) + { + _minimumSegmentSize = minimumSegmentSize; + } + + public override long Length => _bytesWritten; + public override bool CanRead => false; + public override bool CanSeek => false; + public override bool CanWrite => true; + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + /// + /// Use the thread local cached instance to write data synchronously.
+ /// Please do NOT use await in the using scope.
+ /// For await case, please create a new instance. + ///
+ public static LocalLease GetLocalLease() => new(Get()); + + private static MemoryBufferWriter Get() + { + var writer = _cachedInstance; + if (writer == null) + { + writer = new MemoryBufferWriter(); + } + else + { + // Taken off the thread static + _cachedInstance = null; + } +#if DEBUG + if (writer._inUse) + { + throw new InvalidOperationException("The reader wasn't returned!"); + } + + writer._inUse = true; +#endif + + return writer; + } + + private static void Return(MemoryBufferWriter writer) + { + _cachedInstance = writer; +#if DEBUG + writer._inUse = false; +#endif + writer.Reset(); + } + + public void Reset() + { + if (_completedSegments != null) + { + for (var i = 0; i < _completedSegments.Count; i++) + { + _completedSegments[i].Return(); + } + + _completedSegments.Clear(); + } + + if (_currentSegment != null) + { + ArrayPool.Shared.Return(_currentSegment); + _currentSegment = null; + } + + _bytesWritten = 0; + _position = 0; + } + + public void Advance(int count) + { + _bytesWritten += count; + _position += count; + } + + public Memory GetMemory(int sizeHint = 0) + { + EnsureCapacity(sizeHint); + + return _currentSegment.AsMemory(_position, _currentSegment.Length - _position); + } + + public Span GetSpan(int sizeHint = 0) + { + EnsureCapacity(sizeHint); + + return _currentSegment.AsSpan(_position, _currentSegment.Length - _position); + } + + public void CopyTo(IBufferWriter destination) + { + if (_completedSegments != null) + { + // Copy completed segments + var count = _completedSegments.Count; + for (var i = 0; i < count; i++) + { + destination.Write(_completedSegments[i].Span); + } + } + + destination.Write(_currentSegment.AsSpan(0, _position)); + } + + public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + { + if (_completedSegments == null) + { + // There is only one segment so write without awaiting. + return destination.WriteAsync(_currentSegment, 0, _position, cancellationToken); + } + + return CopyToSlowAsync(destination); + } + + private void EnsureCapacity(int sizeHint) + { + // This does the Right Thing. It only subtracts _position from the current segment length if it's non-null. + // If _currentSegment is null, it returns 0. + var remainingSize = _currentSegment?.Length - _position ?? 0; + + // If the sizeHint is 0, any capacity will do + // Otherwise, the buffer must have enough space for the entire size hint, or we need to add a segment. + if ((sizeHint == 0 && remainingSize > 0) || (sizeHint > 0 && remainingSize >= sizeHint)) + { + // We have capacity in the current segment + return; + } + + AddSegment(sizeHint); + } + + private void AddSegment(int sizeHint = 0) + { + if (_currentSegment != null) + { + // We're adding a segment to the list + if (_completedSegments == null) + { + _completedSegments = new List(); + } + + // Position might be less than the segment length if there wasn't enough space to satisfy the sizeHint when + // GetMemory was called. In that case we'll take the current segment and call it "completed", but need to + // ignore any empty space in it. + _completedSegments.Add(new CompletedBuffer(_currentSegment, _position)); + } + + // Get a new buffer using the minimum segment size, unless the size hint is larger than a single segment. + _currentSegment = ArrayPool.Shared.Rent(Math.Max(_minimumSegmentSize, sizeHint)); + _position = 0; + } + + private async Task CopyToSlowAsync(Stream destination) + { + if (_completedSegments != null) + { + // Copy full segments + var count = _completedSegments.Count; + for (var i = 0; i < count; i++) + { + var segment = _completedSegments[i]; + await destination.WriteAsync(segment.Buffer.AsMemory(0, segment.Length), CancellationToken.None).ConfigureAwait(false); + } + } + + await destination.WriteAsync(_currentSegment.AsMemory(0, _position)).ConfigureAwait(false); + } + + public byte[] ToArray() + { + if (_currentSegment == null) + { + return Array.Empty(); + } + + var result = new byte[_bytesWritten]; + + var totalWritten = 0; + + if (_completedSegments != null) + { + // Copy full segments + var count = _completedSegments.Count; + for (var i = 0; i < count; i++) + { + var segment = _completedSegments[i]; + segment.Span.CopyTo(result.AsSpan(totalWritten)); + totalWritten += segment.Span.Length; + } + } + + // Copy current incomplete segment + _currentSegment.AsSpan(0, _position).CopyTo(result.AsSpan(totalWritten)); + + return result; + } + + public void CopyTo(Span span) + { + Debug.Assert(span.Length >= _bytesWritten); + + if (_currentSegment == null) + { + return; + } + + var totalWritten = 0; + + if (_completedSegments != null) + { + // Copy full segments + var count = _completedSegments.Count; + for (var i = 0; i < count; i++) + { + var segment = _completedSegments[i]; + segment.Span.CopyTo(span.Slice(totalWritten)); + totalWritten += segment.Span.Length; + } + } + + // Copy current incomplete segment + _currentSegment.AsSpan(0, _position).CopyTo(span.Slice(totalWritten)); + + Debug.Assert(_bytesWritten == totalWritten + _position); + } + + public override void Flush() { } + public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask; + public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + public override void SetLength(long value) => throw new NotSupportedException(); + + public override void WriteByte(byte value) + { + if (_currentSegment != null && (uint)_position < (uint)_currentSegment.Length) + { + _currentSegment[_position] = value; + } + else + { + AddSegment(); + _currentSegment[0] = value; + } + + _position++; + _bytesWritten++; + } + + public override void Write(byte[] buffer, int offset, int count) + { + var position = _position; + if (_currentSegment != null && position < _currentSegment.Length - count) + { + Buffer.BlockCopy(buffer, offset, _currentSegment, position, count); + + _position = position + count; + _bytesWritten += count; + } + else + { + BuffersExtensions.Write(this, buffer.AsSpan(offset, count)); + } + } + +#if NETCOREAPP + public override void Write(ReadOnlySpan span) + { + if (_currentSegment != null && span.TryCopyTo(_currentSegment.AsSpan(_position))) + { + _position += span.Length; + _bytesWritten += span.Length; + } + else + { + BuffersExtensions.Write(this, span); + } + } +#endif + + public ReadOnlySequence AsReadOnlySequence() + { + if (_bytesWritten == 0) + { + return ReadOnlySequence.Empty; + } + Segment head = null; + Segment current = null; + if (_completedSegments != null) + { + foreach (var seg in _completedSegments) + { + var next = new Segment(seg.Buffer, seg.Length); + if (current != null) + { + current.SetNext(next); + current = next; + } + else + { + current = next; + head = next; + } + } + } + if (_position > 0) + { + var next = new Segment(_currentSegment, _position); + if (current != null) + { + current.SetNext(next); + current = next; + } + else + { + current = next; + head = next; + } + } + return new ReadOnlySequence(head, 0, current, current.Memory.Length); + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + Reset(); + } + base.Dispose(disposing); + } + + /// + /// Holds a byte[] from the pool and a size value. Basically a Memory but guaranteed to be backed by an ArrayPool byte[], so that we know we can return it. + /// + private readonly struct CompletedBuffer + { + public byte[] Buffer { get; } + public int Length { get; } + + public ReadOnlySpan Span => Buffer.AsSpan(0, Length); + + public CompletedBuffer(byte[] buffer, int length) + { + Buffer = buffer; + Length = length; + } + + public void Return() + { + ArrayPool.Shared.Return(Buffer); + } + } + + private sealed class Segment : ReadOnlySequenceSegment + { + public Segment(byte[] array, int count) => Memory = new ReadOnlyMemory(array, 0, count); + + internal void SetNext(Segment next) + { + Next = next; + next.RunningIndex = RunningIndex + Memory.Length; + } + } + + public struct LocalLease : IDisposable + { + public LocalLease(MemoryBufferWriter writer) => Writer = writer; + + public MemoryBufferWriter Writer { get; } + + public void Dispose() => MemoryBufferWriter.Return(Writer); + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Internals/ReusableUtf8JsonWriter.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Internals/ReusableUtf8JsonWriter.cs new file mode 100644 index 0000000000000..9c3ceb4967eb2 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Internals/ReusableUtf8JsonWriter.cs @@ -0,0 +1,78 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +#nullable disable + +using System; +using System.Buffers; +using System.Text.Encodings.Web; +using System.Text.Json; + +namespace Azure.Messaging.WebPubSub.Clients +{ + internal sealed class ReusableUtf8JsonWriter : IDisposable + { + [ThreadStatic] + private static ReusableUtf8JsonWriter _cachedInstance; + + private readonly Utf8JsonWriter _writer; + +#if DEBUG + private bool _inUse; +#endif + + public ReusableUtf8JsonWriter(IBufferWriter stream) + { + _writer = new Utf8JsonWriter(stream, new JsonWriterOptions() + { +#if !DEBUG + SkipValidation = true, +#endif + Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping + }); + } + + public static ReusableUtf8JsonWriter Get(IBufferWriter stream) + { + var writer = _cachedInstance; + if (writer == null) + { + writer = new ReusableUtf8JsonWriter(stream); + } + + // Taken off the thread static + _cachedInstance = null; +#if DEBUG + if (writer._inUse) + { + throw new InvalidOperationException("The writer wasn't returned!"); + } + + writer._inUse = true; +#endif + writer._writer.Reset(stream); + return writer; + } + + public static void Return(ReusableUtf8JsonWriter writer) + { + _cachedInstance = writer; + + writer._writer.Reset(); + +#if DEBUG + writer._inUse = false; +#endif + } + + public void Dispose() + { + _writer?.Dispose(); + } + + public Utf8JsonWriter GetJsonWriter() + { + return _writer; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Internals/SystemTextJsonExtension.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Internals/SystemTextJsonExtension.cs new file mode 100644 index 0000000000000..66c387e17eebe --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Internals/SystemTextJsonExtension.cs @@ -0,0 +1,123 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; +using System.Text.Json; + +namespace Azure.Messaging.WebPubSub.Clients +{ + internal static class SystemTextJsonExtension + { + public static bool CheckRead(this ref Utf8JsonReader reader) + { + if (!reader.Read()) + { + throw new InvalidDataException("Unexpected end when reading JSON."); + } + + return true; + } + + public static void EnsureObjectStart(this ref Utf8JsonReader reader) + { + if (reader.TokenType != JsonTokenType.StartObject) + { + throw new InvalidDataException($"Unexpected JSON Token Type '{reader.GetTokenString()}'. Expected a JSON Object."); + } + } + + public static string GetTokenString(this ref Utf8JsonReader reader) + { + return GetTokenString(reader.TokenType); + } + + public static string GetTokenString(JsonTokenType tokenType) + { + switch (tokenType) + { + case JsonTokenType.None: + break; + case JsonTokenType.StartObject: + return "Object"; + case JsonTokenType.StartArray: + return "Array"; + case JsonTokenType.PropertyName: + return "Property"; + default: + break; + } + return tokenType.ToString(); + } + + public static void EnsureArrayStart(this ref Utf8JsonReader reader) + { + if (reader.TokenType != JsonTokenType.StartArray) + { + throw new InvalidDataException($"Unexpected JSON Token Type '{reader.GetTokenString()}'. Expected a JSON Array."); + } + } + + public static bool ReadAsBoolean(this ref Utf8JsonReader reader, string propertyName) + { + reader.Read(); + + return reader.TokenType switch + { + JsonTokenType.False => false, + JsonTokenType.True => true, + _ => throw new InvalidDataException($"Expected '{propertyName}' to be true or false."), + }; + } + + public static string ReadAsNullableString(this ref Utf8JsonReader reader, string propertyName) + { + reader.Read(); + + if (reader.TokenType == JsonTokenType.Null) + { + return null; + } + + if (reader.TokenType != JsonTokenType.String) + { + throw new InvalidDataException($"Expected '{propertyName}' to be of type {JsonTokenType.String}."); + } + + return reader.GetString(); + } + + public static int? ReadAsInt32(this ref Utf8JsonReader reader, string propertyName) + { + reader.Read(); + + if (reader.TokenType == JsonTokenType.Null) + { + return null; + } + + if (reader.TokenType != JsonTokenType.Number) + { + throw new InvalidDataException($"Expected '{propertyName}' to be of type {JsonTokenType.Number}."); + } + + return reader.GetInt32(); + } + + public static ulong? ReadAsUlong(this ref Utf8JsonReader reader, string propertyName) + { + reader.Read(); + if (reader.TokenType == JsonTokenType.Null) + { + return null; + } + if (reader.TokenType != JsonTokenType.Number) + { + throw new InvalidDataException($"Expected '{propertyName}' to be of type {JsonTokenType.Number}."); + } + return reader.GetUInt64(); + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Internals/Utils.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Internals/Utils.cs new file mode 100644 index 0000000000000..0e13749668147 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Internals/Utils.cs @@ -0,0 +1,31 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Text; +using System.Threading.Tasks; +using Azure.Core; + +namespace Azure.Messaging.WebPubSub.Clients +{ + internal static class Utils + { + internal static RetryOptions GetRetryOptions() + { + return (RetryOptions)typeof(RetryOptions).GetConstructor( + BindingFlags.NonPublic | BindingFlags.Instance, + null, Type.EmptyTypes, null).Invoke(null); + } + + internal static void FireAndForget(this Task task) + { + if (task == null) + { + return; + } + _ = Task.Run(() => task); + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/AckMessage.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/AckMessage.cs new file mode 100644 index 0000000000000..bcae281bb1a0d --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/AckMessage.cs @@ -0,0 +1,44 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Text; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The message representing Ack. + /// + public class AckMessage : WebPubSubMessage + { + /// + /// The ack-id + /// + public ulong AckId { get; } + + /// + /// Representing whether the operation is success. + /// + public bool Success { get; } + + /// + /// The error detail when the operation is not success. + /// + public AckMessageError Error { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The ack-id + /// Representing whether the operation is success. + /// The error detail when the operation is not success. + public AckMessage(ulong ackId, bool success, AckMessageError error) + { + AckId = ackId; + Success = success; + Error = error; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/AckMessageError.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/AckMessageError.cs new file mode 100644 index 0000000000000..b0c257e464d07 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/AckMessageError.cs @@ -0,0 +1,32 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// An error associated with a service acknowledgement. + /// + public class AckMessageError + { + /// + /// The name of error + /// + public string Name { get; } + + /// + /// The details of the error + /// + public string Message { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The name of error + /// The detailed message + public AckMessageError(string name, string message) + { + Name = name; + Message = message; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/ConnectedMessage.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/ConnectedMessage.cs new file mode 100644 index 0000000000000..7cb8e4592adb2 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/ConnectedMessage.cs @@ -0,0 +1,44 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Text; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The message representing the client is connected + /// + public class ConnectedMessage : WebPubSubMessage + { + /// + /// The user-id + /// + public string UserId { get; } + + /// + /// The connection ID of the client. The ID is assigned when the client connects. + /// + public string ConnectionId { get; } + + /// + /// The reconnection token for recovering the connection. Only availble in reliable subprotocol. + /// + public string ReconnectionToken { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The user-id + /// The connection ID of the client + /// The reconnection token for recovering the connection. Only availble in reliable subprotocol. + public ConnectedMessage(string userId, string connectionId, string reconnectionToken) + { + UserId = userId; + ConnectionId = connectionId; + ReconnectionToken = reconnectionToken; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/DisconnectedMessage.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/DisconnectedMessage.cs new file mode 100644 index 0000000000000..2a5749876de9d --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/DisconnectedMessage.cs @@ -0,0 +1,30 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Text; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The message representing a client disconnection + /// + public class DisconnectedMessage : WebPubSubMessage + { + /// + /// The reason for being disconnected. + /// + public string Reason { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The reason of getting disconnected. + public DisconnectedMessage(string reason) + { + Reason = reason; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/GroupDataMessage.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/GroupDataMessage.cs new file mode 100644 index 0000000000000..180c96e48f62b --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/GroupDataMessage.cs @@ -0,0 +1,58 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Text; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The message representing the data from groups. + /// + public class GroupDataMessage : WebPubSubMessage + { + /// + /// Type of the data + /// + public WebPubSubDataType DataType { get; } + + /// + /// The data content + /// + public BinaryData Data { get; } + + /// + /// The sequence id. Only availble in reliable protocol. + /// + public ulong? SequenceId { get; } + + /// + /// The group name + /// + public string Group { get; } + + /// + /// FromUserId + /// + public string FromUserId { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The group name + /// Type of the data + /// The data content + /// The sequence id. Only availble in reliable protocol. + /// fromUserId. + public GroupDataMessage(string group, WebPubSubDataType dataType, BinaryData data, ulong? sequenceId, string fromUserId) + { + DataType = dataType; + Data = data; + SequenceId = sequenceId; + Group = group; + FromUserId = fromUserId; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/JoinGroupMessage.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/JoinGroupMessage.cs new file mode 100644 index 0000000000000..6ae80ba711036 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/JoinGroupMessage.cs @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Text; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The message representing joining group. + /// + public class JoinGroupMessage : WebPubSubMessage + { + /// + /// The group name. + /// + public string Group { get; } + + /// + /// The optional ack-id + /// + public ulong? AckId { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The group name. + /// The optional ack-id + public JoinGroupMessage(string group, ulong? ackId) + { + Group = group; + AckId = ackId; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/LeaveGroupMessage.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/LeaveGroupMessage.cs new file mode 100644 index 0000000000000..febde1461ac84 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/LeaveGroupMessage.cs @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Text; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The message representing leaving group. + /// + public class LeaveGroupMessage : WebPubSubMessage + { + /// + /// The group name. + /// + public string Group { get; } + + /// + /// The optional ack-id + /// + public ulong? AckId { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The group name. + /// The optional ack-id + public LeaveGroupMessage(string group, ulong? ackId) + { + Group = group; + AckId = ackId; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/SendEventMessage.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/SendEventMessage.cs new file mode 100644 index 0000000000000..0eaf366094701 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/SendEventMessage.cs @@ -0,0 +1,52 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Text; +using Azure.Core; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The message representing sending event to server. + /// + public class SendEventMessage : WebPubSubMessage + { + /// + /// The optional ack-id + /// + public ulong? AckId { get; } + + /// + /// Type of the data + /// + public WebPubSubDataType DataType { get; } + + /// + /// The data content + /// + public BinaryData Data { get; } + + /// + /// The name of custom event + /// + public string EventName { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The event name + /// The data content + /// Type of the data + /// The optional ack-id + public SendEventMessage(string eventName, BinaryData data, WebPubSubDataType dataType, ulong? ackId) + { + EventName = eventName; + AckId = ackId; + DataType = dataType; + Data = data; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/SendToGroupMessage.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/SendToGroupMessage.cs new file mode 100644 index 0000000000000..58290a1d50d22 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/SendToGroupMessage.cs @@ -0,0 +1,59 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Text; +using Azure.Core; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The message representing sending message to group. + /// + public class SendToGroupMessage : WebPubSubMessage + { + /// + /// The group name + /// + public string Group { get; } + + /// + /// The optional ack-id + /// + public ulong? AckId { get; } + + /// + /// Optional. If set to true, this message is not echoed back to the same connection. + /// + public bool NoEcho { get; } + + /// + /// Type of the data + /// + public WebPubSubDataType DataType { get; } + + /// + /// The data content + /// + public BinaryData Data { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The group name + /// The data content + /// Type of the data + /// The optional ack-id + /// Optional. If set to true, this message is not echoed back to the same connection. + public SendToGroupMessage(string group, BinaryData data, WebPubSubDataType dataType, ulong? ackId, bool noEcho) + { + Group = group; + AckId = ackId; + NoEcho = noEcho; + DataType = dataType; + Data = data; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/SequenceAckMessage.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/SequenceAckMessage.cs new file mode 100644 index 0000000000000..9118f358e894b --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/SequenceAckMessage.cs @@ -0,0 +1,30 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Text; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The message representing SequenceAck. + /// + public class SequenceAckMessage : WebPubSubMessage + { + /// + /// The sequenceId + /// + public ulong SequenceId { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The sequenceId + public SequenceAckMessage(ulong sequenceId) + { + SequenceId = sequenceId; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/ServerDataMessage.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/ServerDataMessage.cs new file mode 100644 index 0000000000000..29c8d88dbe39f --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/ServerDataMessage.cs @@ -0,0 +1,44 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Text; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The message representing the message from server. + /// + public class ServerDataMessage : WebPubSubMessage + { + /// + /// Type of the data + /// + public WebPubSubDataType DataType { get; } + + /// + /// The data content + /// + public BinaryData Data { get; } + + /// + /// The sequence id. Only availble in reliable protocol. + /// + public ulong? SequenceId { get; } + + /// + /// Initializes a new instance of the class. + /// + /// Type of the data + /// The data content + /// The sequence id. Only availble in reliable protocol. + public ServerDataMessage(WebPubSubDataType dataType, BinaryData data, ulong? sequenceId) + { + DataType = dataType; + Data = data; + SequenceId = sequenceId; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/WebPubSubMessage.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/WebPubSubMessage.cs new file mode 100644 index 0000000000000..e5f28174a2feb --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Messages/WebPubSubMessage.cs @@ -0,0 +1,17 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Text; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// Base class representing WebPubSub messages. + /// + public abstract class WebPubSubMessage + { + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Models/WebPubSubClientOptions.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Models/WebPubSubClientOptions.cs new file mode 100644 index 0000000000000..e15bca43ff343 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Models/WebPubSubClientOptions.cs @@ -0,0 +1,59 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Text; +using Azure.Core; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The set of options that can be specified when creating a instance. + /// + public class WebPubSubClientOptions + { + /// + /// Get or set the protocol to use. + /// + public WebPubSubProtocol Protocol { get; set; } = new WebPubSubJsonReliableProtocol(); + + /// + /// Get or set whether to auto reconnect. If enabled, the client tries to reconnect after connection drop and unrecovered. + /// + public bool AutoReconnect { get; set; } = true; + + /// + /// Get or set whether to auto rejoin groups after reconnecting. Groups that have joined by client will be rejoined after reconnection + /// + public bool AutoRejoinGroups { get; set; } = true; + + /// + /// Get or set the retry options for message-related client operations, including + /// , + /// , + /// , + /// . + /// + public RetryOptions MessageRetryOptions { get; } + + /// + /// Get or set the retry options for client connection management. + /// + internal RetryOptions ReconnectRetryOptions { get; } + + /// + /// Construct a WebPubSubClientOptions + /// + public WebPubSubClientOptions() + { + MessageRetryOptions = Utils.GetRetryOptions(); + + ReconnectRetryOptions = Utils.GetRetryOptions(); + ReconnectRetryOptions.MaxRetries = int.MaxValue; + ReconnectRetryOptions.Delay = TimeSpan.FromSeconds(1); + ReconnectRetryOptions.MaxDelay = TimeSpan.FromSeconds(5); + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Models/WebPubSubDataType.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Models/WebPubSubDataType.cs new file mode 100644 index 0000000000000..c8ba5a92fca62 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Models/WebPubSubDataType.cs @@ -0,0 +1,32 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Text; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// Represent the type of the data. + /// + public enum WebPubSubDataType + { + /// + /// The data content is json. + /// + Json = 0, + /// + /// The data content is plain text. + /// + Text = 1, + /// + /// The data content is binary. + /// + Binary = 2, + /// + /// The data content is protobuf. + /// + Protobuf = 3, + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Properties/AssemblyInfo.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000000..8cf3fd2b99bd8 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Properties/AssemblyInfo.cs @@ -0,0 +1,14 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo( + "DynamicProxyGenAssembly2, PublicKey=" + + "00240000048000009400000006020000002400005253413100" + + "04000001000100c547cac37abd99c8db225ef2f6c8a3602f3b" + + "3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9" + + "266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191" + + "195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d1560509" + + "3924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7" + + "d3113e92484cf7045cc7")] +[assembly: InternalsVisibleTo("Azure.Messaging.WebPubSub.Client.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100d15ddcb29688295338af4b7686603fe614abd555e09efba8fb88ee09e1f7b1ccaeed2e8f823fa9eef3fdd60217fc012ea67d2479751a0b8c087a4185541b851bd8b16f8d91b840e51b1cb0ba6fe647997e57429265e85ef62d565db50a69ae1647d54d7bd855e4db3d8a91510e5bcbd0edfbbecaa20a7bd9ae74593daa7b11b4")] diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubJsonProtocol.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubJsonProtocol.cs new file mode 100644 index 0000000000000..e8dcab7c3dbf3 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubJsonProtocol.cs @@ -0,0 +1,64 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Net.WebSockets; +using System.Text; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The protocol to represent "json.webpubsub.azure.v1" + /// + public class WebPubSubJsonProtocol : WebPubSubProtocol + { + private readonly WebPubSubJsonProtocolBase _processor = new WebPubSubJsonProtocolBase(); + + /// + /// Gets the name of the protocol. The name is used by Web PubSub client to resolve the protocol between the client and server. + /// + public override string Name => "json.webpubsub.azure.v1"; + + /// + /// Get whether the protocol using a reliable subprotocol. + /// + public override bool IsReliable => false; + + /// + /// Get the message type to be used in websocket bytes + /// + public override WebPubSubProtocolMessageType WebSocketMessageType => WebPubSubProtocolMessageType.Text; + + /// + /// Converts the specified to its serialized representation. + /// + /// The message to convert. + /// The serialized representation of the message. + public override ReadOnlyMemory GetMessageBytes(WebPubSubMessage message) + { + return _processor.GetMessageBytes(message); + } + + /// + /// Creates a new from the specified serialized representation。 + /// + /// The serialized representation of the message. + /// A + public override WebPubSubMessage ParseMessage(ReadOnlySequence input) + { + return _processor.ParseMessage(input); + } + + /// + /// Writes the specified to a writer. + /// + /// The message to write. + /// The output writer. + public override void WriteMessage(WebPubSubMessage message, IBufferWriter output) + { + _processor.WriteMessage(message, output); + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubJsonProtocolBase.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubJsonProtocolBase.cs new file mode 100644 index 0000000000000..d4a186aba037c --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubJsonProtocolBase.cs @@ -0,0 +1,32 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.IO; +using System.Net.WebSockets; +using System.Text; +using System.Text.Json; +using System.Threading; + +namespace Azure.Messaging.WebPubSub.Clients +{ + internal class WebPubSubJsonProtocolBase + { + public ReadOnlyMemory GetMessageBytes(WebPubSubMessage message) + { + throw new NotImplementedException(); + } + + public virtual WebPubSubMessage ParseMessage(ReadOnlySequence input) + { + throw new NotImplementedException(); + } + + public virtual void WriteMessage(WebPubSubMessage message, IBufferWriter output) + { + throw new NotImplementedException(); + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubJsonReliableProtocol.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubJsonReliableProtocol.cs new file mode 100644 index 0000000000000..019d6e6a8fcc2 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubJsonReliableProtocol.cs @@ -0,0 +1,64 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Net.WebSockets; +using System.Text; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The protocol to represent "json.reliable.webpubsub.azure.v1" + /// + public class WebPubSubJsonReliableProtocol : WebPubSubProtocol + { + private readonly WebPubSubJsonProtocolBase _processor = new WebPubSubJsonProtocolBase(); + + /// + /// Gets the name of the protocol. The name is used by Web PubSub client to resolve the protocol between the client and server. + /// + public override string Name => "json.reliable.webpubsub.azure.v1"; + + /// + /// Get whether the protocol using a reliable subprotocol. + /// + public override bool IsReliable => true; + + /// + /// Get the message type to be used in websocket bytes + /// + public override WebPubSubProtocolMessageType WebSocketMessageType => WebPubSubProtocolMessageType.Text; + + /// + /// Converts the specified to its serialized representation. + /// + /// The message to convert. + /// The serialized representation of the message. + public override ReadOnlyMemory GetMessageBytes(WebPubSubMessage message) + { + return _processor.GetMessageBytes(message); + } + + /// + /// Creates a new from the specified serialized representation。 + /// + /// The serialized representation of the message. + /// A + public override WebPubSubMessage ParseMessage(ReadOnlySequence input) + { + return _processor.ParseMessage(input); + } + + /// + /// Writes the specified to a writer. + /// + /// The message to write. + /// The output writer. + public override void WriteMessage(WebPubSubMessage message, IBufferWriter output) + { + _processor.WriteMessage(message, output); + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubProtocol.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubProtocol.cs new file mode 100644 index 0000000000000..ee27741d6701b --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubProtocol.cs @@ -0,0 +1,53 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Net.WebSockets; +using System.Text; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The WebPubSub client protocol + /// + public abstract class WebPubSubProtocol + { + /// + /// Gets the name of the protocol. The name is used by Web PubSub client to resolve the protocol between the client and server. + /// + public abstract string Name { get; } + + /// + /// Get the message type to be used in websocket bytes + /// + public abstract WebPubSubProtocolMessageType WebSocketMessageType { get; } + + /// + /// Creates a new from the specified serialized representation。 + /// + /// The serialized representation of the message. + /// A + public abstract WebPubSubMessage ParseMessage(ReadOnlySequence input); + + /// + /// Writes the specified to a writer. + /// + /// The message to write. + /// The output writer. + public abstract void WriteMessage(WebPubSubMessage message, IBufferWriter output); + + /// + /// Converts the specified to its serialized representation. + /// + /// The message to convert. + /// The serialized representation of the message. + public abstract ReadOnlyMemory GetMessageBytes(WebPubSubMessage message); + + /// + /// Get whether the protocol using a reliable subprotocol. + /// + public abstract bool IsReliable { get; } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubProtocolMessageType.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubProtocolMessageType.cs new file mode 100644 index 0000000000000..be4baa4722229 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubProtocolMessageType.cs @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Text; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// It's a map to , indicating the message type. + /// + public enum WebPubSubProtocolMessageType + { + /// + /// The message is in text format. + /// + Text = 0, + /// + /// The message is in binary format. + /// + Binary = 1, + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/WebPubSubClient.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/WebPubSubClient.cs new file mode 100644 index 0000000000000..bdd4027eee2f3 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/WebPubSubClient.cs @@ -0,0 +1,196 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Buffers; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.IO; +using System.Net.WebSockets; +using System.Runtime.ExceptionServices; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using System.Web; +using Azure.Core; +using Azure.Core.Pipeline; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The WebPubSubService PubSub client. + /// + [SuppressMessage("Usage", "AZC0007:DO provide a minimal constructor that takes only the parameters required to connect to the service.", Justification = "WebPubSub clients are Websocket based and don't use ClientOptions functionality")] + [SuppressMessage("Usage", "AZC0004:DO provide both asynchronous and synchronous variants for all service methods.", Justification = "Synchronous methods doesn't make sense in the scenario of WebPubSub client")] + [SuppressMessage("Usage", "AZC0015:Unexpected client method return type.", Justification = "WebPubSubClient is not a HTTP-based client.")] + [SuppressMessage("Design", "CA1001:Types that own disposable fields should be disposable", Justification = "We don't want user to use client within a using block")] + public class WebPubSubClient + { + /// + /// The connection ID of the client. The ID is assigned when the client connects. + /// + public string ConnectionId { get; } + + /// + /// Initializes a Web PubSub client. + /// + /// The uri to connect to the service. + public WebPubSubClient(Uri clientAccessUri) : this(clientAccessUri, null) + { + } + + /// + /// Initializes a Web PubSub client. + /// + /// The uri to connect to the service. + /// A option for the client. + public WebPubSubClient(Uri clientAccessUri, WebPubSubClientOptions options) : this(new WebPubSubClientCredential(clientAccessUri), options) + { + } + + /// + /// Initializes a Web PubSub client. + /// + /// A uri provider that will be called to return the uri for each connecting or reconnecting. + /// A option for the client. + public WebPubSubClient(WebPubSubClientCredential credential, WebPubSubClientOptions options = null) + { + throw new NotImplementedException(); + } + + /// + /// Constructor for mock. + /// + protected WebPubSubClient() + { + } + + /// + /// Start connecting to the service. + /// + /// An optional instance to signal the request to cancel the operation. + /// + public virtual Task StartAsync(CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } + + /// + /// Stop the client. + /// + /// +#pragma warning disable AZC0002 // DO ensure all service methods, both asynchronous and synchronous, take an optional CancellationToken parameter called cancellationToken. + public virtual Task StopAsync() +#pragma warning restore AZC0002 // DO ensure all service methods, both asynchronous and synchronous, take an optional CancellationToken parameter called cancellationToken. + { + throw new NotImplementedException(); + } + + /// + /// Stop and close the client to the service + /// + /// +#pragma warning disable AZC0003 // DO make service methods virtual. +#pragma warning disable AZC0002 // DO ensure all service methods, both asynchronous and synchronous, take an optional CancellationToken parameter called cancellationToken. + public ValueTask DisposeAsync() +#pragma warning restore AZC0002 // DO ensure all service methods, both asynchronous and synchronous, take an optional CancellationToken parameter called cancellationToken. +#pragma warning restore AZC0003 // DO make service methods virtual. + { + throw new NotImplementedException(); + } + + /// + /// Stop and close the client to the service + /// + protected virtual ValueTask DisposeAsyncCore() + { + throw new NotImplementedException(); + } + + /// + /// Join the target group. + /// + /// The group name. + /// An optional ack id. It's generated by SDK if not assigned. + /// An optional instance to signal the request to cancel the operation. + /// The ack for the operation. + public virtual Task JoinGroupAsync(string group, ulong? ackId = null, CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } + + /// + /// Leave the target group. + /// + /// The group name. + /// An optional ack id. It's generated by SDK if not assigned. + /// An optional instance to signal the request to cancel the operation. + /// The ack for the operation + public virtual Task LeaveGroupAsync(string group, ulong? ackId = null, CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } + + /// + /// Publish data to group and wait for the ack. + /// + /// The group name. + /// The data content. + /// The data type. + /// If set to true, this message is not echoed back to the same connection. If not set, the default value is false. + /// If set to true, the service won't return ack for this message. The return value will be Task of null + /// The ack-id for the operation. The message with the same ack-id is treated as the same message. Leave it omitted to generate by library. + /// An optional instance to signal the request to cancel the operation. + /// The ack for the operation + public virtual Task SendToGroupAsync(string group, BinaryData content, WebPubSubDataType dataType, ulong? ackId = null, bool noEcho = false, bool fireAndForget = false, CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } + + /// + /// Send custom event and wait for the ack. + /// + /// The event name. + /// The data content. + /// The data type. + /// The ack-id for the operation. The message with the same ack-id is treated as the same message. Leave it omitted to generate by library. + /// If set to true, the service won't return ack for this message. The return value will be Task of null + /// An optional instance to signal the request to cancel the operation. + /// The ack for the operation + public virtual Task SendEventAsync(string eventName, BinaryData content, WebPubSubDataType dataType, ulong? ackId = null, bool fireAndForget = false, CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } + + /// + /// An event triggered when the connection is connected + /// + public event Func Connected; + + /// + /// An event triggered when the connection is disconnected + /// + public event Func Disconnected; + + /// + /// An event triggered when the connection is stopped + /// + public event Func Stopped; + + /// + /// A event triggered when received server data messages. + /// + public event Func ServerMessageReceived; + + /// + /// A event triggered when received group data messages. + /// + public event Func GroupMessageReceived; + + /// + /// A event triggered when rejoin group failed in reconnection. + /// + public event Func RejoinGroupFailed; + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/WebPubSubClientCredential.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/WebPubSubClientCredential.cs new file mode 100644 index 0000000000000..f2051d3d9b7e9 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/WebPubSubClientCredential.cs @@ -0,0 +1,46 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// The WebPubSubClientCredential + /// + public class WebPubSubClientCredential + { + private readonly Func> _clientAccessUriProvider; + + /// + /// Initialize a WebPubSubClientCredential instance + /// + /// The uri to be used to connect to the service + public WebPubSubClientCredential(Uri clientAccessUri): this(_ => new ValueTask(clientAccessUri)) + { + } + + /// + /// Initialize a WebPubSubClientCredential instance + /// + /// The uri to be used to connect to the service + public WebPubSubClientCredential(Func> clientAccessUriProvider) + { + _clientAccessUriProvider = clientAccessUriProvider; + } + + /// + /// GetClientAccessUri + /// + /// The cancellation token used to cancel the operation. + /// + public ValueTask GetClientAccessUriAsync(CancellationToken token = default) + { + return _clientAccessUriProvider.Invoke(token); + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/WebPubSubResult.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/WebPubSubResult.cs new file mode 100644 index 0000000000000..171caec21eead --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/WebPubSubResult.cs @@ -0,0 +1,35 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Text; + +namespace Azure.Messaging.WebPubSub.Clients +{ + /// + /// To represent the result of ack-able operations. + /// + public class WebPubSubResult + { + /// + /// The ack id of message just sent. If the operation is fire-and-forget, it will be null. + /// + public ulong? AckId { get; } + + /// + /// Whether the message is duplicated, if true, the message with the ack id has been processed by service + /// + public bool IsDuplicated { get; } + + internal WebPubSubResult(): this(null, false) + { + } + + internal WebPubSubResult(ulong? ackId, bool isDuplicated) + { + AckId = ackId; + IsDuplicated = isDuplicated; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/WebSocketClient/IWebSocketClient.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/WebSocketClient/IWebSocketClient.cs new file mode 100644 index 0000000000000..e610f2e0be40f --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/WebSocketClient/IWebSocketClient.cs @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Net.Sockets; +using System.Net.WebSockets; +using System.Text; +using System.Threading.Tasks; +using System.Threading; + +namespace Azure.Messaging.WebPubSub.Clients +{ + internal interface IWebSocketClient : IDisposable + { + Task ConnectAsync(CancellationToken token); + + Task SendAsync(ReadOnlyMemory buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken); + + Task ReceiveOneFrameAsync(CancellationToken token); + + Task StopAsync(CancellationToken token); + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/WebSocketClient/IWebSocketClientFactory.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/WebSocketClient/IWebSocketClientFactory.cs new file mode 100644 index 0000000000000..d939153dc9e5f --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/WebSocketClient/IWebSocketClientFactory.cs @@ -0,0 +1,14 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Text; + +namespace Azure.Messaging.WebPubSub.Clients +{ + internal interface IWebSocketClientFactory + { + IWebSocketClient CreateWebSocketClient(Uri uri, string protocol); + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/WebSocketClient/WebSocketReadResult.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/WebSocketClient/WebSocketReadResult.cs new file mode 100644 index 0000000000000..5e12950ce14dd --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/WebSocketClient/WebSocketReadResult.cs @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Buffers; +using System.Net.WebSockets; + +namespace Azure.Messaging.WebPubSub.Clients +{ + internal struct WebSocketReadResult + { + public bool IsClosed { get; } + + public ReadOnlySequence Payload { get; } + + public WebSocketCloseStatus? CloseStatus { get; } + + public WebSocketReadResult(ReadOnlySequence payload, bool isClosed = false, WebSocketCloseStatus? closeStatus = null) + { + Payload = payload; + IsClosed = isClosed; + CloseStatus = closeStatus; + } + } +} diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/tests/Azure.Messaging.WebPubSub.Client.Tests.csproj b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/tests/Azure.Messaging.WebPubSub.Client.Tests.csproj new file mode 100644 index 0000000000000..a7535e5b68385 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/tests/Azure.Messaging.WebPubSub.Client.Tests.csproj @@ -0,0 +1,18 @@ + + + $(RequiredTargetFrameworks) + $(MSBuildProjectDirectory)\test.runsettings + + + + + + + + + + + + + + diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/tests/Samples/WebPubSubClientSample.HelloWorld.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/tests/Samples/WebPubSubClientSample.HelloWorld.cs new file mode 100644 index 0000000000000..c81df9cc2eb04 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/tests/Samples/WebPubSubClientSample.HelloWorld.cs @@ -0,0 +1,127 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +#if NETCOREAPP3_1_OR_GREATER || SNIPPET +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.WebPubSub.Clients; +using Xunit; + +namespace Azure.Messaging.WebPubSub.Client.Tests +{ + public class WebPubSubClientSample + { + public void ConstructWebPubSubClient() + { +#region Snippet:WebPubSubClient_Construct + var client = new WebPubSubClient(new Uri("")); +#endregion + } + + public void ConstructWebPubSubClient2() + { +#region Snippet:WebPubSubClient_Construct2 + var client = new WebPubSubClient(new WebPubSubClientCredential(token => + { + // In common practice, you will have a negotiation server for generating token. Client should fetch token from it. + return FetchClientAccessTokenFromServerAsync(token); + })); +#endregion + } + + public async ValueTask FetchClientAccessTokenFromServerAsync(CancellationToken token) + { +#region Snippet:WebPubSubClient_GenerateClientAccessUri + var serviceClient = new WebPubSubServiceClient("<< Connection String >>", "hub"); + return await serviceClient.GetClientAccessUriAsync(); +#endregion + } + + public void WebPubSubClientSubscribeConnected(WebPubSubClient client) + { +#region Snippet:WebPubSubClient_Subscribe_Connected + client.Connected += eventArgs => + { + Console.WriteLine($"Connection {eventArgs.ConnectionId} is connected"); + return Task.CompletedTask; + }; +#endregion + } + + public void WebPubSubClientSubscribeDisconnected(WebPubSubClient client) + { +#region Snippet:WebPubSubClient_Subscribe_Disconnected + client.Disconnected += eventArgs => + { + Console.WriteLine($"Connection is disconnected"); + return Task.CompletedTask; + }; +#endregion + } + + public void WebPubSubClientSubscribeStopped(WebPubSubClient client) + { +#region Snippet:WebPubSubClient_Subscribe_Stopped + client.Stopped += eventArgs => + { + Console.WriteLine($"Client is stopped"); + return Task.CompletedTask; + }; +#endregion + } + + public void WebPubSubClientSubscribeServerMessage(WebPubSubClient client) + { +#region Snippet:WebPubSubClient_Subscribe_ServerMessage + client.ServerMessageReceived += eventArgs => + { + Console.WriteLine($"Receive message: {eventArgs.Message.Data}"); + return Task.CompletedTask; + }; +#endregion + } + + public void WebPubSubClientSubscribeGroupMessage(WebPubSubClient client) + { +#region Snippet:WebPubSubClient_Subscribe_GroupMessage + client.GroupMessageReceived += eventArgs => + { + Console.WriteLine($"Receive group message from {eventArgs.Message.Group}: {eventArgs.Message.Data}"); + return Task.CompletedTask; + }; +#endregion + } + + public void WebPubSubClientSubscribeRestoreFailed(WebPubSubClient client) + { +#region Snippet:WebPubSubClient_Subscribe_RestoreFailed + client.RejoinGroupFailed += eventArgs => + { + Console.WriteLine($"Restore group failed"); + return Task.CompletedTask; + }; +#endregion + } + + public async Task WebPubSubClientSendToGroup(WebPubSubClient client) + { +#region Snippet:WebPubSubClient_SendToGroup + // Send message to group "testGroup" + await client.SendToGroupAsync("testGroup", BinaryData.FromString("hello world"), WebPubSubDataType.Text); +#endregion + } + + public async Task WebPubSubClientSendEvent(WebPubSubClient client) + { +#region Snippet:WebPubSubClient_SendEvent + // Send custom event to server + await client.SendEventAsync("testEvent", BinaryData.FromString("hello world"), WebPubSubDataType.Text); +#endregion + } + } +} +#endif diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/tests/test.runsettings b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/tests/test.runsettings new file mode 100644 index 0000000000000..1e4c7048b5715 --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/tests/test.runsettings @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/sdk/webpubsub/ci.yml b/sdk/webpubsub/ci.yml index 74bbd1171ada2..6cf012ac86d0c 100644 --- a/sdk/webpubsub/ci.yml +++ b/sdk/webpubsub/ci.yml @@ -32,6 +32,8 @@ extends: Artifacts: - name: Azure.Messaging.WebPubSub safeName: AzureMessagingWebPubSub + - name: Azure.Messaging.WebPubSub.Client + safeName: AzureMessagingWebPubSub.Client - name: Microsoft.Azure.WebJobs.Extensions.WebPubSub safeName: MicrosoftAzureWebJobsExtensionsWebPubSub - name: Microsoft.Azure.WebPubSub.AspNetCore