Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not reuse the same timeout during connection timeout #72

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ erl_crash.dump
rebar.lock
.gradle/
*.log
*.iml
.idea/*
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ which is generated from `priv/kafka.bnf`.
The root level `schema` is always a `struct`.
A `struct` consists of fields having lower level (maybe nested) `schema`

Struct fileds are documented in `priv/kafka.bnf` as comments,
Struct fields are documented in `priv/kafka.bnf` as comments,
but the comments are not generated as Erlang comments in `kpro_schema.erl`

Take `produce` API for example
Expand Down
2 changes: 1 addition & 1 deletion src/kpro_brokers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ discover_partition_leader(Connection, Topic, Partition, Timeout) ->
{ok, {Host, Port}}
end
],
kpro_lib:ok_pipe(FL).
kpro_lib:ok_pipe(FL, Timeout).

%% @doc Discover group or transactional coordinator.
-spec discover_coordinator(connection(), coordinator_type(),
Expand Down
24 changes: 15 additions & 9 deletions src/kpro_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,12 @@ init(Parent, Host, Port, Config) ->
proc_lib:init_ack(Parent, {ok, self()}),
loop(State, Debug).

%% Connect to the given endpoint, then initalize connection.
%% Connect to the given endpoint, then initialize connection.
%% Raise an error exception for any failure.
-spec connect(pid(), hostname(), portnum(), config()) -> state().
connect(Parent, Host, Port, Config) ->
Timeout = get_connect_timeout(Config),
Deadline = deadline(Timeout),
%% initial active opt should be 'false' before upgrading to ssl
SockOpts = [{active, false}, binary] ++ get_extra_sock_opts(Config),
case gen_tcp:connect(Host, Port, SockOpts, Timeout) of
Expand All @@ -202,7 +203,7 @@ connect(Parent, Host, Port, Config) ->
, remote = {Host, Port}
, sock = Sock
},
init_connection(State, Config);
init_connection(State, Config, Deadline);
{error, Reason} ->
erlang:error(Reason)
end.
Expand All @@ -214,8 +215,7 @@ connect(Parent, Host, Port, Config) ->
init_connection(#state{ client_id = ClientId
, sock = Sock
, remote = {Host, _}
} = State, Config) ->
Timeout = get_connect_timeout(Config),
} = State, Config, Deadline) ->
%% adjusting buffer size as per recommendation at
%% http://erlang.org/doc/man/inet.html#setopts-2
%% idea is from github.com/epgsql/epgsql
Expand All @@ -224,26 +224,26 @@ init_connection(#state{ client_id = ClientId
ok = inet:setopts(Sock, [{buffer, max(RecBufSize, SndBufSize)}]),
SslOpts = maps:get(ssl, Config, false),
Mod = get_tcp_mod(SslOpts),
NewSock = maybe_upgrade_to_ssl(Sock, Mod, SslOpts, Host, Timeout),
NewSock = maybe_upgrade_to_ssl(Sock, Mod, SslOpts, Host, timeout(Deadline)),
%% from now on, it's all packet-4 messages
ok = setopts(NewSock, Mod, [{packet, 4}]),
Versions =
case Config of
#{query_api_versions := false} -> ?undef;
_ -> query_api_versions(NewSock, Mod, ClientId, Timeout)
_ -> query_api_versions(NewSock, Mod, ClientId, Deadline)
end,
HandshakeVsn = case Versions of
#{sasl_handshake := {_, V}} -> V;
_ -> 0
end,
SaslOpts = get_sasl_opt(Config),
ok = kpro_sasl:auth(Host, NewSock, Mod, ClientId,
Timeout, SaslOpts, HandshakeVsn),
timeout(Deadline), SaslOpts, HandshakeVsn),
State#state{mod = Mod, sock = NewSock, api_vsns = Versions}.

query_api_versions(Sock, Mod, ClientId, Timeout) ->
query_api_versions(Sock, Mod, ClientId, Deadline) ->
Req = kpro_req_lib:make(api_versions, 0, []),
Rsp = kpro_lib:send_and_recv(Req, Sock, Mod, ClientId, Timeout),
Rsp = kpro_lib:send_and_recv(Req, Sock, Mod, ClientId, timeout(Deadline)),
ErrorCode = find(error_code, Rsp),
case ErrorCode =:= ?no_error of
true ->
Expand Down Expand Up @@ -608,6 +608,12 @@ get_client_id(Config) ->

find(FieldName, Struct) -> kpro_lib:find(FieldName, Struct).

deadline(Timeout) ->
erlang:monotonic_time(millisecond) + Timeout.

timeout(Deadline) ->
erlang:max(0, Deadline - erlang:monotonic_time(millisecond)).

%%%_* Emacs ====================================================================
%%% Local Variables:
%%% allout-layout: t
Expand Down
6 changes: 3 additions & 3 deletions src/kpro_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,16 @@ send_and_recv(#kpro_req{api = API, vsn = Vsn} = Req,
end.

%% @doc Function pipeline.
%% The first function takes no args, all succeeding ones shoud be arity-0 or 1
%% The first function takes no args, all succeeding ones should be arity-0 or 1
%% functions. All functions should retrun
%% `ok' | `{ok, Result}' | `{error, Reason}'.
%% where `Result' is the input arg of the next function,
%% or the result of pipeline if it's the last pipe node.
%%
%% NOTE: If a funcition returns `ok' the next should be an arity-0 function.
%% NOTE: If a function returns `ok' the next should be an arity-0 function.
%% Any `{error, Reason}' return value would cause the pipeline to abort.
%%
%% NOTE: The pipe funcions are delegated to an agent process to evaluate,
%% NOTE: The pipe functions are delegated to an agent process to evaluate,
%% only exceptions and process links are propagated back to caller
%% other side-effects like monitor references are not handled.
ok_pipe(FunList, Timeout) ->
Expand Down