diff --git a/http/vibe/http/websockets.d b/http/vibe/http/websockets.d index 0e69fa16d8..5007cb4e79 100644 --- a/http/vibe/http/websockets.d +++ b/http/vibe/http/websockets.d @@ -648,7 +648,6 @@ final class WebSocket { */ final class OutgoingWebSocketMessage : OutputStream { @safe: - private { RandomNumberStream m_rng; Stream m_conn; @@ -657,7 +656,7 @@ final class OutgoingWebSocketMessage : OutputStream { bool m_finalized = false; } - this( Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng ) + this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng) { assert(conn !is null); m_conn = conn; @@ -668,6 +667,12 @@ final class OutgoingWebSocketMessage : OutputStream { size_t write(in ubyte[] bytes, IOMode mode) { assert(!m_finalized); + + if (!m_buffer.data.length) { + ubyte[Frame.maxHeaderSize] header_padding; + m_buffer.put(header_padding[]); + } + m_buffer.put(bytes); return bytes.length; } @@ -675,27 +680,34 @@ final class OutgoingWebSocketMessage : OutputStream { void flush() { assert(!m_finalized); - Frame frame; - frame.opcode = m_frameOpcode; - frame.fin = false; - frame.payload = m_buffer.data; - frame.writeFrame(m_conn, m_rng); - m_buffer.clear(); - m_conn.flush(); + if (m_buffer.data.length > 0) + sendFrame(false); } void finalize() { if (m_finalized) return; m_finalized = true; + sendFrame(true); + } + + private void sendFrame(bool fin) + { + if (!m_buffer.data.length) + write(null, IOMode.once); + + assert(m_buffer.data.length >= Frame.maxHeaderSize); Frame frame; - frame.fin = true; + frame.fin = fin; frame.opcode = m_frameOpcode; - frame.payload = m_buffer.data; - frame.writeFrame(m_conn, m_rng); - m_buffer.clear(); + frame.payload = m_buffer.data[Frame.maxHeaderSize .. $]; + auto hsize = frame.getHeaderSize(m_rng !is null); + auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $]; + frame.writeHeader(msg[0 .. hsize], m_rng); + m_conn.write(msg); m_conn.flush(); + m_buffer.clear(); } alias write = OutputStream.write; @@ -707,7 +719,6 @@ final class OutgoingWebSocketMessage : OutputStream { */ final class IncomingWebSocketMessage : InputStream { @safe: - private { RandomNumberStream m_rng; Stream m_conn; @@ -797,7 +808,7 @@ private immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; * Currently only 6 values are defined, however the opcode is defined as * taking 4 bytes. */ -enum FrameOpcode : uint { +private enum FrameOpcode : uint { continuation = 0x0, text = 0x1, binary = 0x2, @@ -807,51 +818,62 @@ enum FrameOpcode : uint { } -struct Frame { +private struct Frame { @safe: + enum maxHeaderSize = 14; bool fin; FrameOpcode opcode; ubyte[] payload; - void writeFrame(OutputStream stream, RandomNumberStream sys_rng) + size_t getHeaderSize(bool mask) { - import vibe.stream.wrapper; - - auto rng = StreamOutputRange(stream); + size_t ret = 1; + if (payload.length < 126) ret += 1; + else if (payload.length < 65536) ret += 3; + else ret += 9; + if (mask) ret += 4; + return ret; + } + void writeHeader(ubyte[] dst, RandomNumberStream sys_rng) + { ubyte[4] buff; ubyte firstByte = cast(ubyte)opcode; if (fin) firstByte |= 0x80; - rng.put(firstByte); - - auto b1 = 0; - if (sys_rng) { - b1 = 0x80; - } - - if( payload.length < 126 ) { - rng.put(std.bitmanip.nativeToBigEndian(cast(ubyte)(b1 | payload.length))); - } else if( payload.length <= 65536 ) { - buff[0] = cast(ubyte) (b1 | 126); - rng.put(buff[0 .. 1]); - rng.put(std.bitmanip.nativeToBigEndian(cast(ushort)payload.length)); + dst[0] = firstByte; + dst = dst[1 .. $]; + + auto b1 = sys_rng ? 0x80 : 0x00; + + if (payload.length < 126) { + dst[0] = cast(ubyte)(b1 | payload.length); + dst = dst[1 .. $]; + } else if (payload.length < 65536) { + dst[0] = cast(ubyte) (b1 | 126); + dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length); + dst = dst[3 .. $]; } else { - buff[0] = cast(ubyte) (b1 | 127); - rng.put(buff[0 .. 1]); - rng.put(std.bitmanip.nativeToBigEndian(payload.length)); + dst[0] = cast(ubyte) (b1 | 127); + dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length); + dst = dst[9 .. $]; } if (sys_rng) { - sys_rng.read(buff); - rng.put(buff); - for (size_t i = 0; i < payload.length; i++) { - payload[i] ^= buff[i % 4]; - } - rng.put(payload); - }else { - rng.put(payload); + sys_rng.read(dst[0 .. 4]); + for (size_t i = 0; i < payload.length; i++) + payload[i] ^= dst[i % 4]; } + } + + void writeFrame(OutputStream stream, RandomNumberStream sys_rng) + { + import vibe.stream.wrapper; + + auto rng = StreamOutputRange(stream); + ubyte[maxHeaderSize] hdr; + writeHeader(hdr[], sys_rng); + rng.put(hdr[0 .. getHeaderSize(sys_rng !is null)]); rng.flush(); stream.flush(); } @@ -896,6 +918,73 @@ struct Frame { } } +unittest { + import std.algorithm.searching : all; + + final class DummyRNG : RandomNumberStream { + @safe: + @property bool empty() { return false; } + @property ulong leastSize() { return ulong.max; } + @property bool dataAvailableForRead() { return true; } + const(ubyte)[] peek() { return null; } + size_t read(scope ubyte[] buffer, IOMode mode) @trusted { buffer[] = 13; return buffer.length; } + alias read = RandomNumberStream.read; + } + + ubyte[14] hdrbuf; + auto rng = new DummyRNG; + + Frame f; + f.payload = new ubyte[125]; + + assert(f.getHeaderSize(false) == 2); + hdrbuf[] = 0; + f.writeHeader(hdrbuf[0 .. 2], null); + assert(hdrbuf[0 .. 2] == [0, 125]); + + assert(f.getHeaderSize(true) == 6); + hdrbuf[] = 0; + f.writeHeader(hdrbuf[0 .. 6], rng); + assert(hdrbuf[0 .. 2] == [0, 128|125]); + assert(hdrbuf[2 .. 6].all!(b => b == 13)); + + f.payload = new ubyte[126]; + assert(f.getHeaderSize(false) == 4); + hdrbuf[] = 0; + f.writeHeader(hdrbuf[0 .. 4], null); + assert(hdrbuf[0 .. 4] == [0, 126, 0, 126]); + + assert(f.getHeaderSize(true) == 8); + hdrbuf[] = 0; + f.writeHeader(hdrbuf[0 .. 8], rng); + assert(hdrbuf[0 .. 4] == [0, 128|126, 0, 126]); + assert(hdrbuf[4 .. 8].all!(b => b == 13)); + + f.payload = new ubyte[65535]; + assert(f.getHeaderSize(false) == 4); + hdrbuf[] = 0; + f.writeHeader(hdrbuf[0 .. 4], null); + assert(hdrbuf[0 .. 4] == [0, 126, 255, 255]); + + assert(f.getHeaderSize(true) == 8); + hdrbuf[] = 0; + f.writeHeader(hdrbuf[0 .. 8], rng); + assert(hdrbuf[0 .. 4] == [0, 128|126, 255, 255]); + assert(hdrbuf[4 .. 8].all!(b => b == 13)); + + f.payload = new ubyte[65536]; + assert(f.getHeaderSize(false) == 10); + hdrbuf[] = 0; + f.writeHeader(hdrbuf[0 .. 10], null); + assert(hdrbuf[0 .. 10] == [0, 127, 0, 0, 0, 0, 0, 1, 0, 0]); + + assert(f.getHeaderSize(true) == 14); + hdrbuf[] = 0; + f.writeHeader(hdrbuf[0 .. 14], rng); + assert(hdrbuf[0 .. 10] == [0, 128|127, 0, 0, 0, 0, 0, 1, 0, 0]); + assert(hdrbuf[10 .. 14].all!(b => b == 13)); +} + /** * Generate a challenge key for the protocol upgrade phase. */