Skip to content

Commit

Permalink
Use MessagePackPrimitives
Browse files Browse the repository at this point in the history
  • Loading branch information
mayuki committed Dec 6, 2024
1 parent 0ce1497 commit 38ce4b1
Show file tree
Hide file tree
Showing 8 changed files with 416 additions and 238 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,55 +6,69 @@ namespace MagicOnion.Internal
internal ref struct StreamingHubClientMessageReader
{
readonly ReadOnlyMemory<byte> data;
MessagePackReader reader;
int position;

public StreamingHubClientMessageReader(ReadOnlyMemory<byte> data)
{
this.data = data;
this.reader = new MessagePackReader(data);
this.position = 0;
}
void VerifyResultAndAdvance(MessagePackPrimitives.DecodeResult result, int readLen)
{
if (result != MessagePackPrimitives.DecodeResult.Success)
{
throw new InvalidOperationException($"Invalid message format: {result}");
}
position += readLen;
}

public StreamingHubMessageType ReadMessageType()
{
var arrayLength = this.reader.ReadArrayHeader();
VerifyResultAndAdvance(MessagePackPrimitives.TryReadArrayHeader(data.Span.Slice(position), out var arrayLength, out var read), read);
return arrayLength switch
{
2 => StreamingHubMessageType.Broadcast,
3 => StreamingHubMessageType.Response,
4 => StreamingHubMessageType.ResponseWithError,
5 => reader.ReadByte() switch
{
0x00 /* 0:ClientResultRequest */ => StreamingHubMessageType.ClientResultRequest,
0x7e /* 126:ClientHeartbeatResponse */ => StreamingHubMessageType.ClientHeartbeatResponse,
0x7f /* 127:ServerHeartbeat */ => StreamingHubMessageType.ServerHeartbeat,
var x => throw new InvalidOperationException($"Unknown Type: {x}"),
},
5 => ReadMessageSubType(),
_ => throw new InvalidOperationException($"Unknown message format: ArrayLength = {arrayLength}"),
};
}

StreamingHubMessageType ReadMessageSubType()
{
VerifyResultAndAdvance(MessagePackPrimitives.TryReadByte(data.Span.Slice(position), out var subType, out var read), read);
return subType switch
{
0x00 /* 0:ClientResultRequest */ => StreamingHubMessageType.ClientResultRequest,
0x7e /* 126:ClientHeartbeatResponse */ => StreamingHubMessageType.ClientHeartbeatResponse,
0x7f /* 127:ServerHeartbeat */ => StreamingHubMessageType.ServerHeartbeat,
_ => throw new InvalidOperationException($"Unknown Type: {subType}"),
};
}

public (int MethodId, int Cosumed) ReadBroadcastMessageMethodId()
{
return (reader.ReadInt32(), (int)reader.Consumed);
VerifyResultAndAdvance(MessagePackPrimitives.TryReadInt32(data.Span.Slice(position), out var methodId, out var readMethodId), readMethodId);
return (methodId, position);
}

public (int MethodId, ReadOnlyMemory<byte> Body) ReadBroadcastMessage()
{
var methodId = reader.ReadInt32();
var offset = (int)reader.Consumed;
return (methodId, data.Slice(offset));
VerifyResultAndAdvance(MessagePackPrimitives.TryReadInt32(data.Span.Slice(position), out var methodId, out var readMethodId), readMethodId);
return (methodId, data.Slice(position));
}

public (int MessageId, int MethodId, ReadOnlyMemory<byte> Body) ReadResponseMessage()
{
var messageId = reader.ReadInt32();
var methodId = reader.ReadInt32();
var offset = (int)reader.Consumed;
return (messageId, methodId, data.Slice(offset));
VerifyResultAndAdvance(MessagePackPrimitives.TryReadInt32(data.Span.Slice(position), out var messageId, out var readMessageId), readMessageId);
VerifyResultAndAdvance(MessagePackPrimitives.TryReadInt32(data.Span.Slice(position), out var methodId, out var readMethodId), readMethodId);
return (messageId, methodId, data.Slice(position));
}

public (int MessageId, int StatusCode, string? Detail, string? Error) ReadResponseWithErrorMessage()
{
var reader = new MessagePackReader(data.Slice(position));
var messageId = reader.ReadInt32();
var statusCode = reader.ReadInt32();
var detail = reader.ReadString();
Expand All @@ -65,31 +79,40 @@ public StreamingHubMessageType ReadMessageType()

public (Guid ClientResultRequestMessageId, int MethodId, ReadOnlyMemory<byte> Body) ReadClientResultRequestMessage()
{
var reader = new MessagePackReader(data.Slice(position));
//var type = reader.ReadByte(); // Type is already read by ReadMessageType
reader.Skip(); // Dummy
var clientRequestMessageId = MessagePackSerializer.Deserialize<Guid>(ref reader);
var methodId = reader.ReadInt32();
var offset = (int)reader.Consumed;
position += (int)reader.Consumed;

return (clientRequestMessageId, methodId, data.Slice(offset));
return (clientRequestMessageId, methodId, data.Slice(position));
}

public (short Sequence, long ServerSentAt, ReadOnlyMemory<byte> Metadata) ReadServerHeartbeat()
{
//var type = reader.ReadByte(); // Type is already read by ReadMessageType
var sequence = reader.ReadInt16(); // Sequence
var serverSentAt = reader.ReadInt64(); // ServerSentAt (2)
reader.Skip(); // Dummy (3)
// Type is already read by ReadMessageType (Byte)

return (sequence, serverSentAt, data.Slice((int)reader.Consumed));
// Sequence (1)
VerifyResultAndAdvance(MessagePackPrimitives.TryReadInt16(data.Span.Slice(position), out var sequence, out var readLenSequence), readLenSequence);
// ServerSentAt (2)
VerifyResultAndAdvance(MessagePackPrimitives.TryReadInt64(data.Span.Slice(position), out var serverSentAt, out var readLenServerSentAt), readLenServerSentAt);
// Reserved (3)
VerifyResultAndAdvance(MessagePackPrimitives.TryReadNil(data.Span.Slice(position), out var readLenReserved), readLenReserved);

return (sequence, serverSentAt, data.Slice(position));
}

public (short Sequence, long ClientSentAt) ReadClientHeartbeatResponse()
{
//var type = reader.ReadByte(); // Type is already read by ReadMessageType
var sequence = reader.ReadInt16(); // Sequence
var clientSentAt = reader.ReadInt64(); // ClientSentAt (2)
reader.Skip(); // Reserved (3)
// Type is already read by ReadMessageType (Byte)

// Sequence (1)
VerifyResultAndAdvance(MessagePackPrimitives.TryReadInt16(data.Span.Slice(position), out var sequence, out var readLenSequence), readLenSequence);
// ClientSentAt (2)
VerifyResultAndAdvance(MessagePackPrimitives.TryReadInt64(data.Span.Slice(position), out var clientSentAt, out var readLenClientSentAt), readLenClientSentAt);
// Reserved (3)
VerifyResultAndAdvance(MessagePackPrimitives.TryReadNil(data.Span.Slice(position), out var readLenReserved), readLenReserved);

return (sequence, clientSentAt);
}
Expand Down
Loading

0 comments on commit 38ce4b1

Please sign in to comment.