Skip to content

Commit

Permalink
Simplify partial reads
Browse files Browse the repository at this point in the history
  • Loading branch information
quinchs committed Oct 18, 2022
1 parent 4fcb6ec commit 945c2ed
Showing 1 changed file with 21 additions and 40 deletions.
61 changes: 21 additions & 40 deletions src/EdgeDB.Net.Driver/Binary/ClientPacketDuplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,16 @@ private CancellationToken GetTimeoutToken()

private async Task ReadAsync()
{
byte[] packetHeaderBuffer = new byte[PACKET_HEADER_SIZE];

Memory<byte> packetHeaderBuffer = new byte[PACKET_HEADER_SIZE];
try
{
while (!_disconnectTokenSource.IsCancellationRequested)
{
if (_stream == null)
return;

var result = await _stream.ReadAsync(packetHeaderBuffer, _disconnectTokenSource.Token).ConfigureAwait(false);

if (result == 0 || (result != 5 && ReadPartialHeader(ref packetHeaderBuffer, result) != 5))

if (await ReadExactAsync(packetHeaderBuffer, _disconnectTokenSource.Token).ConfigureAwait(false) == 0)
{
// disconnected
_disconnectTokenSource?.Cancel();
Expand All @@ -100,22 +98,13 @@ private async Task ReadAsync()
}

// read the length
var type = (ServerMessageType)packetHeaderBuffer[0];
var length = (int)BinaryPrimitives.ReadUInt32BigEndian(packetHeaderBuffer.AsSpan()[1..5]) - 4;
var type = (ServerMessageType)packetHeaderBuffer.Span[0];
var length = (int)BinaryPrimitives.ReadUInt32BigEndian(packetHeaderBuffer.Span[1..5]) - 4;

using var memoryOwner = MemoryPool<byte>.Shared.Rent(length);
var buffer = memoryOwner.Memory[..length];

int read = await _stream.ReadAsync(buffer, _disconnectTokenSource.Token).ConfigureAwait(false);

if (read != length && read != 0)
{
var handle = buffer.Pin();
read = ReadIntoHandle(length, read, ref handle);
handle.Dispose();
}

if (read == 0)
if (await ReadExactAsync(buffer, _disconnectTokenSource.Token).ConfigureAwait(false) == 0)
{
// disconnected
_disconnectTokenSource?.Cancel();
Expand Down Expand Up @@ -145,35 +134,27 @@ private async Task ReadAsync()
}
}

private unsafe int ReadPartialHeader(ref byte[] header, int totalRead)
private async ValueTask<int> ReadExactAsync(Memory<byte> buffer, CancellationToken token)
{
if (_stream == null)
return 0;
var targetLength = buffer.Length;

int read = totalRead;
int numZeros = 0;
int numRead = 0;

fixed(byte* pointer = header)
while(numRead < targetLength)
{
while (read < PACKET_HEADER_SIZE)
{
var span = new Span<byte>(pointer + read, PACKET_HEADER_SIZE - read);
var buff = numRead == 0
? buffer
: buffer[numRead..(targetLength - numRead)];

var result = _stream.Read(span);
if (result == 0)
{
numZeros++;
var read = await _stream!.ReadAsync(buff, token);

if (numZeros >= 20)
{
_disconnectTokenSource.Cancel();
return 0;
}
}
read += result;
}
if (read == 0) // disconnected
return 0;

numRead += read;
}
return read;

return numRead;
}

private unsafe int ReadIntoHandle(int length, int offset, ref MemoryHandle handle)
Expand Down

0 comments on commit 945c2ed

Please sign in to comment.