Skip to content

Commit

Permalink
support connection options map in endpoint tuple
Browse files Browse the repository at this point in the history
  • Loading branch information
tsloughter committed Jun 19, 2023
1 parent da0ab58 commit 4c17d75
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 25 deletions.
29 changes: 19 additions & 10 deletions src/grpcbox_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
-type t() :: any().
-type name() :: t().
-type transport() :: http | https.
-type endpoint_options() :: [ssl:ssl_option() |
{connect_timeout, integer()} |
{tcp_user_timeout, integer()}].
-type host() :: inet:ip_address() | inet:hostname().
-type endpoint() :: {transport(), host(), inet:port_number(), endpoint_options()}.
-type connection_settings() :: map().
-type endpoint() :: {transport(), host(), inet:port_number(), [ssl:ssl_options()]} |
{transport(), host(), inet:port_number(), [ssl:ssl_options()], connection_settings()}.

-type options() :: #{balancer => load_balancer(),
encoding => gprcbox:encoding(),
Expand Down Expand Up @@ -88,6 +87,9 @@ stop(Name, Reason) ->

init([Name, Endpoints, Options]) ->
process_flag(trap_exit, true),

Endpoints1 = normalize_endpoints(Endpoints),

BalancerType = maps:get(balancer, Options, round_robin),
Encoding = maps:get(encoding, Options, identity),
StatsHandler = maps:get(stats_handler, Options, undefined),
Expand All @@ -100,14 +102,14 @@ init([Name, Endpoints, Options]) ->
pool = Name,
encoding = Encoding,
stats_handler = StatsHandler,
endpoints = Endpoints
endpoints = Endpoints1
},

case maps:get(sync_start, Options, false) of
false ->
{ok, idle, Data, [{next_event, internal, connect}]};
true ->
_ = start_workers(Name, StatsHandler, Encoding, Endpoints),
_ = start_workers(Name, StatsHandler, Encoding, Endpoints1),
{ok, connected, Data}
end.

Expand Down Expand Up @@ -173,8 +175,15 @@ insert_stream_interceptor(Name, _Type, Interceptors) ->
start_workers(Pool, StatsHandler, Encoding, Endpoints) ->
[begin
gproc_pool:add_worker(Pool, Endpoint),
{ok, Pid} = grpcbox_subchannel:start_link(Endpoint, Pool, {Transport, Host, Port, EndpointOptions},
Encoding, StatsHandler),
{ok, Pid} = grpcbox_subchannel:start_link(Endpoint, Pool, {Transport, Host, Port, SSLOptions, ConnectionSettings},
Encoding, StatsHandler),
Pid
end || Endpoint={Transport, Host, Port, EndpointOptions} <- Endpoints].

end || Endpoint={Transport, Host, Port, SSLOptions, ConnectionSettings} <- Endpoints].

%% add the chatterbox connection settings map to the endpoint if it isn't there already
normalize_endpoints(Endpoints) ->
lists:map(fun({Transport, Host, Port, SSLOptions}) ->
{Transport, Host, Port, SSLOptions, #{}};
({Transport, Host, Port, SSLOptions, ConnectionSettings}) ->
{Transport, Host, Port, SSLOptions, ConnectionSettings}
end, Endpoints).
22 changes: 7 additions & 15 deletions src/grpcbox_subchannel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ init([Name, Channel, Endpoint, Encoding, StatsHandler]) ->
endpoint=Endpoint,
channel=Channel}}.

info_map({http, Host, 80, _}, Encoding, StatsHandler) ->
info_map({http, Host, 80, _, _}, Encoding, StatsHandler) ->
#{authority => list_to_binary(Host),
scheme => <<"http">>,
encoding => Encoding,
stats_handler => StatsHandler};
info_map({https, Host, 443, _}, Encoding, StatsHandler) ->
info_map({https, Host, 443, _, _}, Encoding, StatsHandler) ->
#{authority => list_to_binary(Host),
scheme => <<"https">>,
encoding => Encoding,
stats_handler => StatsHandler};
info_map({Scheme, Host, Port, _}, Encoding, StatsHandler) ->
info_map({Scheme, Host, Port, _, _}, Encoding, StatsHandler) ->
#{authority => list_to_binary(Host ++ ":" ++ integer_to_list(Port)),
scheme => atom_to_binary(Scheme, utf8),
encoding => Encoding,
Expand Down Expand Up @@ -111,18 +111,10 @@ terminate(Reason, _State, #data{conn_pid=Pid,
ok.

connect(Data=#data{conn=undefined,
endpoint={Transport, Host, Port, EndpointOptions}}, From, Actions) ->
% Get and delete non-ssl options from endpoint options, these are passed as connection settings
ConnectTimeout = proplists:get_value(connect_timeout, EndpointOptions, 5000),
TcpUserTimeout = proplists:get_value(tcp_user_timeout, EndpointOptions, 0),
EndpointOptions2 = proplists:delete(connect_timeout, EndpointOptions),
EndpointOptions3 = proplists:delete(tcp_user_timeout, EndpointOptions2),

case h2_client:start_link(Transport, Host, Port, options(Transport, EndpointOptions3),
#{garbage_on_end => true,
stream_callback_mod => grpcbox_client_stream,
connect_timeout => ConnectTimeout,
tcp_user_timeout => TcpUserTimeout}) of
endpoint={Transport, Host, Port, SSLOptions, ConnectionSettings}}, From, Actions) ->
case h2_client:start_link(Transport, Host, Port, options(Transport, SSLOptions),
ConnectionSettings#{garbage_on_end => true,
stream_callback_mod => grpcbox_client_stream}) of
{ok, Conn} ->
Pid = h2_stream_set:connection(Conn),
{next_state, ready, Data#data{conn=Conn, conn_pid=Pid}, Actions};
Expand Down

0 comments on commit 4c17d75

Please sign in to comment.