From 269d995b7e994e568f80d572b2b9fc1aaa69dac6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=93mar=20Kjartan=20Yasin?= Date: Mon, 11 Jan 2016 13:52:27 -0800 Subject: [PATCH 1/3] Make Erlang 18 compliant. --- src/http2_connection.erl | 2 +- test/starting_SUITE.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/http2_connection.erl b/src/http2_connection.erl index 2e4471d1..00eec0ec 100644 --- a/src/http2_connection.erl +++ b/src/http2_connection.erl @@ -601,7 +601,7 @@ handle_info({inet_async, _ListSock, Ref, {ok, CliSocket}}, ssl -> {ok, AcceptSocket} = ssl:ssl_accept(CliSocket, SSLOptions), %% TODO: Erlang 18 uses ALPN - {ok, _Upgrayedd} = ssl:negotiated_next_protocol(AcceptSocket), + {ok, _Upgrayedd} = ssl:negotiated_protocol(AcceptSocket), AcceptSocket end, chatterbox_sup:start_socket(), diff --git a/test/starting_SUITE.erl b/test/starting_SUITE.erl index 2f49dece..dd2fdad9 100644 --- a/test/starting_SUITE.erl +++ b/test/starting_SUITE.erl @@ -23,7 +23,7 @@ identifies_protocol(Config) -> {ok, Socket} = ssl:connect("localhost", Port, Options), ct:pal("Socket to me: ~p", [Socket]), - try ssl:negotiated_next_protocol(Socket) of + try ssl:negotiated_protocol(Socket) of {ok, NextProtocol} -> ct:pal("NextProtocol: ~p", [NextProtocol]), <<"h2">> = NextProtocol, From e414bce0ae67da7df5013ad8416bfdd172a76ca2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=93mar=20Kjartan=20Yasin?= Date: Mon, 11 Jan 2016 15:53:06 -0800 Subject: [PATCH 2/3] Make http2c configurable. --- src/http2c.erl | 56 +++++++++++++++--------------- test/header_continuation_SUITE.erl | 22 +++++++++--- test/http2_frame_size_SUITE.erl | 15 ++++++-- test/protocol_errors_SUITE.erl | 5 ++- 4 files changed, 61 insertions(+), 37 deletions(-) diff --git a/src/http2c.erl b/src/http2c.erl index e155f015..264efd35 100644 --- a/src/http2c.erl +++ b/src/http2c.erl @@ -17,7 +17,7 @@ %% API -export([ - start_link/0, + start_link/1, send_binary/2, send_frames/2, send_unaltered_frames/2, @@ -39,9 +39,9 @@ }). %% Starts a server. Should probably take args eventually --spec start_link() -> {ok, pid()} | ignore | {error, any()}. -start_link() -> - gen_server:start_link(?MODULE, [], []). +-spec start_link(any()) -> {ok, pid()} | ignore | {error, any()}. +start_link(Opts) -> + gen_server:start_link(?MODULE, [Opts], []). %% Three API levels: %% 1: lowest: Send a frame or set of frames @@ -110,40 +110,40 @@ get_frames(Pid, StreamId) -> {ok, #http2c_state{}, timeout()} | ignore | {stop, any()}. -init([]) -> - Host = "localhost", - {ok, Port} = application:get_env(chatterbox, port), - ClientOptions = [ - binary, - {packet, raw}, - {active, false} - ], - %% TODO: Stealing from the server config here :/ - {ok, SSLEnabled} = application:get_env(chatterbox, ssl), - {Transport, Options} = case SSLEnabled of - true -> - {ok, SSLOptions} = application:get_env(chatterbox, ssl_options), - {ssl, ClientOptions ++ SSLOptions ++ [{client_preferred_next_protocols, {client, [<<"h2">>]}}]}; - false -> - {gen_tcp, ClientOptions} - end, +init([Options]) -> + Host = proplists:get_value(host, Options), + Port = proplists:get_value(port, Options), + ClientOptions = [binary, + {packet, raw}, + {active, false}], + {Transport, Options1} = + case proplists:get_value(ssl, Options, false) of + false -> + {gen_tcp, ClientOptions}; + true -> + SSLOptions = proplists:get_value(ssl_opts, Options, []), + {ssl, ClientOptions ++ SSLOptions ++ + [{client_preferred_next_protocols, {client, [<<"h2">>]}}]} + end, + lager:debug("Options: ~p", [Options1]), lager:debug("Transport: ~p", [Transport]), - {ok, Socket} = Transport:connect(Host, Port, Options), + {ok, Socket} = Transport:connect(Host, Port, Options1), %% Send the preamble - Transport:send(Socket, <>), - - %% Settings Handshake - {_SSH, ServerSettings} = http2_frame:read({Transport, Socket}), - http2_frame_settings:ack({Transport, Socket}), + ok = Transport:send(Socket, <>), + %% Send the Settings Handshake before anything else. ClientSettings = #settings{}, http2_frame_settings:send({Transport, Socket}, #settings{}, ClientSettings), {AH, _Ack} = http2_frame:read({Transport, Socket}), + + %% Read and accept the settings from the server. @todo handle errors here + {_SSH, ServerSettings} = http2_frame:read({Transport, Socket}), + http2_frame_settings:ack({Transport, Socket}), + lager:debug("Ack: ~p", [_Ack]), lager:debug("AH: ~p", [AH]), Ack = ?IS_FLAG(AH#frame_header.flags, ?FLAG_ACK), lager:debug("Ack: ~p", [Ack]), - case Transport of ssl -> ssl:setopts(Socket, [{active, true}]); diff --git a/test/header_continuation_SUITE.erl b/test/header_continuation_SUITE.erl index 7c72aedb..79badbbe 100644 --- a/test/header_continuation_SUITE.erl +++ b/test/header_continuation_SUITE.erl @@ -22,7 +22,10 @@ end_per_testcase(_, Config) -> ok. basic_continuation(_Config) -> - {ok, Client} = http2c:start_link(), + {ok, Port} = application:get_env(chatterbox, port), + {ok, Client} = http2c:start_link([{host, "127.0.0.1"}, + {port, Port}, + {ssl, true}]), %% build some headers Headers = [ @@ -54,12 +57,15 @@ basic_continuation(_Config) -> ct:pal("Resp: ~p", [Resp]), ?assertEqual(2, length(Resp)), - + gen_server:stop(Client), ok. basic_continuation_end_stream_first(_Config) -> - {ok, Client} = http2c:start_link(), + {ok, Port} = application:get_env(chatterbox, port), + {ok, Client} = http2c:start_link([{host, "127.0.0.1"}, + {port, Port}, + {ssl, true}]), %% build some headers Headers = [ @@ -96,7 +102,10 @@ basic_continuation_end_stream_first(_Config) -> bad_frame_wrong_type_between_continuations(_Config) -> - {ok, Client} = http2c:start_link(), + {ok, Port} = application:get_env(chatterbox, port), + {ok, Client} = http2c:start_link([{host, "127.0.0.1"}, + {port, Port}, + {ssl, true}]), %% build some headers Headers = [ @@ -138,7 +147,10 @@ bad_frame_wrong_type_between_continuations(_Config) -> ok. bad_frame_wrong_stream_between_continuations(_Config) -> - {ok, Client} = http2c:start_link(), + {ok, Port} = application:get_env(chatterbox, port), + {ok, Client} = http2c:start_link([{host, "127.0.0.1"}, + {port, Port}, + {ssl, true}]), %% build some headers Headers = [ diff --git a/test/http2_frame_size_SUITE.erl b/test/http2_frame_size_SUITE.erl index e42fd9f1..f5dd3110 100644 --- a/test/http2_frame_size_SUITE.erl +++ b/test/http2_frame_size_SUITE.erl @@ -34,7 +34,10 @@ wrong_size_window_update(Config) -> send_wrong_size(?WINDOW_UPDATE, Config). send_wrong_size(Type, _Config) -> - {ok, Client} = http2c:start_link(), + {ok, Port} = application:get_env(chatterbox, port), + {ok, Client} = http2c:start_link([{host, "127.0.0.1"}, + {port, Port}, + {ssl, true}]), http2c:send_binary(Client, <<10:24,Type:8,0:1,0:31,0:100>>), timer:sleep(100), Resp = http2c:get_frames(Client, 0), @@ -46,7 +49,10 @@ send_wrong_size(Type, _Config) -> ok. frame_too_big(_Config) -> - {ok, Client} = http2c:start_link(), + {ok, Port} = application:get_env(chatterbox, port), + {ok, Client} = http2c:start_link([{host, "127.0.0.1"}, + {port, Port}, + {ssl, true}]), Frames = [ {#frame_header{length=16392,type=?HEADERS,flags=?FLAG_END_HEADERS,stream_id=3}, #headers{block_fragment = <<1:131136>>}} ], @@ -66,7 +72,10 @@ frame_too_big(_Config) -> ok. euc(_Config) -> - {ok, Client} = http2c:start_link(), + {ok, Port} = application:get_env(chatterbox, port), + {ok, Client} = http2c:start_link([{host, "127.0.0.1"}, + {port, Port}, + {ssl, true}]), Headers1 = [ {<<":path">>, <<"/">>}, diff --git a/test/protocol_errors_SUITE.erl b/test/protocol_errors_SUITE.erl index 35c75690..b90319a2 100644 --- a/test/protocol_errors_SUITE.erl +++ b/test/protocol_errors_SUITE.erl @@ -55,7 +55,10 @@ no_goaway_frame_on_non_zero(Config) -> one_frame(Frame, _Config) -> - {ok, Client} = http2c:start_link(), + {ok, Port} = application:get_env(chatterbox, port), + {ok, Client} = http2c:start_link([{host, "127.0.0.1"}, + {port, Port}, + {ssl, true}]), http2c:send_unaltered_frames(Client, [Frame]), %% How do I get the response? Should be GOAWAY with PROTOCOL_ERROR From 0746d28dc8fbe39f84a2056fd85f377fe360edbc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=93mar=20Kjartan=20Yasin?= Date: Mon, 11 Jan 2016 21:31:27 -0800 Subject: [PATCH 3/3] Spiking on a HTTP2 client FSM. --- src/http2_client.erl | 229 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 229 insertions(+) create mode 100644 src/http2_client.erl diff --git a/src/http2_client.erl b/src/http2_client.erl new file mode 100644 index 00000000..d26b78c3 --- /dev/null +++ b/src/http2_client.erl @@ -0,0 +1,229 @@ +-module(http2_client). + +-behaviour(gen_fsm). + +-include("http2.hrl"). + +-export([start_link/1]). + +-export([send_request/3, + send_request_sync/4]). + +-export([init/1, + handle_event/3, + handle_sync_event/4, + handle_info/3, + code_change/4, + terminate/3]). + +-export([connected/2]). + +-record(parser, { + wfh = undefined :: frame_header(), + wfp = <<>> :: binary(), + frames = [] :: [frame()] + }). + +-record(stream, { + sender :: pid(), + frames :: [frame()] + }). + +-record(state, { + connection, + client_settings, + server_settings, + nas = 1 :: pos_integer(), + parser = #parser{} :: #parser{}, + streams = dict:new() :: dict:dict() + }). + +-type option() :: {host, string()} | + {port, non_neg_integer()} | + {ssl, boolean()} | + {ssl_opts, list()}. + +-spec start_link([option()]) -> + {ok, pid()} | + ignore | + {error, term()}. +start_link(Options) when is_list(Options) -> + Host = proplists:get_value(host, Options), + Port = proplists:get_value(port, Options), + {Transport, ClientOptions} = client_options(Options), + case Transport:connect(Host, Port, ClientOptions) of + {ok, Socket} -> + ConnectionState = #connection_state{socket = {Transport, Socket}}, + handshake_connection(#state{connection=ConnectionState}, + Options); + {error, timeout} -> + {error, timeout}; + Error -> + Error + end. + +send_request(Pid, Headers, Body) -> + gen_fsm:send_event(Pid, {send_request, Headers, Body, []}). + +send_request_sync(Pid, Headers, Body, Timeout) -> + Sender = self(), + gen_fsm:send_event(Pid, {send_request, Headers, Body, [{sender, Sender}]}), + receive + {http2, {frame, Frame}} -> + Frame + after Timeout -> + timeout + end. + +init([#state{connection=#connection_state{socket={Transport, Socket}}}=State]) -> + % The connection has been successfully handshaked before the process is + % started. + Transport:setopts(Socket, [{active, true}]), + {ok, connected, State}. + +connected({send_request, Headers, Body, Options}, + #state{nas=Nas, connection=Connection, streams=Streams}=State) -> + Stream = #stream{sender=proplists:get_value(sender, Options)}, + Streams1 = dict:store(Nas, Stream, Streams), + {HeaderFrame, Connection1} = frame_headers(Headers, Nas, Connection), + DataFrame = frame_body(Body, Nas, Connection), + send_frames([HeaderFrame|DataFrame], Connection), + {next_state, connected, State#state{connection=Connection1, nas=Nas+2, + streams = Streams1}}; +connected(_Event, State) -> + {next_state, connected, State}. + +handle_event(_Event, FsmState, State) -> + {next_state, FsmState, State}. + +handle_sync_event(_Event, _From, FsmState, State) -> + {next_state, FsmState, State}. + +handle_info({_Transport, _Socket, Data}, connected, + #state{parser=Parser, + streams=Streams}=State) -> + lager:debug("Data ~p", [Data]), + {Frames, Parser1} = process_binary(Data, Parser), + lager:error("Frames ~p", [Frames]), + Streams1 = notify_senders(Frames, Streams), + {next_state, connected, State#state{parser=Parser1, streams=Streams1}}. + +code_change(_OldVsn, StateName, State, _Extra) -> + {ok, StateName, State}. + +terminate(normal, _StateName, _State) -> + ok; +terminate(_Reason, _StateName, _State) -> + lager:debug("terminate reason: ~p~n", [_Reason]). + +% Internal +notify_senders([], Streams) -> + Streams; +notify_senders([{#frame_header{stream_id=SID}, _}=Frame|Frames], + Streams) -> + case dict:find(SID, Streams) of + {ok, #stream{sender=Pid}} -> + Parsed = parse_frame(Frame), + Pid ! {http2, {frame, Parsed}}, + Streams1 = remove_stream(Frame, Streams), + notify_senders(Frames, Streams1); + error -> + notify_senders(Frames, Streams) + end. + +parse_frame({#frame_header{type = ?HEADERS}, #headers{block_fragment=BF}}) -> + Ctx = hpack:new_decode_context(), + {Headers, _} = hpack:decode(BF, Ctx), + {headers, Headers}; +parse_frame(Frame) -> + Frame. + +remove_stream(#frame_header{flags = 1, stream_id = SID}, Streams) -> + dict:erase(SID, Streams); +remove_stream(_, Streams) -> + Streams. + +frame_body(Body, Nas, #connection_state{send_settings=SS}) -> + http2_frame_data:to_frames(Nas, Body, SS). + +frame_headers(Headers, Nas, #connection_state{encode_context=EC}=Connection) -> + {HeaderFrame, EC1} = http2_frame_headers:to_frame(Nas, Headers, EC), + {HeaderFrame, Connection#connection_state{encode_context=EC1}}. + +send_frames(Frames, #connection_state{socket={Transport, Socket}}) -> + [Transport:send(Socket, http2_frame:to_binary(F)) || F <- Frames]. + +handshake_connection(#state{connection=#connection_state{socket={Transport, Socket}}}=State, + _Options) -> + ok = Transport:send(Socket, <>), + ClientSettings = #settings{}, + http2_frame_settings:send({Transport, Socket}, #settings{}, ClientSettings), + case http2_frame:read({Transport, Socket}) of + {AH, _Ack} -> + Ack = ?IS_FLAG(AH#frame_header.flags, ?FLAG_ACK), + lager:debug("Ack: ~p", [Ack]), + case http2_frame:read({Transport, Socket}) of + {_SSH, ServerSettings} -> + ConnectionState = #connection_state{ + socket = {Transport, Socket}, + recv_settings = ClientSettings, + send_settings = http2_frame_settings:overlay(#settings{}, ServerSettings) + }, + start_fsm(State#state{connection = ConnectionState}); + Error -> + Error + end; + Error -> + Error + end. + +start_fsm(#state{connection=#connection_state{socket={Transport, + Socket}}}=State) -> + case gen_fsm:start_link(?MODULE, [State], []) of + {ok, Pid} -> + Transport:controlling_process(Socket, Pid), + {ok, Pid}; + Error -> + Error + end. + +client_options(Options) -> + ClientOptions = [binary, {packet, raw}, {active, false}], + case proplists:get_value(ssl, Options, false) of + false -> {gen_tcp, ClientOptions}; + true -> + SSLOptions = proplists:get_value(ssl_opts, Options, []), + {ssl, ClientOptions ++ SSLOptions ++ + [{client_preferred_next_protocols, {client, [<<"h2">>]}}]} + end. + +process_binary(<<>>, #parser{frames=Frames}=Parser) -> + {Frames, Parser#parser{frames = []}}; +process_binary(<>, + #parser{frames = Frames, + wfp = <<>>, + wfh = undefined} = Parser) -> + {Header, <<>>} = http2_frame:read_binary_frame_header(HeaderBin), + case byte_size(Bin) >= Header#frame_header.length of + true -> + {ok, Payload, Rem} = http2_frame:read_binary_payload(Bin, Header), + process_binary(Rem, Parser#parser{ + frames = Frames ++ [{Header,Payload}]}); + false -> + {Frames, Parser#parser{wfh = Header, wfp = Bin, frames = []}} + end; +process_binary(Bin, #parser{wfh = Header, wfp = <<>>, + frames = Frames}=Parser) -> + case byte_size(Bin) >= Header#frame_header.length of + true -> + {ok, Payload, Rem} = http2_frame:read_binary_payload(Bin, Header), + process_binary(Rem, Parser#parser{ + wfh = undefined, + wfp = <<>>, + frames = Frames ++ [{Header,Payload}]}); + false -> + {Frames, Parser#parser{wfh = Header, wfp = Bin, frames = []}} + end; +process_binary(Bin, #parser{wfp=Wfp}=Parser) -> + process_binary(iolist_to_binary([Wfp, Bin]), Parser#parser{wfp = <<>>}). +