Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Locked pgs #1

Merged
merged 24 commits into from
Aug 24, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
9e96b68
Trying to lock individual merle conns when in use
jesse-lauro Aug 21, 2012
495d231
Small bug fix with how we retrieve pids
jesse-lauro Aug 21, 2012
2ed5898
Small bug fix with how we create use_count records
jesse-lauro Aug 21, 2012
114e463
It helps to actually export the function
jesse-lauro Aug 21, 2012
94fa953
Modified strategy so that merle_watchers are managed by process group…
jesse-lauro Aug 22, 2012
dd9e5bd
Removed extraneous log message
jesse-lauro Aug 22, 2012
b6c2e95
Changed the way we release the merle connection slightly
jesse-lauro Aug 22, 2012
0be553f
Reduced error logging to make less noisy
jesse-lauro Aug 22, 2012
394d186
Changed exit scheme for merle processes.. should now issue exit messa…
jesse-lauro Aug 22, 2012
a0435ab
Reduced error_logger logging when killing memcache connections
jesse-lauro Aug 22, 2012
ccc51f8
Simplified merle_cluster exec, complete with monitoring
jesse-lauro Aug 22, 2012
ba88778
Fixed a bug with merle_watcher monitoring... fixed other shit
jesse-lauro Aug 22, 2012
5a708f4
Tweaked supervision tree for merle project, so that if process pool m…
jesse-lauro Aug 23, 2012
1f7e194
Adding timestamp that tracks time when connections unlock
jesse-lauro Aug 23, 2012
059bab7
Added utility method for retrieving number of available merle connect…
jesse-lauro Aug 23, 2012
ebe815d
Debugged count_available utility method
jesse-lauro Aug 23, 2012
1eae31a
Bug fix to monitoring call
jesse-lauro Aug 23, 2012
83191ce
Adding periodic clean so that we never lose merle connections, even i…
jesse-lauro Aug 24, 2012
43afd58
Adding periodic clean so that we never lose merle connections, even i…
jesse-lauro Aug 24, 2012
f76ea9e
Added proper calculation of now to merle code
jesse-lauro Aug 24, 2012
f382bbe
One more time calc
jesse-lauro Aug 24, 2012
7aec952
Now properly cleaning against proper interval
jesse-lauro Aug 24, 2012
9381814
Added threshold in RR index incrementing
jesse-lauro Aug 24, 2012
0f01024
Shifting the round robin index to include threshold
jesse-lauro Aug 24, 2012
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 0 additions & 184 deletions src/local_pg2.erl

This file was deleted.

22 changes: 15 additions & 7 deletions src/merle.erl
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,9 @@ start_link(Host, Port) ->
%% @private
init([Host, Port]) ->
log4erl:info("Socket initialized!"),

erlang:process_flag(trap_exit, true),

gen_tcp:connect(Host, Port, ?TCP_OPTS_ACTIVE).

handle_call({stats}, _From, Socket) ->
Expand Down Expand Up @@ -464,6 +467,11 @@ handle_info({tcp_closed, Socket}, Socket) ->
{stop, {error, tcp_closed}, Socket};
handle_info({tcp_error, Socket, Reason}, Socket) ->
{stop, {error, {tcp_error, Reason}}, Socket};

handle_info({'EXIT', _, Reason}, Socket) ->
log4erl:warn("Exiting merle connection ~p", [Reason]),
{stop, normal, Socket};

handle_info(_Info, State) -> {noreply, State}.

%% @private
Expand Down Expand Up @@ -507,8 +515,8 @@ send_get_cmd(Socket, Cmd, Timeout) ->
[{_, Value}] -> {ok, Value};
[] -> {error, not_found};
{error, Error} ->
log4erl:error("Encountered error from memcache; killing connection now: ~p", [Error]),
erlang:exit(Error),
log4erl:warn("Encountered error from memcache; killing connection now: ~p", [Error]),
erlang:exit(self(), Error),
{error, Error}
end,
inet:setopts(Socket, ?TCP_OPTS_ACTIVE),
Expand Down Expand Up @@ -550,7 +558,7 @@ do_recv_stats(Timeout) ->
inet:setopts(Socket, ?TCP_OPTS_ACTIVE),
[{Field, Value} | do_recv_stats(Timeout)]
after Timeout ->
timeout
timeout
end.
%% @doc receive function for simple responses (not containing VALUEs)
recv_simple_reply(Timeout) ->
Expand All @@ -559,12 +567,12 @@ recv_simple_reply(Timeout) ->
inet:setopts(Socket, ?TCP_OPTS_ACTIVE),
parse_simple_response_line(Data);
{error, closed} ->
log4erl:error("Encountered error while receiving simple reply from memcache; killing connection now."),
erlang:exit(connection_closed),
log4erl:warn("Encountered error while receiving simple reply from memcache; killing connection now."),
erlang:exit(self(), connection_closed),
connection_closed
after Timeout ->
log4erl:error("Encountered timeout while receiving simple reply from memcache; killing connection now."),
erlang:exit(timeout),
log4erl:warn("Encountered timeout while receiving simple reply from memcache; killing connection now."),
erlang:exit(self(), timeout),
{error, timeout}
end.
parse_simple_response_line(<<"OK", _B/binary>>) -> ok;
Expand Down
28 changes: 26 additions & 2 deletions src/merle_cluster.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-module(merle_cluster).

-export([configure/2]).
-export([configure/2, exec/4]).

index_map(F, List) ->
{Map, _} = lists:mapfoldl(fun(X, Iter) -> {F(X, Iter), Iter +1} end, 1, List),
Expand All @@ -26,14 +26,38 @@ configure(MemcachedHosts, ConnectionsPerHost) ->
{M, B} = dynamic_compile:from_string(ModuleString),
code:load_binary(M, "", B),

% start all merle watchers
lists:foreach(
fun([Host, Port]) ->
lists:foreach(
fun(_) ->
supervisor:start_child(merle_sup, [[Host, Port]])
supervisor:start_child(merle_watcher_sup, [[Host, Port]])
end,
lists:seq(1, ConnectionsPerHost)
)
end,
SortedMemcachedHosts
).


exec(Key, Fun, FullDefault, Now) ->
S = merle_cluster_dynamic:get_server(Key),

case merle_pool:get_closest_pid(round_robin, S) of
in_use ->
FullDefault;

P ->
merle_watcher:monitor(P, self()),

MC = merle_watcher:merle_connection(P),

Value = Fun(MC, Key),

merle_watcher:demonitor(P),

merle_pool:checkin_pid(P, Now),

Value

end.
Loading