diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubJsonProtocolBase.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubJsonProtocolBase.cs index d4a186aba037c..13b97dc4dfcab 100644 --- a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubJsonProtocolBase.cs +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/src/Protocols/WebPubSubJsonProtocolBase.cs @@ -14,19 +14,453 @@ namespace Azure.Messaging.WebPubSub.Clients { internal class WebPubSubJsonProtocolBase { + private const string TypePropertyName = "type"; + private static readonly JsonEncodedText TypePropertyNameBytes = JsonEncodedText.Encode(TypePropertyName); + private const string GroupPropertyName = "group"; + private static readonly JsonEncodedText GroupPropertyNameBytes = JsonEncodedText.Encode(GroupPropertyName); + private const string AckIdPropertyName = "ackId"; + private static readonly JsonEncodedText AckIdPropertyNameBytes = JsonEncodedText.Encode(AckIdPropertyName); + private const string DataTypePropertyName = "dataType"; + private static readonly JsonEncodedText DataTypePropertyNameBytes = JsonEncodedText.Encode(DataTypePropertyName); + private const string DataPropertyName = "data"; + private static readonly JsonEncodedText DataPropertyNameBytes = JsonEncodedText.Encode(DataPropertyName); + private const string EventPropertyName = "event"; + private static readonly JsonEncodedText EventPropertyNameBytes = JsonEncodedText.Encode(EventPropertyName); + private const string NoEchoPropertyName = "noEcho"; + private static readonly JsonEncodedText NoEchoPropertyNameBytes = JsonEncodedText.Encode(NoEchoPropertyName); + + private const string SuccessPropertyName = "success"; + private static readonly JsonEncodedText SuccessPropertyNameBytes = JsonEncodedText.Encode(SuccessPropertyName); + private const string MessagePropertyName = "message"; + private static readonly JsonEncodedText MessagePropertyNameBytes = JsonEncodedText.Encode(MessagePropertyName); + private const string ErrorPropertyName = "error"; + private static readonly JsonEncodedText ErrorPropertyNameBytes = JsonEncodedText.Encode(ErrorPropertyName); + private const string ErrorNamePropertyName = "name"; + private static readonly JsonEncodedText ErrorNamePropertyNameBytes = JsonEncodedText.Encode(ErrorNamePropertyName); + private const string FromPropertyName = "from"; + private static readonly JsonEncodedText FromPropertyNameBytes = JsonEncodedText.Encode(FromPropertyName); + private const string FromUserIdPropertyName = "fromUserId"; + private static readonly JsonEncodedText FromUserIdPropertyNameBytes = JsonEncodedText.Encode(FromUserIdPropertyName); + private const string UserIdPropertyName = "userId"; + private static readonly JsonEncodedText UserIdPropertyNameBytes = JsonEncodedText.Encode(UserIdPropertyName); + private const string ConnectionIdPropertyName = "connectionId"; + private static readonly JsonEncodedText ConnectionIdPropertyNameBytes = JsonEncodedText.Encode(ConnectionIdPropertyName); + private const string ReconnectionTokenPropertyName = "reconnectionToken"; + private static readonly JsonEncodedText ReconnectionTokenPropertyNameBytes = JsonEncodedText.Encode(ReconnectionTokenPropertyName); + private const string SequenceIdPropertyName = "sequenceId"; + private static readonly JsonEncodedText SequenceIdPropertyNameBytes = JsonEncodedText.Encode(SequenceIdPropertyName); + + private static readonly JsonEncodedText JoinGroupTypeBytes = JsonEncodedText.Encode("joinGroup"); + private static readonly JsonEncodedText LeaveGroupTypeBytes = JsonEncodedText.Encode("leaveGroup"); + private static readonly JsonEncodedText SendToGroupTypeBytes = JsonEncodedText.Encode("sendToGroup"); + private static readonly JsonEncodedText SendEventTypeBytes = JsonEncodedText.Encode("event"); + private static readonly JsonEncodedText SequenceAckTypeBytes = JsonEncodedText.Encode("sequenceAck"); + + private const byte Quote = (byte)'"'; + private const byte KeyValueSeperator = (byte)':'; + private const byte ListSeparator = (byte)','; + public ReadOnlyMemory GetMessageBytes(WebPubSubMessage message) { - throw new NotImplementedException(); + using var writer = new MemoryBufferWriter(); + WriteMessage(message, writer); + return new Memory(writer.ToArray()); } public virtual WebPubSubMessage ParseMessage(ReadOnlySequence input) { - throw new NotImplementedException(); + try + { + string type = null; + DownstreamEventType eventType = DownstreamEventType.Ack; + string group = null; + string @event = null; + SystemEventType systemEventType = SystemEventType.Connected; + ulong? ackId = null; + ulong? sequenceId = null; + bool? success = null; + string from = null; + FromType fromType = FromType.Server; + AckMessageError errorDetail = null; + WebPubSubDataType dataType = WebPubSubDataType.Text; + string userId = null; + string connectionId = null; + string reconnectionToken = null; + string message = null; + string fromUserId = null; + + var completed = false; + bool hasDataToken = false; + BinaryData data = null; + SequencePosition dataStart = default; + Utf8JsonReader dataReader = default; + + var reader = new Utf8JsonReader(input, isFinalBlock: true, state: default); + + reader.CheckRead(); + + // We're always parsing a JSON object + reader.EnsureObjectStart(); + + do + { + switch (reader.TokenType) + { + case JsonTokenType.PropertyName: + if (reader.ValueTextEquals(TypePropertyNameBytes.EncodedUtf8Bytes)) + { + type = reader.ReadAsNullableString(TypePropertyName); + if (type == null) + { + throw new InvalidDataException($"Expected '{TypePropertyName}' to be of type {JsonTokenType.String}."); + } + if (!Enum.TryParse(type, true, out eventType)) + { + throw new InvalidDataException($"Unknown '{TypePropertyName}': {type}."); + } + } + else if (reader.ValueTextEquals(GroupPropertyNameBytes.EncodedUtf8Bytes)) + { + group = reader.ReadAsNullableString(GroupPropertyName); + } + else if (reader.ValueTextEquals(EventPropertyNameBytes.EncodedUtf8Bytes)) + { + @event = reader.ReadAsNullableString(EventPropertyName); + if (!Enum.TryParse(@event, true, out systemEventType)) + { + throw new InvalidDataException($"Unknown '{EventPropertyName}': {@event}."); + } + } + else if (reader.ValueTextEquals(DataTypePropertyNameBytes.EncodedUtf8Bytes)) + { + var dataTypeValue = reader.ReadAsNullableString(DataTypePropertyName); + if (!Enum.TryParse(dataTypeValue, true, out dataType)) + { + throw new InvalidDataException($"Unknown '{DataTypePropertyName}': {dataTypeValue}."); + } + } + else if (reader.ValueTextEquals(AckIdPropertyNameBytes.EncodedUtf8Bytes)) + { + try + { + ackId = reader.ReadAsUlong(AckIdPropertyName); + } + catch (FormatException) + { + throw new InvalidDataException($"'{AckIdPropertyName}' is not a valid uint64 value."); + } + } + else if (reader.ValueTextEquals(DataPropertyNameBytes.EncodedUtf8Bytes)) + { + hasDataToken = true; + dataStart = reader.Position; + reader.Skip(); + dataReader = reader; + } + else if (reader.ValueTextEquals(SequenceIdPropertyNameBytes.EncodedUtf8Bytes)) + { + try + { + sequenceId = reader.ReadAsUlong(SequenceIdPropertyName); + } + catch (FormatException) + { + throw new InvalidDataException($"'{SequenceIdPropertyName}' is not a valid uint64 value."); + } + } + else if (reader.ValueTextEquals(SuccessPropertyNameBytes.EncodedUtf8Bytes)) + { + success = reader.ReadAsBoolean(SuccessPropertyName); + } + else if (reader.ValueTextEquals(ErrorPropertyNameBytes.EncodedUtf8Bytes)) + { + errorDetail = ReadErrorDetail(reader); + } + else if (reader.ValueTextEquals(FromPropertyNameBytes.EncodedUtf8Bytes)) + { + from = reader.ReadAsNullableString(FromPropertyName); + if (!Enum.TryParse(from, true, out fromType)) + { + throw new InvalidDataException($"Unknown '{FromPropertyName}': {from}."); + } + } + else if (reader.ValueTextEquals(UserIdPropertyNameBytes.EncodedUtf8Bytes)) + { + userId = reader.ReadAsNullableString(UserIdPropertyName); + } + else if (reader.ValueTextEquals(ConnectionIdPropertyNameBytes.EncodedUtf8Bytes)) + { + connectionId = reader.ReadAsNullableString(ConnectionIdPropertyName); + } + else if (reader.ValueTextEquals(ReconnectionTokenPropertyNameBytes.EncodedUtf8Bytes)) + { + reconnectionToken = reader.ReadAsNullableString(ReconnectionTokenPropertyName); + } + else if (reader.ValueTextEquals(MessagePropertyNameBytes.EncodedUtf8Bytes)) + { + message = reader.ReadAsNullableString(MessagePropertyName); + } + else if (reader.ValueTextEquals(FromUserIdPropertyNameBytes.EncodedUtf8Bytes)) + { + fromUserId = reader.ReadAsNullableString(FromUserIdPropertyName); + } + else + { + reader.CheckRead(); + reader.Skip(); + } + break; + case JsonTokenType.EndObject: + completed = true; + break; + } + } + while (!completed && reader.CheckRead()); + + if (type == null) + { + throw new InvalidDataException($"Missing required property '{TypePropertyName}'."); + } + + if (hasDataToken) + { + if (dataType == WebPubSubDataType.Binary || + dataType == WebPubSubDataType.Protobuf || + dataType == WebPubSubDataType.Text) + { + if (dataReader.TokenType != JsonTokenType.String) + { + throw new InvalidDataException($"'data' should be a string when 'dataType' is 'binary,text,protobuf'."); + } + + if (dataType == WebPubSubDataType.Binary || + dataType == WebPubSubDataType.Protobuf) + { + if (!dataReader.TryGetBytesFromBase64(out var bytes)) + { + throw new InvalidDataException($"'data' is not a valid base64 encoded string."); + } + data = new BinaryData(bytes); + } + else + { + data = new BinaryData(dataReader.GetString()); + } + } + else if (dataType == WebPubSubDataType.Json) + { + if (dataReader.TokenType == JsonTokenType.Null) + { + throw new InvalidDataException($"Invalid value for '{DataPropertyName}': null."); + } + + var end = dataReader.Position; + data = new BinaryData(input.Slice(dataStart, end).ToArray()); + } + } + + switch (eventType) + { + case DownstreamEventType.Ack: + AssertNotNull(ackId, AckIdPropertyName); + AssertNotNull(success, SuccessPropertyName); + return new AckMessage(ackId.Value, success.Value, errorDetail); + + case DownstreamEventType.Message: + AssertNotNull(from, FromPropertyName); + AssertNotNull(dataType, FromPropertyName); + AssertNotNull(data, DataPropertyName); + switch (fromType) + { + case FromType.Server: + return new ServerDataMessage(dataType, data, sequenceId); + case FromType.Group: + AssertNotNull(group, GroupPropertyName); + return new GroupDataMessage(group, dataType, data, sequenceId, fromUserId); + default: + throw new InvalidDataException($"Unsupported from {fromType}"); + } + + case DownstreamEventType.System: + AssertNotNull(@event, EventPropertyName); + + switch (systemEventType) + { + case SystemEventType.Connected: + return new ConnectedMessage(userId, connectionId, reconnectionToken); + case SystemEventType.Disconnected: + return new DisconnectedMessage(message); + default: + throw new InvalidDataException($"Unsupported event {systemEventType}"); + } + + default: + throw new InvalidDataException($"Unsupported type {eventType}"); + } + } + catch (JsonException ex) + { + throw new InvalidDataException("Error reading JSON.", ex); + } } public virtual void WriteMessage(WebPubSubMessage message, IBufferWriter output) { - throw new NotImplementedException(); + if (message == null) + { + throw new ArgumentNullException(nameof(message)); + } + + var jsonWriterLease = ReusableUtf8JsonWriter.Get(output); + + try + { + var writer = jsonWriterLease.GetJsonWriter(); + + writer.WriteStartObject(); + + switch (message) + { + case JoinGroupMessage joinGroupMessage: + writer.WriteString(TypePropertyNameBytes, JoinGroupTypeBytes); + writer.WriteString(GroupPropertyNameBytes, joinGroupMessage.Group); + if (joinGroupMessage.AckId != null) + { + writer.WriteNumber(AckIdPropertyNameBytes, joinGroupMessage.AckId.Value); + } + break; + case LeaveGroupMessage leaveGroupMessage: + writer.WriteString(TypePropertyNameBytes, LeaveGroupTypeBytes); + writer.WriteString(GroupPropertyNameBytes, leaveGroupMessage.Group); + if (leaveGroupMessage.AckId != null) + { + writer.WriteNumber(AckIdPropertyNameBytes, leaveGroupMessage.AckId.Value); + } + break; + case SendToGroupMessage sendToGroupMessage: + writer.WriteString(TypePropertyNameBytes, SendToGroupTypeBytes); + writer.WriteString(GroupPropertyNameBytes, sendToGroupMessage.Group); + if (sendToGroupMessage.AckId != null) + { + writer.WriteNumber(AckIdPropertyNameBytes, sendToGroupMessage.AckId.Value); + } + writer.WriteBoolean(NoEchoPropertyNameBytes, sendToGroupMessage.NoEcho); + writer.WriteString(DataTypePropertyNameBytes, sendToGroupMessage.DataType.ToString()); + WriteData(output, writer, sendToGroupMessage.Data, sendToGroupMessage.DataType); + break; + case SendEventMessage sendEventMessage: + writer.WriteString(TypePropertyNameBytes, SendEventTypeBytes); + writer.WriteString(EventPropertyNameBytes, sendEventMessage.EventName); + if (sendEventMessage.AckId != null) + { + writer.WriteNumber(AckIdPropertyNameBytes, sendEventMessage.AckId.Value); + } + writer.WriteString(DataTypePropertyNameBytes, sendEventMessage.DataType.ToString()); + WriteData(output, writer, sendEventMessage.Data, sendEventMessage.DataType); + break; + case SequenceAckMessage sequenceAckMessage: + writer.WriteString(TypePropertyNameBytes, SequenceAckTypeBytes); + writer.WriteNumber(SequenceIdPropertyNameBytes, sequenceAckMessage.SequenceId); + break; + default: + throw new InvalidDataException($"{message.GetType()} is not supported."); + } + + writer.WriteEndObject(); + writer.Flush(); + } + finally + { + ReusableUtf8JsonWriter.Return(jsonWriterLease); + } + } + + private static AckMessageError ReadErrorDetail(Utf8JsonReader reader) + { + string errorName = null; + string errorMessage = null; + + var completed = false; + reader.CheckRead(); + // Error detail should start with object + reader.EnsureObjectStart(); + do + { + switch (reader.TokenType) + { + case JsonTokenType.PropertyName: + if (reader.ValueTextEquals(ErrorNamePropertyNameBytes.EncodedUtf8Bytes)) + { + errorName = reader.ReadAsNullableString(ErrorNamePropertyName); + } + else if (reader.ValueTextEquals(MessagePropertyNameBytes.EncodedUtf8Bytes)) + { + errorMessage = reader.ReadAsNullableString(MessagePropertyName); + } + break; + case JsonTokenType.EndObject: + completed = true; + break; + } + } + while (!completed && reader.CheckRead()); + + return new AckMessageError(errorName, errorMessage); + } + + private static void AssertNotNull(T value, string propertyName) + { + if (value == null) + { + throw new InvalidDataException($"Missing required property '{propertyName}'."); + } + } + + private static void WriteData(IBufferWriter buffer, Utf8JsonWriter writer, BinaryData data, WebPubSubDataType dataType) + { + switch (dataType) + { + case WebPubSubDataType.Text: + writer.WriteString(DataPropertyNameBytes, data); + break; + case WebPubSubDataType.Json: + writer.Flush(); + var length = DataPropertyNameBytes.EncodedUtf8Bytes.Length + 4; // ListSeparator + Quota + DataPropertyNameBytes + Quota + KeyValueSeperator + var span = buffer.GetSpan(length); + span[0] = ListSeparator; + span[1] = Quote; + DataPropertyNameBytes.EncodedUtf8Bytes.CopyTo(span[2..]); + span[length - 2] = Quote; + span[length - 1] = KeyValueSeperator; + buffer.Advance(length); + buffer.Write(data.ToMemory().Span); + break; + case WebPubSubDataType.Binary: + case WebPubSubDataType.Protobuf: + writer.WriteBase64String(DataPropertyNameBytes, data); + break; + default: + throw new InvalidDataException($"{dataType} is not a supported DataType"); + } + } + + private enum DownstreamEventType + { + Ack, + Message, + System, + } + + private enum FromType + { + Server, + Group, + } + + private enum SystemEventType + { + Connected, + Disconnected, } } } diff --git a/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/tests/Protocols/JsonProtocolTests.cs b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/tests/Protocols/JsonProtocolTests.cs new file mode 100644 index 0000000000000..caee8770c64af --- /dev/null +++ b/sdk/webpubsub/Azure.Messaging.WebPubSub.Client/tests/Protocols/JsonProtocolTests.cs @@ -0,0 +1,180 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +#if NETCOREAPP3_1_OR_GREATER +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using Azure.Messaging.WebPubSub.Clients; +using Xunit; +using Xunit.Sdk; + +namespace Azure.Messaging.WebPubSub.Client.Tests.Protocols +{ + public class JsonProtocolTests + { + private class JsonData + { + public string Value { get; set; } + } + + public static IEnumerable GetParsingTestData() + { + static object[] GetData(object jsonPayload, Action assert) + { + var converter = JsonSerializer.Serialize(jsonPayload); + return new object[] { Encoding.UTF8.GetBytes(converter), assert }; + } + + yield return GetData(new { type="ack", ackId = 123, success=true }, message => + { + Assert.True(message is AckMessage); + var ackMessage = message as AckMessage; + Assert.Equal(123u, ackMessage.AckId); + Assert.True(ackMessage.Success); + Assert.Null(ackMessage.Error); + }); + yield return GetData(new { type = "ack", ackId = 123, success = false, error = new { name = "Forbidden", message = "message"} }, message => + { + Assert.True(message is AckMessage); + var ackMessage = message as AckMessage; + Assert.Equal(123u, ackMessage.AckId); + Assert.False(ackMessage.Success); + Assert.Equal("Forbidden", ackMessage.Error.Name); + Assert.Equal("message", ackMessage.Error.Message); + }); + yield return GetData(new { sequenceId = 738476327894u, type = "message", from = "group", group = "groupname", dataType = "text", data = "xyz", fromUserId = "user" }, message => + { + Assert.True(message is GroupDataMessage); + var groupDataMessage = message as GroupDataMessage; + Assert.Equal("groupname", groupDataMessage.Group); + Assert.Equal(738476327894u, groupDataMessage.SequenceId); + Assert.Equal(WebPubSubDataType.Text, groupDataMessage.DataType); + Assert.Equal("user", groupDataMessage.FromUserId); + Assert.Equal("xyz", groupDataMessage.Data.ToString()); + }); + yield return GetData(new { type = "message", from = "group", group = "groupname", dataType = "json", data = new JsonData { Value = "xyz" } }, message => + { + Assert.True(message is GroupDataMessage); + var groupDataMessage = message as GroupDataMessage; + Assert.Equal("groupname", groupDataMessage.Group); + Assert.Null(groupDataMessage.SequenceId); + Assert.Equal(WebPubSubDataType.Json, groupDataMessage.DataType); + var obj = groupDataMessage.Data.ToObjectFromJson(); + Assert.Equal("xyz", obj.Value); + }); + yield return GetData(new { type = "message", from = "group", group = "groupname", dataType = "binary", data = "eHl6" }, message => + { + Assert.True(message is GroupDataMessage); + var groupDataMessage = message as GroupDataMessage; + Assert.Equal("groupname", groupDataMessage.Group); + Assert.Null(groupDataMessage.SequenceId); + Assert.Equal(WebPubSubDataType.Binary, groupDataMessage.DataType); + Assert.Equal("eHl6", Convert.ToBase64String(groupDataMessage.Data.ToArray())); + }); + yield return GetData(new { sequenceId = 738476327894u, type = "message", from = "server", dataType = "text", data = "xyz" }, message => + { + Assert.True(message is ServerDataMessage); + var dataMessage = message as ServerDataMessage; + Assert.Equal(738476327894u, dataMessage.SequenceId); + Assert.Equal(WebPubSubDataType.Text, dataMessage.DataType); + Assert.Equal("xyz", dataMessage.Data.ToString()); + }); + yield return GetData(new { type = "message", from = "server", dataType = "json", data = new JsonData { Value = "xyz" } }, message => + { + Assert.True(message is ServerDataMessage); + var dataMessage = message as ServerDataMessage;; + Assert.Null(dataMessage.SequenceId); + Assert.Equal(WebPubSubDataType.Json, dataMessage.DataType); + var obj = dataMessage.Data.ToObjectFromJson(); + Assert.Equal("xyz", obj.Value); + }); + yield return GetData(new { type = "message", from = "server", dataType = "binary", data = "eHl6" }, message => + { + Assert.True(message is ServerDataMessage); + var dataMessage = message as ServerDataMessage; + Assert.Null(dataMessage.SequenceId); + Assert.Equal(WebPubSubDataType.Binary, dataMessage.DataType); + Assert.Equal("eHl6", Convert.ToBase64String(dataMessage.Data.ToArray())); + }); + yield return GetData(new { type = "system", @event = "connected", userId = "user", connectionId = "connection" }, message => + { + Assert.True(message is ConnectedMessage); + var connectedMessage = message as ConnectedMessage; + Assert.Equal("user", connectedMessage.UserId); + Assert.Equal("connection", connectedMessage.ConnectionId); + Assert.Null(connectedMessage.ReconnectionToken); + }); + yield return GetData(new { type = "system", @event = "connected", userId = "user", connectionId = "connection", reconnectionToken = "rec" }, message => + { + Assert.True(message is ConnectedMessage); + var connectedMessage = message as ConnectedMessage; + Assert.Equal("user", connectedMessage.UserId); + Assert.Equal("connection", connectedMessage.ConnectionId); + Assert.Equal("rec", connectedMessage.ReconnectionToken); + }); + yield return GetData(new { type = "system", @event = "disconnected", message = "msg" }, message => + { + Assert.True(message is DisconnectedMessage); + var disconnectedMessage = message as DisconnectedMessage; + Assert.Equal("msg", disconnectedMessage.Reason); + }); + } + + public static IEnumerable GetSerializingTestData() + { + static object[] GetData(WebPubSubMessage message, object json) + { + return new object[] { message, JsonSerializer.Serialize(json)}; + } + + yield return GetData(new JoinGroupMessage("group", null), new { type = "joinGroup", group = "group" }); + yield return GetData(new JoinGroupMessage("group", 738476327894u), new { type = "joinGroup", group = "group", ackId = 738476327894u }); + yield return GetData(new LeaveGroupMessage("group", null), new { type = "leaveGroup", group = "group" }); + yield return GetData(new LeaveGroupMessage("group", 738476327894u), new { type = "leaveGroup", group = "group", ackId = 738476327894u }); + yield return GetData(new SendToGroupMessage("group", BinaryData.FromString("xzy"), WebPubSubDataType.Text, null, false), new { type = "sendToGroup", group = "group", noEcho = false, dataType = "Text", data = "xzy" }); + yield return GetData(new SendToGroupMessage("group", BinaryData.FromObjectAsJson(new JsonData { Value = "xyz"}), WebPubSubDataType.Json, 738476327894u, true), new { type = "sendToGroup", group = "group", ackId = 738476327894u, noEcho = true, dataType = "Json", data = new { Value = "xyz" } }); + yield return GetData(new SendToGroupMessage("group", BinaryData.FromBytes(Convert.FromBase64String("eHl6")), WebPubSubDataType.Binary, 738476327894u, true), new { type = "sendToGroup", group = "group", ackId = 738476327894u, noEcho = true, dataType = "Binary", data = "eHl6" }); + yield return GetData(new SendToGroupMessage("group", BinaryData.FromBytes(Convert.FromBase64String("eHl6")), WebPubSubDataType.Protobuf, 738476327894u, true), new { type = "sendToGroup", group = "group", ackId = 738476327894u, noEcho = true, dataType = "Protobuf", data = "eHl6" }); + yield return GetData(new SendEventMessage("event", BinaryData.FromString("xzy"), WebPubSubDataType.Text, null), new { type = "event", @event = "event", dataType = "Text", data = "xzy" }); + yield return GetData(new SendEventMessage("event", BinaryData.FromObjectAsJson(new JsonData { Value = "xyz" }), WebPubSubDataType.Json, 738476327894u), new { type = "event", @event = "event", ackId = 738476327894u,dataType = "Json", data = new { Value = "xyz" } }); + yield return GetData(new SendEventMessage("event", BinaryData.FromBytes(Convert.FromBase64String("eHl6")), WebPubSubDataType.Binary, 738476327894u), new { type = "event", @event = "event", ackId = 738476327894u, dataType = "Binary", data = "eHl6" }); + yield return GetData(new SendEventMessage("event", BinaryData.FromBytes(Convert.FromBase64String("eHl6")), WebPubSubDataType.Protobuf, 738476327894u), new { type = "event", @event = "event", ackId = 738476327894u, dataType = "Protobuf", data = "eHl6" }); + yield return GetData(new SequenceAckMessage(123), new { type = "sequenceAck", sequenceId = 123 }); + yield return GetData(new SequenceAckMessage(738476327894u), new { type = "sequenceAck", sequenceId = 738476327894u }); + } + + [MemberData(nameof(GetParsingTestData))] + [Theory] + public void ParseMessageTest(byte[] payload, Action messageAssert) + { + var protocol = new WebPubSubJsonProtocol(); + var resolvedMessage = protocol.ParseMessage(new ReadOnlySequence(payload)); + messageAssert(resolvedMessage); + } + + [MemberData(nameof(GetSerializingTestData))] + [Theory] + public void SerializeMessageTest(WebPubSubMessage message, string serializedPayload) + { + var protocol = new WebPubSubJsonProtocol(); + Assert.Equal(serializedPayload, Encoding.UTF8.GetString(protocol.GetMessageBytes(message).ToArray())); + } + + [Fact] + public void ProtocolPropertyTest() + { + var jsonProtocol = new WebPubSubJsonProtocol(); + Assert.False(jsonProtocol.IsReliable); + Assert.Equal("json.webpubsub.azure.v1", jsonProtocol.Name); + + var jsonReliableProtocol = new WebPubSubJsonReliableProtocol(); + Assert.True(jsonReliableProtocol.IsReliable); + Assert.Equal("json.reliable.webpubsub.azure.v1", jsonReliableProtocol.Name); + } + } +} +#endif