From 5a29cb2a32db680fd23c417ccf02f855736f4b00 Mon Sep 17 00:00:00 2001 From: Andrew Thompson Date: Thu, 1 Jun 2023 10:28:19 -0700 Subject: [PATCH] Update grpcbox for chatterbox performance work --- rebar.config | 2 +- rebar.lock | 18 ++++++++++-------- src/grpcbox_client_stream.erl | 3 +-- src/grpcbox_stream.erl | 18 +++++++++--------- src/grpcbox_subchannel.erl | 22 ++++++++++++---------- test/grpcbox_SUITE.erl | 8 ++------ 6 files changed, 35 insertions(+), 36 deletions(-) diff --git a/rebar.config b/rebar.config index 051e185..0c67234 100644 --- a/rebar.config +++ b/rebar.config @@ -1,6 +1,6 @@ {erl_opts, [debug_info]}. -{deps, [{chatterbox, {pkg, ts_chatterbox}}, +{deps, [{chatterbox, ".*", {git, "https://github.com/novalabsxyz/chatterbox", {branch, "adt/perf-nodebug"}}}, ctx, acceptor_pool, gproc]}. diff --git a/rebar.lock b/rebar.lock index 52a826b..7d87f62 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,20 +1,22 @@ {"1.2.0", [{<<"acceptor_pool">>,{pkg,<<"acceptor_pool">>,<<"1.0.0">>},0}, - {<<"chatterbox">>,{pkg,<<"ts_chatterbox">>,<<"0.13.0">>},0}, + {<<"chatterbox">>, + {git,"https://github.com/novalabsxyz/chatterbox", + {ref,"c34c29210b4bcd18f8b1556b03f57a89f2cb54be"}}, + 0}, {<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},0}, {<<"gproc">>,{pkg,<<"gproc">>,<<"0.8.0">>},0}, - {<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.2.3">>},1}]}. + {<<"hpack">>, + {git,"https://github.com/novalabsxyz/hpack.git", + {ref,"291f83c8efbd50cf57ddcf311e5d9a4fbe2bb5cb"}}, + 1}]}. [ {pkg_hash,[ {<<"acceptor_pool">>, <<"43C20D2ACAE35F0C2BCD64F9D2BDE267E459F0F3FD23DAB26485BF518C281B21">>}, - {<<"chatterbox">>, <<"6F059D97BCAA758B8EA6FFFE2B3B81362BD06B639D3EA2BB088335511D691EBF">>}, {<<"ctx">>, <<"8FF88B70E6400C4DF90142E7F130625B82086077A45364A78D208ED3ED53C7FE">>}, - {<<"gproc">>, <<"CEA02C578589C61E5341FCE149EA36CCEF236CC2ECAC8691FBA408E7EA77EC2F">>}, - {<<"hpack">>, <<"17670F83FF984AE6CD74B1C456EDDE906D27FF013740EE4D9EFAA4F1BF999633">>}]}, + {<<"gproc">>, <<"CEA02C578589C61E5341FCE149EA36CCEF236CC2ECAC8691FBA408E7EA77EC2F">>}]}, {pkg_hash_ext,[ {<<"acceptor_pool">>, <<"0CBCD83FDC8B9AD2EEE2067EF8B91A14858A5883CB7CD800E6FCD5803E158788">>}, - {<<"chatterbox">>, <<"B93D19104D86AF0B3F2566C4CBA2A57D2E06D103728246BA1AC6C3C0FF010AA7">>}, {<<"ctx">>, <<"A14ED2D1B67723DBEBBE423B28D7615EB0BDCBA6FF28F2D1F1B0A7E1D4AA5FC2">>}, - {<<"gproc">>, <<"580ADAFA56463B75263EF5A5DF4C86AF321F68694E7786CB057FD805D1E2A7DE">>}, - {<<"hpack">>, <<"06F580167C4B8B8A6429040DF36CC93BBA6D571FAEAEC1B28816523379CBB23A">>}]} + {<<"gproc">>, <<"580ADAFA56463B75263EF5A5DF4C86AF321F68694E7786CB057FD805D1E2A7DE">>}]} ]. diff --git a/src/grpcbox_client_stream.erl b/src/grpcbox_client_stream.erl index 8784c9f..a8f0dd8 100644 --- a/src/grpcbox_client_stream.erl +++ b/src/grpcbox_client_stream.erl @@ -82,11 +82,10 @@ send_request(Ctx, Channel, Path, Input, #grpcbox_def{service=Service, buffer => <<>>, stats_handler => StatsHandler, stats => #{}, - client_pid => self()}], Headers, [], self()) of + client_pid => self()}], Headers, Body, [], self()) of {error, _Code} = Err -> Err; {StreamId, Pid} -> - h2_connection:send_body(Conn, StreamId, Body), {ok, Conn, StreamId, Pid} end; {error, _}=Error -> diff --git a/src/grpcbox_stream.erl b/src/grpcbox_stream.erl index c46c1d7..13783c1 100644 --- a/src/grpcbox_stream.erl +++ b/src/grpcbox_stream.erl @@ -46,7 +46,7 @@ full_method :: binary() | undefined, input_ref :: reference() | undefined, callback_pid :: pid() | undefined, - connection_pid :: pid(), + connection :: h2_stream_set:stream_set(), request_encoding :: gzip | identity | undefined, response_encoding :: gzip | identity | undefined, content_type :: proto | json | undefined, @@ -83,10 +83,10 @@ }. -type grpc_extended_error_response() :: {grpc_extended_error, grpc_error_data()}. -init(ConnPid, StreamId, [Socket, ServicesTable, AuthFun, UnaryInterceptor, +init(Conn, StreamId, [Socket, ServicesTable, AuthFun, UnaryInterceptor, StreamInterceptor, StatsHandler]) -> process_flag(trap_exit, true), - State = #state{connection_pid=ConnPid, + State = #state{connection=Conn, stream_id=StreamId, services_table=ServicesTable, buffer = <<>>, @@ -337,12 +337,12 @@ end_stream(Status, Message, State=#state{headers_sent=false}) -> end_stream(Status, Message, send_headers(State)); end_stream(_Status, _Message, State=#state{trailers_sent=true}) -> {ok, State}; -end_stream(Status, Message, State=#state{connection_pid=ConnPid, +end_stream(Status, Message, State=#state{connection=Conn, stream_id=StreamId, ctx=Ctx, resp_trailers=Trailers}) -> EncodedTrailers = grpcbox_utils:encode_headers(Trailers), - h2_connection:send_trailers(ConnPid, StreamId, [{<<"grpc-status">>, Status}, + h2_connection:send_trailers(Conn, StreamId, [{<<"grpc-status">>, Status}, {<<"grpc-message">>, Message} | EncodedTrailers], [{send_end_stream, true}]), Ctx1 = ctx:with_value(Ctx, grpc_server_status, grpcbox_utils:status_to_string(Status)), @@ -362,12 +362,12 @@ send_headers(Ctx, Headers) when is_map(Headers) -> send_headers(_Metadata, State=#state{headers_sent=true}) -> State; -send_headers(Metadata, State=#state{connection_pid=ConnPid, +send_headers(Metadata, State=#state{connection=Conn, stream_id=StreamId, resp_headers=Headers, headers_sent=false}) -> MdHeaders = grpcbox_utils:encode_headers(Metadata), - h2_connection:send_headers(ConnPid, StreamId, Headers ++ MdHeaders, [{send_end_stream, false}]), + h2_connection:send_headers(Conn, StreamId, Headers ++ MdHeaders, [{send_end_stream, false}]), State#state{headers_sent=true}. code_to_status(0) -> ?GRPC_STATUS_OK; @@ -449,7 +449,7 @@ send(End, Message, State=#state{headers_sent=false}) -> State1 = send_headers(State), send(End, Message, State1); send(End, Message, State=#state{ctx=Ctx, - connection_pid=ConnPid, + connection=Conn, stream_id=StreamId, response_encoding=Encoding, method=#method{proto=Proto, @@ -457,7 +457,7 @@ send(End, Message, State=#state{ctx=Ctx, output={Output, _}}}) -> BodyToSend = Proto:encode_msg(Message, Output), OutFrame = grpcbox_frame:encode(Encoding, BodyToSend), - ok = h2_connection:send_body(ConnPid, StreamId, OutFrame, [{send_end_stream, End}]), + ok = h2_connection:send_body(Conn, StreamId, OutFrame, [{send_end_stream, End}]), stats_handler(Ctx, out_payload, #{uncompressed_size => erlang:external_size(Message), compressed_size => size(BodyToSend)}, State). diff --git a/src/grpcbox_subchannel.erl b/src/grpcbox_subchannel.erl index 8eb6bad..6f11155 100644 --- a/src/grpcbox_subchannel.erl +++ b/src/grpcbox_subchannel.erl @@ -21,7 +21,8 @@ encoding := grpcbox:encoding(), stats_handler := module() | undefined }, - conn :: pid() | undefined, + conn :: h2_stream_set:stream_set() | undefined, + conn_pid :: pid() | undefined, idle_interval :: timer:time()}). start_link(Name, Channel, Endpoint, Encoding, StatsHandler) -> @@ -79,9 +80,9 @@ disconnected(EventType, EventContent, Data) -> handle_event({call, From}, info, #data{info=Info}) -> {keep_state_and_data, [{reply, From, Info}]}; -handle_event(info, {'EXIT', Pid, _}, Data=#data{conn=Pid}) -> - {next_state, disconnected, Data#data{conn=undefined}}; -handle_event(info, {'EXIT', _, econnrefused}, #data{conn=undefined}) -> +handle_event(info, {'EXIT', Pid, _}, Data=#data{conn_pid=Pid}) -> + {next_state, disconnected, Data#data{conn=undefined, conn_pid=undefined}}; +handle_event(info, {'EXIT', _, econnrefused}, #data{conn=undefined, conn_pid=undefined}) -> keep_state_and_data; handle_event({call, From}, shutdown, _) -> {stop_and_reply, normal, {reply, From, ok}}; @@ -101,7 +102,7 @@ terminate(normal, _State, #data{conn=Pid, gproc_pool:disconnect_worker(Channel, Endpoint), gproc_pool:remove_worker(Channel, Endpoint), ok; -terminate(Reason, _State, #data{conn=Pid, +terminate(Reason, _State, #data{conn_pid=Pid, endpoint=Endpoint, channel=Channel}) -> exit(Pid, Reason), @@ -122,14 +123,15 @@ connect(Data=#data{conn=undefined, stream_callback_mod => grpcbox_client_stream, connect_timeout => ConnectTimeout, tcp_user_timeout => TcpUserTimeout}) of - {ok, Pid} -> - {next_state, ready, Data#data{conn=Pid}, Actions}; + {ok, Conn} -> + Pid = h2_stream_set:connection(Conn), + {next_state, ready, Data#data{conn=Conn, conn_pid=Pid}, Actions}; {error, _}=Error -> {next_state, disconnected, Data#data{conn=undefined}, [{reply, From, Error}]} end; -connect(Data=#data{conn=Pid}, From, Actions) when is_pid(Pid) -> - h2_connection:stop(Pid), - connect(Data#data{conn=undefined}, From, Actions). +connect(Data=#data{conn=Conn, conn_pid=Pid}, From, Actions) when is_pid(Pid) -> + h2_connection:stop(Conn), + connect(Data#data{conn=undefined, conn_pid=undefined}, From, Actions). options(https, Options) -> [{client_preferred_next_protocols, {client, [<<"h2">>]}} | Options]; diff --git a/test/grpcbox_SUITE.erl b/test/grpcbox_SUITE.erl index 81c1c1a..241e80b 100644 --- a/test/grpcbox_SUITE.erl +++ b/test/grpcbox_SUITE.erl @@ -694,18 +694,14 @@ stream_interceptor(_Config) -> %% verify that the chatterbox stream isn't storing frame data check_stream_state(S) -> {_, StreamState} = sys:get_state(maps:get(stream_pid, S)), - FrameQueue = element(6, StreamState), + FrameQueue = element(7, StreamState), ?assert(queue:is_empty(FrameQueue)). %% return the stream_set of a connection in the channel connection_stream_set() -> {ok, {Channel, _}} = grpcbox_channel:pick(default_channel, unary), {ok, Conn, _} = grpcbox_subchannel:conn(Channel), - {connected, ConnState} = sys:get_state(Conn), - - %% I know, I know, this will fail if the connection record in h2_connection ever has elements - %% added before the stream_set field. But for now, it is 14 and that's good enough. - element(14, ConnState). + Conn. cert_dir(Config) -> DataDir = ?config(data_dir, Config),