From 4c17d75b51445b7a9758b1c5a8f0d2dd3c82c680 Mon Sep 17 00:00:00 2001 From: Tristan Sloughter Date: Tue, 30 May 2023 05:56:52 -0600 Subject: [PATCH] support connection options map in endpoint tuple --- src/grpcbox_channel.erl | 29 +++++++++++++++++++---------- src/grpcbox_subchannel.erl | 22 +++++++--------------- 2 files changed, 26 insertions(+), 25 deletions(-) diff --git a/src/grpcbox_channel.erl b/src/grpcbox_channel.erl index 3530f4d..2c4781f 100644 --- a/src/grpcbox_channel.erl +++ b/src/grpcbox_channel.erl @@ -20,11 +20,10 @@ -type t() :: any(). -type name() :: t(). -type transport() :: http | https. --type endpoint_options() :: [ssl:ssl_option() | - {connect_timeout, integer()} | - {tcp_user_timeout, integer()}]. -type host() :: inet:ip_address() | inet:hostname(). --type endpoint() :: {transport(), host(), inet:port_number(), endpoint_options()}. +-type connection_settings() :: map(). +-type endpoint() :: {transport(), host(), inet:port_number(), [ssl:ssl_options()]} | + {transport(), host(), inet:port_number(), [ssl:ssl_options()], connection_settings()}. -type options() :: #{balancer => load_balancer(), encoding => gprcbox:encoding(), @@ -88,6 +87,9 @@ stop(Name, Reason) -> init([Name, Endpoints, Options]) -> process_flag(trap_exit, true), + + Endpoints1 = normalize_endpoints(Endpoints), + BalancerType = maps:get(balancer, Options, round_robin), Encoding = maps:get(encoding, Options, identity), StatsHandler = maps:get(stats_handler, Options, undefined), @@ -100,14 +102,14 @@ init([Name, Endpoints, Options]) -> pool = Name, encoding = Encoding, stats_handler = StatsHandler, - endpoints = Endpoints + endpoints = Endpoints1 }, case maps:get(sync_start, Options, false) of false -> {ok, idle, Data, [{next_event, internal, connect}]}; true -> - _ = start_workers(Name, StatsHandler, Encoding, Endpoints), + _ = start_workers(Name, StatsHandler, Encoding, Endpoints1), {ok, connected, Data} end. @@ -173,8 +175,15 @@ insert_stream_interceptor(Name, _Type, Interceptors) -> start_workers(Pool, StatsHandler, Encoding, Endpoints) -> [begin gproc_pool:add_worker(Pool, Endpoint), - {ok, Pid} = grpcbox_subchannel:start_link(Endpoint, Pool, {Transport, Host, Port, EndpointOptions}, - Encoding, StatsHandler), + {ok, Pid} = grpcbox_subchannel:start_link(Endpoint, Pool, {Transport, Host, Port, SSLOptions, ConnectionSettings}, + Encoding, StatsHandler), Pid - end || Endpoint={Transport, Host, Port, EndpointOptions} <- Endpoints]. - + end || Endpoint={Transport, Host, Port, SSLOptions, ConnectionSettings} <- Endpoints]. + +%% add the chatterbox connection settings map to the endpoint if it isn't there already +normalize_endpoints(Endpoints) -> + lists:map(fun({Transport, Host, Port, SSLOptions}) -> + {Transport, Host, Port, SSLOptions, #{}}; + ({Transport, Host, Port, SSLOptions, ConnectionSettings}) -> + {Transport, Host, Port, SSLOptions, ConnectionSettings} + end, Endpoints). diff --git a/src/grpcbox_subchannel.erl b/src/grpcbox_subchannel.erl index 6f11155..5e13c1e 100644 --- a/src/grpcbox_subchannel.erl +++ b/src/grpcbox_subchannel.erl @@ -48,17 +48,17 @@ init([Name, Channel, Endpoint, Encoding, StatsHandler]) -> endpoint=Endpoint, channel=Channel}}. -info_map({http, Host, 80, _}, Encoding, StatsHandler) -> +info_map({http, Host, 80, _, _}, Encoding, StatsHandler) -> #{authority => list_to_binary(Host), scheme => <<"http">>, encoding => Encoding, stats_handler => StatsHandler}; -info_map({https, Host, 443, _}, Encoding, StatsHandler) -> +info_map({https, Host, 443, _, _}, Encoding, StatsHandler) -> #{authority => list_to_binary(Host), scheme => <<"https">>, encoding => Encoding, stats_handler => StatsHandler}; -info_map({Scheme, Host, Port, _}, Encoding, StatsHandler) -> +info_map({Scheme, Host, Port, _, _}, Encoding, StatsHandler) -> #{authority => list_to_binary(Host ++ ":" ++ integer_to_list(Port)), scheme => atom_to_binary(Scheme, utf8), encoding => Encoding, @@ -111,18 +111,10 @@ terminate(Reason, _State, #data{conn_pid=Pid, ok. connect(Data=#data{conn=undefined, - endpoint={Transport, Host, Port, EndpointOptions}}, From, Actions) -> - % Get and delete non-ssl options from endpoint options, these are passed as connection settings - ConnectTimeout = proplists:get_value(connect_timeout, EndpointOptions, 5000), - TcpUserTimeout = proplists:get_value(tcp_user_timeout, EndpointOptions, 0), - EndpointOptions2 = proplists:delete(connect_timeout, EndpointOptions), - EndpointOptions3 = proplists:delete(tcp_user_timeout, EndpointOptions2), - - case h2_client:start_link(Transport, Host, Port, options(Transport, EndpointOptions3), - #{garbage_on_end => true, - stream_callback_mod => grpcbox_client_stream, - connect_timeout => ConnectTimeout, - tcp_user_timeout => TcpUserTimeout}) of + endpoint={Transport, Host, Port, SSLOptions, ConnectionSettings}}, From, Actions) -> + case h2_client:start_link(Transport, Host, Port, options(Transport, SSLOptions), + ConnectionSettings#{garbage_on_end => true, + stream_callback_mod => grpcbox_client_stream}) of {ok, Conn} -> Pid = h2_stream_set:connection(Conn), {next_state, ready, Data#data{conn=Conn, conn_pid=Pid}, Actions};