Skip to content

Commit

Permalink
Merge pull request #1792 from rejectedsoftware/ws_frame_opt
Browse files Browse the repository at this point in the history
Assemble frames in OutgoingWebSocketMessage in a single buffer.
merged-on-behalf-of: Sönke Ludwig <[email protected]>
  • Loading branch information
dlang-bot authored Jul 12, 2017
2 parents 0980d1f + 64fb3a7 commit e317840
Showing 1 changed file with 133 additions and 44 deletions.
177 changes: 133 additions & 44 deletions http/vibe/http/websockets.d
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,6 @@ final class WebSocket {
*/
final class OutgoingWebSocketMessage : OutputStream {
@safe:

private {
RandomNumberStream m_rng;
Stream m_conn;
Expand All @@ -657,7 +656,7 @@ final class OutgoingWebSocketMessage : OutputStream {
bool m_finalized = false;
}

this( Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng )
this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
{
assert(conn !is null);
m_conn = conn;
Expand All @@ -668,34 +667,47 @@ final class OutgoingWebSocketMessage : OutputStream {
size_t write(in ubyte[] bytes, IOMode mode)
{
assert(!m_finalized);

if (!m_buffer.data.length) {
ubyte[Frame.maxHeaderSize] header_padding;
m_buffer.put(header_padding[]);
}

m_buffer.put(bytes);
return bytes.length;
}

void flush()
{
assert(!m_finalized);
Frame frame;
frame.opcode = m_frameOpcode;
frame.fin = false;
frame.payload = m_buffer.data;
frame.writeFrame(m_conn, m_rng);
m_buffer.clear();
m_conn.flush();
if (m_buffer.data.length > 0)
sendFrame(false);
}

void finalize()
{
if (m_finalized) return;
m_finalized = true;
sendFrame(true);
}

private void sendFrame(bool fin)
{
if (!m_buffer.data.length)
write(null, IOMode.once);

assert(m_buffer.data.length >= Frame.maxHeaderSize);

Frame frame;
frame.fin = true;
frame.fin = fin;
frame.opcode = m_frameOpcode;
frame.payload = m_buffer.data;
frame.writeFrame(m_conn, m_rng);
m_buffer.clear();
frame.payload = m_buffer.data[Frame.maxHeaderSize .. $];
auto hsize = frame.getHeaderSize(m_rng !is null);
auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $];
frame.writeHeader(msg[0 .. hsize], m_rng);
m_conn.write(msg);
m_conn.flush();
m_buffer.clear();
}

alias write = OutputStream.write;
Expand All @@ -707,7 +719,6 @@ final class OutgoingWebSocketMessage : OutputStream {
*/
final class IncomingWebSocketMessage : InputStream {
@safe:

private {
RandomNumberStream m_rng;
Stream m_conn;
Expand Down Expand Up @@ -797,7 +808,7 @@ private immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
* Currently only 6 values are defined, however the opcode is defined as
* taking 4 bytes.
*/
enum FrameOpcode : uint {
private enum FrameOpcode : uint {
continuation = 0x0,
text = 0x1,
binary = 0x2,
Expand All @@ -807,51 +818,62 @@ enum FrameOpcode : uint {
}


struct Frame {
private struct Frame {
@safe:
enum maxHeaderSize = 14;

bool fin;
FrameOpcode opcode;
ubyte[] payload;

void writeFrame(OutputStream stream, RandomNumberStream sys_rng)
size_t getHeaderSize(bool mask)
{
import vibe.stream.wrapper;

auto rng = StreamOutputRange(stream);
size_t ret = 1;
if (payload.length < 126) ret += 1;
else if (payload.length < 65536) ret += 3;
else ret += 9;
if (mask) ret += 4;
return ret;
}

void writeHeader(ubyte[] dst, RandomNumberStream sys_rng)
{
ubyte[4] buff;
ubyte firstByte = cast(ubyte)opcode;
if (fin) firstByte |= 0x80;
rng.put(firstByte);

auto b1 = 0;
if (sys_rng) {
b1 = 0x80;
}

if( payload.length < 126 ) {
rng.put(std.bitmanip.nativeToBigEndian(cast(ubyte)(b1 | payload.length)));
} else if( payload.length <= 65536 ) {
buff[0] = cast(ubyte) (b1 | 126);
rng.put(buff[0 .. 1]);
rng.put(std.bitmanip.nativeToBigEndian(cast(ushort)payload.length));
dst[0] = firstByte;
dst = dst[1 .. $];

auto b1 = sys_rng ? 0x80 : 0x00;

if (payload.length < 126) {
dst[0] = cast(ubyte)(b1 | payload.length);
dst = dst[1 .. $];
} else if (payload.length < 65536) {
dst[0] = cast(ubyte) (b1 | 126);
dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length);
dst = dst[3 .. $];
} else {
buff[0] = cast(ubyte) (b1 | 127);
rng.put(buff[0 .. 1]);
rng.put(std.bitmanip.nativeToBigEndian(payload.length));
dst[0] = cast(ubyte) (b1 | 127);
dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length);
dst = dst[9 .. $];
}

if (sys_rng) {
sys_rng.read(buff);
rng.put(buff);
for (size_t i = 0; i < payload.length; i++) {
payload[i] ^= buff[i % 4];
}
rng.put(payload);
}else {
rng.put(payload);
sys_rng.read(dst[0 .. 4]);
for (size_t i = 0; i < payload.length; i++)
payload[i] ^= dst[i % 4];
}
}

void writeFrame(OutputStream stream, RandomNumberStream sys_rng)
{
import vibe.stream.wrapper;

auto rng = StreamOutputRange(stream);
ubyte[maxHeaderSize] hdr;
writeHeader(hdr[], sys_rng);
rng.put(hdr[0 .. getHeaderSize(sys_rng !is null)]);
rng.flush();
stream.flush();
}
Expand Down Expand Up @@ -896,6 +918,73 @@ struct Frame {
}
}

unittest {
import std.algorithm.searching : all;

final class DummyRNG : RandomNumberStream {
@safe:
@property bool empty() { return false; }
@property ulong leastSize() { return ulong.max; }
@property bool dataAvailableForRead() { return true; }
const(ubyte)[] peek() { return null; }
size_t read(scope ubyte[] buffer, IOMode mode) @trusted { buffer[] = 13; return buffer.length; }
alias read = RandomNumberStream.read;
}

ubyte[14] hdrbuf;
auto rng = new DummyRNG;

Frame f;
f.payload = new ubyte[125];

assert(f.getHeaderSize(false) == 2);
hdrbuf[] = 0;
f.writeHeader(hdrbuf[0 .. 2], null);
assert(hdrbuf[0 .. 2] == [0, 125]);

assert(f.getHeaderSize(true) == 6);
hdrbuf[] = 0;
f.writeHeader(hdrbuf[0 .. 6], rng);
assert(hdrbuf[0 .. 2] == [0, 128|125]);
assert(hdrbuf[2 .. 6].all!(b => b == 13));

f.payload = new ubyte[126];
assert(f.getHeaderSize(false) == 4);
hdrbuf[] = 0;
f.writeHeader(hdrbuf[0 .. 4], null);
assert(hdrbuf[0 .. 4] == [0, 126, 0, 126]);

assert(f.getHeaderSize(true) == 8);
hdrbuf[] = 0;
f.writeHeader(hdrbuf[0 .. 8], rng);
assert(hdrbuf[0 .. 4] == [0, 128|126, 0, 126]);
assert(hdrbuf[4 .. 8].all!(b => b == 13));

f.payload = new ubyte[65535];
assert(f.getHeaderSize(false) == 4);
hdrbuf[] = 0;
f.writeHeader(hdrbuf[0 .. 4], null);
assert(hdrbuf[0 .. 4] == [0, 126, 255, 255]);

assert(f.getHeaderSize(true) == 8);
hdrbuf[] = 0;
f.writeHeader(hdrbuf[0 .. 8], rng);
assert(hdrbuf[0 .. 4] == [0, 128|126, 255, 255]);
assert(hdrbuf[4 .. 8].all!(b => b == 13));

f.payload = new ubyte[65536];
assert(f.getHeaderSize(false) == 10);
hdrbuf[] = 0;
f.writeHeader(hdrbuf[0 .. 10], null);
assert(hdrbuf[0 .. 10] == [0, 127, 0, 0, 0, 0, 0, 1, 0, 0]);

assert(f.getHeaderSize(true) == 14);
hdrbuf[] = 0;
f.writeHeader(hdrbuf[0 .. 14], rng);
assert(hdrbuf[0 .. 10] == [0, 128|127, 0, 0, 0, 0, 0, 1, 0, 0]);
assert(hdrbuf[10 .. 14].all!(b => b == 13));
}

/**
* Generate a challenge key for the protocol upgrade phase.
*/
Expand Down

0 comments on commit e317840

Please sign in to comment.