diff --git a/.gitignore b/.gitignore index 1bb5c1c..fee065d 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,5 @@ erl_crash.dump rebar.lock .gradle/ *.log +*.iml +.idea/* diff --git a/README.md b/README.md index b6acb84..115dc25 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ which is generated from `priv/kafka.bnf`. The root level `schema` is always a `struct`. A `struct` consists of fields having lower level (maybe nested) `schema` -Struct fileds are documented in `priv/kafka.bnf` as comments, +Struct fields are documented in `priv/kafka.bnf` as comments, but the comments are not generated as Erlang comments in `kpro_schema.erl` Take `produce` API for example diff --git a/src/kpro_brokers.erl b/src/kpro_brokers.erl index a63b34d..8c072d6 100644 --- a/src/kpro_brokers.erl +++ b/src/kpro_brokers.erl @@ -185,7 +185,7 @@ discover_partition_leader(Connection, Topic, Partition, Timeout) -> {ok, {Host, Port}} end ], - kpro_lib:ok_pipe(FL). + kpro_lib:ok_pipe(FL, Timeout). %% @doc Discover group or transactional coordinator. -spec discover_coordinator(connection(), coordinator_type(), diff --git a/src/kpro_connection.erl b/src/kpro_connection.erl index 149c793..2d0fbe8 100644 --- a/src/kpro_connection.erl +++ b/src/kpro_connection.erl @@ -188,11 +188,12 @@ init(Parent, Host, Port, Config) -> proc_lib:init_ack(Parent, {ok, self()}), loop(State, Debug). -%% Connect to the given endpoint, then initalize connection. +%% Connect to the given endpoint, then initialize connection. %% Raise an error exception for any failure. -spec connect(pid(), hostname(), portnum(), config()) -> state(). connect(Parent, Host, Port, Config) -> Timeout = get_connect_timeout(Config), + Deadline = deadline(Timeout), %% initial active opt should be 'false' before upgrading to ssl SockOpts = [{active, false}, binary] ++ get_extra_sock_opts(Config), case gen_tcp:connect(Host, Port, SockOpts, Timeout) of @@ -202,7 +203,7 @@ connect(Parent, Host, Port, Config) -> , remote = {Host, Port} , sock = Sock }, - init_connection(State, Config); + init_connection(State, Config, Deadline); {error, Reason} -> erlang:error(Reason) end. @@ -214,8 +215,7 @@ connect(Parent, Host, Port, Config) -> init_connection(#state{ client_id = ClientId , sock = Sock , remote = {Host, _} - } = State, Config) -> - Timeout = get_connect_timeout(Config), + } = State, Config, Deadline) -> %% adjusting buffer size as per recommendation at %% http://erlang.org/doc/man/inet.html#setopts-2 %% idea is from github.com/epgsql/epgsql @@ -224,13 +224,13 @@ init_connection(#state{ client_id = ClientId ok = inet:setopts(Sock, [{buffer, max(RecBufSize, SndBufSize)}]), SslOpts = maps:get(ssl, Config, false), Mod = get_tcp_mod(SslOpts), - NewSock = maybe_upgrade_to_ssl(Sock, Mod, SslOpts, Host, Timeout), + NewSock = maybe_upgrade_to_ssl(Sock, Mod, SslOpts, Host, timeout(Deadline)), %% from now on, it's all packet-4 messages ok = setopts(NewSock, Mod, [{packet, 4}]), Versions = case Config of #{query_api_versions := false} -> ?undef; - _ -> query_api_versions(NewSock, Mod, ClientId, Timeout) + _ -> query_api_versions(NewSock, Mod, ClientId, Deadline) end, HandshakeVsn = case Versions of #{sasl_handshake := {_, V}} -> V; @@ -238,12 +238,12 @@ init_connection(#state{ client_id = ClientId end, SaslOpts = get_sasl_opt(Config), ok = kpro_sasl:auth(Host, NewSock, Mod, ClientId, - Timeout, SaslOpts, HandshakeVsn), + timeout(Deadline), SaslOpts, HandshakeVsn), State#state{mod = Mod, sock = NewSock, api_vsns = Versions}. -query_api_versions(Sock, Mod, ClientId, Timeout) -> +query_api_versions(Sock, Mod, ClientId, Deadline) -> Req = kpro_req_lib:make(api_versions, 0, []), - Rsp = kpro_lib:send_and_recv(Req, Sock, Mod, ClientId, Timeout), + Rsp = kpro_lib:send_and_recv(Req, Sock, Mod, ClientId, timeout(Deadline)), ErrorCode = find(error_code, Rsp), case ErrorCode =:= ?no_error of true -> @@ -608,6 +608,12 @@ get_client_id(Config) -> find(FieldName, Struct) -> kpro_lib:find(FieldName, Struct). +deadline(Timeout) -> + erlang:monotonic_time(millisecond) + Timeout. + +timeout(Deadline) -> + erlang:max(0, Deadline - erlang:monotonic_time(millisecond)). + %%%_* Emacs ==================================================================== %%% Local Variables: %%% allout-layout: t diff --git a/src/kpro_lib.erl b/src/kpro_lib.erl index 680cfa2..da2519c 100644 --- a/src/kpro_lib.erl +++ b/src/kpro_lib.erl @@ -91,16 +91,16 @@ send_and_recv(#kpro_req{api = API, vsn = Vsn} = Req, end. %% @doc Function pipeline. -%% The first function takes no args, all succeeding ones shoud be arity-0 or 1 +%% The first function takes no args, all succeeding ones should be arity-0 or 1 %% functions. All functions should retrun %% `ok' | `{ok, Result}' | `{error, Reason}'. %% where `Result' is the input arg of the next function, %% or the result of pipeline if it's the last pipe node. %% -%% NOTE: If a funcition returns `ok' the next should be an arity-0 function. +%% NOTE: If a function returns `ok' the next should be an arity-0 function. %% Any `{error, Reason}' return value would cause the pipeline to abort. %% -%% NOTE: The pipe funcions are delegated to an agent process to evaluate, +%% NOTE: The pipe functions are delegated to an agent process to evaluate, %% only exceptions and process links are propagated back to caller %% other side-effects like monitor references are not handled. ok_pipe(FunList, Timeout) ->