Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assemble frames in OutgoingWebSocketMessage in a single buffer. #1792

Merged
merged 2 commits into from
Jul 12, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 133 additions & 44 deletions http/vibe/http/websockets.d
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,6 @@ final class WebSocket {
*/
final class OutgoingWebSocketMessage : OutputStream {
@safe:

private {
RandomNumberStream m_rng;
Stream m_conn;
Expand All @@ -657,7 +656,7 @@ final class OutgoingWebSocketMessage : OutputStream {
bool m_finalized = false;
}

this( Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng )
this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
{
assert(conn !is null);
m_conn = conn;
Expand All @@ -668,34 +667,47 @@ final class OutgoingWebSocketMessage : OutputStream {
size_t write(in ubyte[] bytes, IOMode mode)
{
assert(!m_finalized);

if (!m_buffer.data.length) {
ubyte[Frame.maxHeaderSize] header_padding;
m_buffer.put(header_padding[]);
}

m_buffer.put(bytes);
return bytes.length;
}

void flush()
{
assert(!m_finalized);
Frame frame;
frame.opcode = m_frameOpcode;
frame.fin = false;
frame.payload = m_buffer.data;
frame.writeFrame(m_conn, m_rng);
m_buffer.clear();
m_conn.flush();
if (m_buffer.data.length > 0)
sendFrame(false);
}

void finalize()
{
if (m_finalized) return;
m_finalized = true;
sendFrame(true);
}

private void sendFrame(bool fin)
{
if (!m_buffer.data.length)
write(null, IOMode.once);

assert(m_buffer.data.length >= Frame.maxHeaderSize);

Frame frame;
frame.fin = true;
frame.fin = fin;
frame.opcode = m_frameOpcode;
frame.payload = m_buffer.data;
frame.writeFrame(m_conn, m_rng);
m_buffer.clear();
frame.payload = m_buffer.data[Frame.maxHeaderSize .. $];
auto hsize = frame.getHeaderSize(m_rng !is null);
auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $];
frame.writeHeader(msg[0 .. hsize], m_rng);
m_conn.write(msg);
m_conn.flush();
m_buffer.clear();
}

alias write = OutputStream.write;
Expand All @@ -707,7 +719,6 @@ final class OutgoingWebSocketMessage : OutputStream {
*/
final class IncomingWebSocketMessage : InputStream {
@safe:

private {
RandomNumberStream m_rng;
Stream m_conn;
Expand Down Expand Up @@ -797,7 +808,7 @@ private immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
* Currently only 6 values are defined, however the opcode is defined as
* taking 4 bytes.
*/
enum FrameOpcode : uint {
private enum FrameOpcode : uint {
continuation = 0x0,
text = 0x1,
binary = 0x2,
Expand All @@ -807,51 +818,62 @@ enum FrameOpcode : uint {
}


struct Frame {
private struct Frame {
@safe:
enum maxHeaderSize = 14;

bool fin;
FrameOpcode opcode;
ubyte[] payload;

void writeFrame(OutputStream stream, RandomNumberStream sys_rng)
size_t getHeaderSize(bool mask)
{
import vibe.stream.wrapper;

auto rng = StreamOutputRange(stream);
size_t ret = 1;
if (payload.length < 126) ret += 1;
else if (payload.length < 65536) ret += 3;
else ret += 9;
if (mask) ret += 4;
return ret;
}

void writeHeader(ubyte[] dst, RandomNumberStream sys_rng)
{
ubyte[4] buff;
ubyte firstByte = cast(ubyte)opcode;
if (fin) firstByte |= 0x80;
rng.put(firstByte);

auto b1 = 0;
if (sys_rng) {
b1 = 0x80;
}

if( payload.length < 126 ) {
rng.put(std.bitmanip.nativeToBigEndian(cast(ubyte)(b1 | payload.length)));
} else if( payload.length <= 65536 ) {
buff[0] = cast(ubyte) (b1 | 126);
rng.put(buff[0 .. 1]);
rng.put(std.bitmanip.nativeToBigEndian(cast(ushort)payload.length));
dst[0] = firstByte;
dst = dst[1 .. $];

auto b1 = sys_rng ? 0x80 : 0x00;

if (payload.length < 126) {
dst[0] = cast(ubyte)(b1 | payload.length);
dst = dst[1 .. $];
} else if (payload.length < 65536) {
dst[0] = cast(ubyte) (b1 | 126);
dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length);
dst = dst[3 .. $];
} else {
buff[0] = cast(ubyte) (b1 | 127);
rng.put(buff[0 .. 1]);
rng.put(std.bitmanip.nativeToBigEndian(payload.length));
dst[0] = cast(ubyte) (b1 | 127);
dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length);
dst = dst[9 .. $];
}

if (sys_rng) {
sys_rng.read(buff);
rng.put(buff);
for (size_t i = 0; i < payload.length; i++) {
payload[i] ^= buff[i % 4];
}
rng.put(payload);
}else {
rng.put(payload);
sys_rng.read(dst[0 .. 4]);
for (size_t i = 0; i < payload.length; i++)
payload[i] ^= dst[i % 4];
}
}

void writeFrame(OutputStream stream, RandomNumberStream sys_rng)
{
import vibe.stream.wrapper;

auto rng = StreamOutputRange(stream);
ubyte[maxHeaderSize] hdr;
writeHeader(hdr[], sys_rng);
rng.put(hdr[0 .. getHeaderSize(sys_rng !is null)]);
rng.flush();
stream.flush();
}
Expand Down Expand Up @@ -896,6 +918,73 @@ struct Frame {
}
}

unittest {
import std.algorithm.searching : all;

final class DummyRNG : RandomNumberStream {
@safe:
@property bool empty() { return false; }
@property ulong leastSize() { return ulong.max; }
@property bool dataAvailableForRead() { return true; }
const(ubyte)[] peek() { return null; }
size_t read(scope ubyte[] buffer, IOMode mode) @trusted { buffer[] = 13; return buffer.length; }
alias read = RandomNumberStream.read;
}

ubyte[14] hdrbuf;
auto rng = new DummyRNG;

Frame f;
f.payload = new ubyte[125];

assert(f.getHeaderSize(false) == 2);
hdrbuf[] = 0;
f.writeHeader(hdrbuf[0 .. 2], null);
assert(hdrbuf[0 .. 2] == [0, 125]);

assert(f.getHeaderSize(true) == 6);
hdrbuf[] = 0;
f.writeHeader(hdrbuf[0 .. 6], rng);
assert(hdrbuf[0 .. 2] == [0, 128|125]);
assert(hdrbuf[2 .. 6].all!(b => b == 13));

f.payload = new ubyte[126];
assert(f.getHeaderSize(false) == 4);
hdrbuf[] = 0;
f.writeHeader(hdrbuf[0 .. 4], null);
assert(hdrbuf[0 .. 4] == [0, 126, 0, 126]);

assert(f.getHeaderSize(true) == 8);
hdrbuf[] = 0;
f.writeHeader(hdrbuf[0 .. 8], rng);
assert(hdrbuf[0 .. 4] == [0, 128|126, 0, 126]);
assert(hdrbuf[4 .. 8].all!(b => b == 13));

f.payload = new ubyte[65535];
assert(f.getHeaderSize(false) == 4);
hdrbuf[] = 0;
f.writeHeader(hdrbuf[0 .. 4], null);
assert(hdrbuf[0 .. 4] == [0, 126, 255, 255]);

assert(f.getHeaderSize(true) == 8);
hdrbuf[] = 0;
f.writeHeader(hdrbuf[0 .. 8], rng);
assert(hdrbuf[0 .. 4] == [0, 128|126, 255, 255]);
assert(hdrbuf[4 .. 8].all!(b => b == 13));

f.payload = new ubyte[65536];
assert(f.getHeaderSize(false) == 10);
hdrbuf[] = 0;
f.writeHeader(hdrbuf[0 .. 10], null);
assert(hdrbuf[0 .. 10] == [0, 127, 0, 0, 0, 0, 0, 1, 0, 0]);

assert(f.getHeaderSize(true) == 14);
hdrbuf[] = 0;
f.writeHeader(hdrbuf[0 .. 14], rng);
assert(hdrbuf[0 .. 10] == [0, 128|127, 0, 0, 0, 0, 0, 1, 0, 0]);
assert(hdrbuf[10 .. 14].all!(b => b == 13));
}

/**
* Generate a challenge key for the protocol upgrade phase.
*/
Expand Down