-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathsqerl_client.erl
242 lines (210 loc) · 8.65 KB
/
sqerl_client.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
%% -*- erlang-indent-level: 4;indent-tabs-mode: nil; fill-column: 92 -*-
%% ex: ts=4 sw=4 et
%% @author Kevin Smith <[email protected]>
%% @doc Abstraction around interacting with SQL databases
%% Copyright 2011-2012 Opscode, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
-module(sqerl_client).
-include_lib("sqerl.hrl").
-behaviour(gen_server).
%% Log a statement (name or raw SQL) together with its arguments through error_logger.
%% Logging happens only when the sqerl `log_statements' setting reads back as `true'.
%% The lookup is given the atom `ok' as its default, so both `ok' and `false' mean
%% "do not log". The macro binds no variables in the calling function.
-define(LOG_STATEMENT(Name, Args),
        case envy:get(sqerl, log_statements, ok, boolean) of
            true ->
                error_logger:info_msg("(~p) Executing statement ~p with args ~p~n",
                                      [self(), Name, Args]);
            false ->
                ok;
            ok ->
                ok
        end).
%% Log the result of a driver call through error_logger.
%% Uses the same switch as statement logging: only a `log_statements' value of `true'
%% produces output; `false' and the lookup default `ok' both evaluate to `ok'.
-define(LOG_RESULT(Result),
        case envy:get(sqerl, log_statements, ok, boolean) of
            true ->
                error_logger:info_msg("(~p) Result: ~p~n", [self(), Result]);
            false ->
                ok;
            ok ->
                ok
        end).
%% API
-export([start_link/0,
start_link/1,
execute/2,
execute/3,
close/1,
prepare/3,
unprepare/2,
sql_parameter_style/0]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-ifdef(TEST).
-compile([export_all]).
-endif.
%% Idle-check interval used when the driver config has no `idle_check' entry
%% (see init/2). It is handed to gen_server as a timeout value.
-define(DEFAULT_TIMEOUT, 5000).
%% State of one connection process.
%%   cb_mod   - driver callback module that the calls are dispatched to
%%   cb_state - state owned by the driver module; replaced with the value the driver
%%              returns after each driver call and each successful idle check
%%   timeout  - value returned as the gen_server timeout from the callbacks; when it
%%              fires, handle_info(timeout, ...) asks the driver whether the
%%              connection is still up
-record(state, {cb_mod,
cb_state,
timeout = ?DEFAULT_TIMEOUT :: pos_integer()}).
%% behavior callback definitions
%% init/1: called from init/2 in this module with the connection config.
%% NOTE: init/2 treats `{ok, DriverState}' as success; any other return value
%% becomes the stop reason of the connection process.
-callback init(Config :: [{atom(), term()}]) ->
term().
%% execute/3: run a query or prepared statement with the given parameters and
%% return the result together with the updated driver state.
-callback execute(StatementOrQuery :: sqerl_query(), Parameters :: [any()], State :: term()) ->
{sqerl_results(), State :: term()}.
%% is_connected/1: invoked by the idle-timeout check. `{true, State}' keeps the
%% connection process running; `false' makes it stop with reason `shutdown'.
-callback is_connected(State :: term()) ->
{true, State :: term()} | false.
%% sql_parameter_style/0: parameter style atom of the driver (e.g. qmark, dollarn).
-callback sql_parameter_style() ->
atom().
%% prepare/3: prepare SQL under the given statement name.
-callback prepare(StatementName :: atom(), SQL :: sqerl_sql(), State :: term()) ->
{ok, State :: term()}.
%% unprepare/3: drop a prepared statement. The second argument is a placeholder;
%% unprepare/2 in this module always passes the atom `none' there.
-callback unprepare(StatementName :: atom(), _, State :: term()) ->
{ok, State :: term()}.
%% @doc Prepare the statement `SQL' under the name `Name' on connection `Cn'.
%%
%% The request is sent to the connection process with no call timeout, so the
%% caller blocks until the driver has answered.
-spec prepare(pid(), atom(), binary()) -> ok | {error, any()}.
prepare(Cn, Name, SQL) when is_pid(Cn), is_atom(Name), is_binary(SQL) ->
    Request = {prepare, Name, SQL},
    gen_server:call(Cn, Request, infinity).
%% @doc Unprepare a previously prepared statement named `Name' on connection `Cn'.
%%
%% Requests handled by the connection process all have the shape
%% {Call, StmtNameOrSQL, Args}; unprepare has no arguments, so the atom `none'
%% fills the Args slot. The call has no timeout.
-spec unprepare(pid(), atom()) -> ok | {error, any()}.
unprepare(Cn, Name) when is_pid(Cn), is_atom(Name) ->
    Request = {unprepare, Name, none},
    gen_server:call(Cn, Request, infinity).
%% @doc Execute SQL or a prepared statement that takes no parameters.
%% Equivalent to execute/3 with an empty parameter list; see execute/3 for the
%% return values.
-spec execute(pid(), sqerl_query()) -> sqerl_results().
execute(Cn, QueryOrStatement) ->
    NoParameters = [],
    execute(Cn, QueryOrStatement, NoParameters).
%% @doc Execute SQL or a prepared statement with the given parameters on
%% connection `Cn'. The call has no timeout; the reply is whatever the driver's
%% execute callback produced as its result.
-spec execute(pid(), sqerl_query(), [any()]) -> sqerl_results().
execute(Cn, QueryOrStatement, Parameters) ->
    Request = {execute, QueryOrStatement, Parameters},
    gen_server:call(Cn, Request, infinity).
%% @doc Close a connection.
%% Asks the connection process to stop; it replies `ok' and then terminates with
%% reason `normal'. Uses the default gen_server:call/2 timeout.
-spec close(pid()) -> ok.
close(Cn) ->
    Request = close,
    gen_server:call(Cn, Request).
%% @doc Start a linked, unregistered connection process.
%% The process is started with an empty init argument list and no gen_server options.
start_link() ->
    InitArgs = [],
    ServerOpts = [],
    gen_server:start_link(?MODULE, InitArgs, ServerOpts).
%% @doc Start a linked, unregistered connection process, passing `[DbType]' as the
%% init argument. No gen_server options are set.
start_link(DbType) ->
    InitArgs = [DbType],
    ServerOpts = [],
    gen_server:start_link(?MODULE, InitArgs, ServerOpts).
%% @doc gen_server init callback.
%%
%% The driver module and the connection config are always resolved from
%% configuration (drivermod/0 and config/0) and handed to init/2.
%%
%% start_link/1 passes `[DbType]' as the init argument. Without a clause for that
%% shape the process could never start (function_clause in init/1). The value is
%% accepted for backward compatibility and otherwise ignored: driver selection is
%% driven by the application config, which drivermod/0 already consults
%% (including the deprecated `db_type' key).
init([]) ->
    init(drivermod(), config());
init([_DbType]) ->
    init([]).
%% @doc Initialise the connection process for driver module `DriverMod'.
%%
%% The idle-check interval is read from the `idle_check' entry of `Config'
%% (falling back to ?DEFAULT_TIMEOUT) and is used both as the stored timeout and
%% as the initial gen_server timeout. When the driver's init/1 returns
%% `{ok, DriverState}' the process starts; any other return value becomes the
%% stop reason.
init(DriverMod, Config) ->
    IdleCheck = proplists:get_value(idle_check, Config, ?DEFAULT_TIMEOUT),
    case DriverMod:init(Config) of
        {ok, DriverState} ->
            State = #state{cb_mod = DriverMod,
                           cb_state = DriverState,
                           timeout = IdleCheck},
            {ok, State, IdleCheck};
        Failure ->
            {stop, Failure}
    end.
%% @doc gen_server call handler.
%%
%% * `{execute | prepare | unprepare, QueryOrStatementName, Args}' is forwarded to
%%   the driver through exec_driver/3. These are the only driver calls issued by
%%   the API functions of this module. The guard keeps an arbitrary 3-tuple from
%%   being applied as a function name on the driver module (which would crash the
%%   connection with `undef'); such requests now reach the catch-all clause.
%% * `close' replies `ok' and stops the process with reason `normal'.
%% * Anything else is answered with `ignored'; the idle timeout is re-armed.
handle_call({Call, QueryOrStatementName, Args}, From, State)
  when Call =:= execute; Call =:= prepare; Call =:= unprepare ->
    exec_driver({Call, QueryOrStatementName, Args}, From, State);
handle_call(close, _From, State) ->
    {stop, normal, ok, State};
handle_call(_Request, _From, #state{timeout = Timeout} = State) ->
    {reply, ignored, State, Timeout}.
%% @doc Casts carry no meaning for this server: the message is dropped, the state
%% is kept and the idle timeout is re-armed.
handle_cast(_Ignored, #state{timeout = IdleCheck} = State) ->
    {noreply, State, IdleCheck}.
%% @doc Handle the idle timeout and stray messages.
%%
%% On `timeout' the driver is asked whether the connection is still up. A
%% `{true, NewDriverState}' answer stores the new driver state and re-arms the
%% timeout; `false' logs a warning and stops the process with reason `shutdown'.
%% Any other message is dropped and the timeout is re-armed.
handle_info(timeout, #state{cb_mod = Driver, cb_state = DriverState, timeout = IdleCheck} = State) ->
    case Driver:is_connected(DriverState) of
        false ->
            error_logger:warning_msg("Failed to verify idle connection. Shutting down ~p~n",
                                     [self()]),
            {stop, shutdown, State};
        {true, CheckedDriverState} ->
            {noreply, State#state{cb_state = CheckedDriverState}, IdleCheck}
    end;
handle_info(_Other, #state{timeout = IdleCheck} = State) ->
    {noreply, State, IdleCheck}.
%% @doc gen_server terminate callback. Performs no cleanup and always returns `ok'.
terminate(_Why, _FinalState) ->
    ok.
%% @doc gen_server code_change callback. The state needs no conversion and is
%% returned unchanged.
code_change(_FromVsn, CurrentState, _Opts) ->
    {ok, CurrentState}.
%% @doc Dispatch a request to the driver callback module.
%%
%% `Call' names the driver function to invoke; it is called with the query or
%% statement name, the arguments and the current driver state, and must return
%% `{Result, NewDriverState}'. `Result' is sent back to the caller, the new
%% driver state is stored and the idle timeout is re-armed. Statement and result
%% are logged when statement logging is switched on.
exec_driver({Call, QueryOrName, Args}, _From,
            #state{cb_mod = Driver, cb_state = DriverState, timeout = IdleCheck} = State) ->
    ?LOG_STATEMENT(QueryOrName, Args),
    {Result, NewDriverState} = Driver:Call(QueryOrName, Args, DriverState),
    ?LOG_RESULT(Result),
    NewState = State#state{cb_state = NewDriverState},
    {reply, Result, NewState, IdleCheck}.
%% @doc Returns the SQL parameter style atom of the configured driver,
%% e.g. qmark, dollarn.
%%
%% Note on approach: the driver module is looked up from the sqerl application
%% configuration (via drivermod/0) and asked directly. It would be cleaner not
%% to depend on how sqerl is configured and to read the value from process
%% state instead, but that would require a call to some process, with the
%% implications that brings (contention being a potential issue).
-spec sql_parameter_style() -> atom().
sql_parameter_style() ->
    Mod = drivermod(),
    Mod:sql_parameter_style().
%% @doc Returns the DB driver module atom based on application config.
%%
%% The key `db_driver_mod' is consulted first. When it is set, the module must be
%% locatable by the code server, otherwise an error is logged and raised. When it
%% is undefined, the deprecated `db_type' key is translated instead.
-spec drivermod() -> atom().
drivermod() ->
    case envy:get(sqerl, db_driver_mod, undefined, atom) of
        undefined ->
            drivermod_from_db_type();
        Configured when is_atom(Configured) ->
            case code:which(Configured) of
                non_existing ->
                    log_and_error({does_not_exist, sqerl, db_driver_mod, Configured});
                _Found ->
                    Configured
            end;
        Invalid ->
            log_and_error({invalid_application_config, sqerl, db_driver_mod, Invalid})
    end.

%% Translate the deprecated `db_type' setting into a driver module.
%% `pgsql' maps to the default pgsql driver module and emits a deprecation
%% warning; `sqerl_pgsql_client' (also the lookup default) is returned as is;
%% any other value is logged and raised as unsupported.
-spec drivermod_from_db_type() -> atom().
drivermod_from_db_type() ->
    case envy:get(sqerl, db_type, sqerl_pgsql_client, atom) of
        sqerl_pgsql_client ->
            sqerl_pgsql_client;
        pgsql ->
            error_logger:warning_report({deprecated_application_config,
                                         sqerl,
                                         db_type,
                                         "use db_driver_mod instead"}),
            sqerl_pgsql_client;
        Unsupported ->
            log_and_error({unsupported_db_type, sqerl, Unsupported})
    end.
%% @doc Returns the connection config by calling the configured `config_cb' MFA.
%%
%% When `config_cb' is undefined, sqerl_config_env:config/0 is used. A configured
%% value must be a {Module, Function, Args} tuple whose module can be located by
%% the code server; a missing module or a value of any other shape is logged and
%% raised as an error.
-spec config() -> [{atom(), term()}].
config() ->
    Configured = envy:get(sqerl, config_cb, undefined, any),
    {M, F, A} =
        case Configured of
            undefined ->
                {sqerl_config_env, config, []};
            {CbMod, CbFun, CbArgs} when is_atom(CbMod), is_atom(CbFun), is_list(CbArgs) ->
                case code:which(CbMod) of
                    non_existing ->
                        log_and_error({does_not_exist, sqerl, config_cb, CbMod});
                    _Found ->
                        Configured
                end;
            Invalid ->
                log_and_error({invalid_config_mfa, sqerl, config_cb, Invalid})
        end,
    apply(M, F, A).
%% Helper: write `Msg' to the error log as an error report, then raise it as an
%% error exception. This function never returns normally.
log_and_error(Msg) ->
    error_logger:error_report(Msg),
    erlang:error(Msg).