diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4392b40 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +out +.idea +ebin +*.iml \ No newline at end of file diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..a4f3b6c --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,9 @@ +Copyright (c) 2009 Joe Williams +Copyright (c) 2009 Nick Gerakines +Copyright (c) 2010 Zabrane Mikael + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/NOTICES b/NOTICES index 769be90..77a320a 100644 --- a/NOTICES +++ b/NOTICES @@ -6,3 +6,5 @@ Contributors and Copyright holders: * Copyright 2009, Joe Williams * Copyright 2009, Nick Gerakines + * Copyright 2010, Zabrane Mikael + * Copyright 2012, Jeremy D. Acord m \ No newline at end of file diff --git a/README b/README index 6e65978..40bb18c 100644 --- a/README +++ b/README @@ -37,16 +37,18 @@ This code is available as Open Source Software under the MIT license. Features: -* Support for stats, version, getkey, getskey, delete, set, add, replace, cas, flushall, verbosity +* Support for stats, version, getkey, getskey, delete, set, add, replace, cas, flushall, verbosity, incr and decr (with specially set "counter" keys) Notes: * Uses term_to_binary and binary_to_term to serialize/deserialize Erlang terms before sending/receiving them. This allows for native Erlang terms to be returned from memcached but doesn't play well using other languages after setting values with merle or using merle to get values set by other languages. +* getkey and getskey currently don't work on keys initialized with addcounter/1. The binary data you get back won't be a serialized erlang integer, and so the implicity binary_to_term done by getkey and getskey provokes disaster. Merle Based Projects: -http://github.com/cstar/merle/tree/master -http://github.com/issuu/merle/tree/master -http://github.com/0lvin/merle/tree/master +http://github.com/cstar/merle +http://github.com/issuu/merle +http://github.com/0lvin/merle +http://github.com/ppolv/merle Usage: @@ -103,3 +105,23 @@ ok "STAT evictions 0","STAT bytes_read 216", "STAT bytes_written 468","STAT limit_maxbytes 67108864", "STAT threads 1","END"] + +> merle:addcounter("my_counter"). +ok +> merle:incr("my_counter",0). +{ok,0} +> merle:incr("my_counter",12). +{ok,12} +> merle:addcounter("my_counter"). +ok +> merle:incr("my_counter",12). +{ok,24} +> merle:decr("my_counter",30). +{ok,0} +> merle:incr("nonexistent_counter",0). +not_found +> merle:delete("my_counter"). +ok +> merle:delete("my_counter"). +not_found + diff --git a/rebar b/rebar new file mode 100755 index 0000000..77abae6 Binary files /dev/null and b/rebar differ diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 11bb66d..94a23fb 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -1,4 +1,4 @@ -%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP +%% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP %% distribution, with the following modifications: %% %% 1) the module name is gen_server2 @@ -16,7 +16,48 @@ %% The original code could reorder messages when communicating with a %% process on a remote node that was not currently connected. %% -%% All modifications are (C) 2009 LShift Ltd. +%% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3 +%% allow callers to attach priorities to requests. Requests with +%% higher priorities are processed before requests with lower +%% priorities. The default priority is 0. +%% +%% 5) The callback module can optionally implement +%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be +%% called immediately prior to and post hibernation, respectively. If +%% handle_pre_hibernate returns {hibernate, NewState} then the process +%% will hibernate. If the module does not implement +%% handle_pre_hibernate/1 then the default action is to hibernate. +%% +%% 6) init can return a 4th arg, {backoff, InitialTimeout, +%% MinimumTimeout, DesiredHibernatePeriod} (all in +%% milliseconds). Then, on all callbacks which can return a timeout +%% (including init), timeout can be 'hibernate'. When this is the +%% case, the current timeout value will be used (initially, the +%% InitialTimeout supplied from init). After this timeout has +%% occurred, hibernation will occur as normal. Upon awaking, a new +%% current timeout value will be calculated. +%% +%% The purpose is that the gen_server2 takes care of adjusting the +%% current timeout value such that the process will increase the +%% timeout value repeatedly if it is unable to sleep for the +%% DesiredHibernatePeriod. If it is able to sleep for the +%% DesiredHibernatePeriod it will decrease the current timeout down to +%% the MinimumTimeout, so that the process is put to sleep sooner (and +%% hopefully stays asleep for longer). In short, should a process +%% using this receive a burst of messages, it should not hibernate +%% between those messages, but as the messages become less frequent, +%% the process will not only hibernate, it will do so sooner after +%% each message. +%% +%% When using this backoff mechanism, normal timeout values (i.e. not +%% 'hibernate') can still be used, and if they are used then the +%% handle_info(timeout, State) will be called as normal. In this case, +%% returning 'hibernate' from handle_info(timeout, State) will not +%% hibernate the process immediately, as it would if backoff wasn't +%% being used. Instead it'll wait for the current timeout as described +%% above. + +%% All modifications are (C) 2009-2010 LShift Ltd. %% ``The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -50,6 +91,7 @@ %%% init(Args) %%% ==> {ok, State} %%% {ok, State, Timeout} +%%% {ok, State, Timeout, Backoff} %%% ignore %%% {stop, Reason} %%% @@ -81,6 +123,17 @@ %%% %%% ==> ok %%% +%%% handle_pre_hibernate(State) +%%% +%%% ==> {hibernate, State} +%%% {stop, Reason, State} +%%% Reason = normal | shutdown | Term, terminate(State) is called +%%% +%%% handle_post_hibernate(State) +%%% +%%% ==> {noreply, State} +%%% {stop, Reason, State} +%%% Reason = normal | shutdown | Term, terminate(State) is called %%% %%% The work flow (of the server) can be described as follows: %%% @@ -107,11 +160,11 @@ %% API -export([start/3, start/4, start_link/3, start_link/4, - call/2, call/3, - cast/2, reply/2, + call/2, call/3, pcall/3, pcall/4, + cast/2, pcast/3, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, - enter_loop/3, enter_loop/4, enter_loop/5]). + enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]). -export([behaviour_info/1]). @@ -126,6 +179,20 @@ -import(error_logger, [format/2]). +%%%========================================================================= +%%% Specs. These exist only to shut up dialyzer's warnings +%%%========================================================================= + +-ifdef(use_specs). + +-spec(handle_common_termination/6 :: + (any(), any(), any(), atom(), any(), any()) -> no_return()). + +-spec(hibernate/7 :: + (pid(), any(), any(), atom(), any(), queue(), any()) -> no_return()). + +-endif. + %%%========================================================================= %%% API %%%========================================================================= @@ -188,6 +255,22 @@ call(Name, Request, Timeout) -> exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) end. +pcall(Name, Priority, Request) -> + case catch gen:call(Name, '$gen_pcall', {Priority, Request}) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, pcall, [Name, Priority, Request]}}) + end. + +pcall(Name, Priority, Request, Timeout) -> + case catch gen:call(Name, '$gen_pcall', {Priority, Request}, Timeout) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, pcall, [Name, Priority, Request, Timeout]}}) + end. + %% ----------------------------------------------------------------- %% Make a cast to a generic server. %% ----------------------------------------------------------------- @@ -207,6 +290,22 @@ do_cast(Dest, Request) -> cast_msg(Request) -> {'$gen_cast',Request}. +pcast({global,Name}, Priority, Request) -> + catch global:send(Name, cast_msg(Priority, Request)), + ok; +pcast({Name,Node}=Dest, Priority, Request) when is_atom(Name), is_atom(Node) -> + do_cast(Dest, Priority, Request); +pcast(Dest, Priority, Request) when is_atom(Dest) -> + do_cast(Dest, Priority, Request); +pcast(Dest, Priority, Request) when is_pid(Dest) -> + do_cast(Dest, Priority, Request). + +do_cast(Dest, Priority, Request) -> + do_send(Dest, cast_msg(Priority, Request)), + ok. + +cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}. + %% ----------------------------------------------------------------- %% Send a reply to the client. %% ----------------------------------------------------------------- @@ -253,7 +352,7 @@ multi_call(Nodes, Name, Req, Timeout) %%----------------------------------------------------------------- -%% enter_loop(Mod, Options, State, , ) ->_ +%% enter_loop(Mod, Options, State, , , ) ->_ %% %% Description: Makes an existing process into a gen_server. %% The calling process will enter the gen_server receive @@ -264,20 +363,30 @@ multi_call(Nodes, Name, Req, Timeout) %% process, including registering a name for it. %%----------------------------------------------------------------- enter_loop(Mod, Options, State) -> - enter_loop(Mod, Options, State, self(), infinity). + enter_loop(Mod, Options, State, self(), infinity, undefined). + +enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) -> + enter_loop(Mod, Options, State, self(), infinity, Backoff); enter_loop(Mod, Options, State, ServerName = {_, _}) -> - enter_loop(Mod, Options, State, ServerName, infinity); + enter_loop(Mod, Options, State, ServerName, infinity, undefined); enter_loop(Mod, Options, State, Timeout) -> - enter_loop(Mod, Options, State, self(), Timeout). + enter_loop(Mod, Options, State, self(), Timeout, undefined). + +enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) -> + enter_loop(Mod, Options, State, ServerName, infinity, Backoff); enter_loop(Mod, Options, State, ServerName, Timeout) -> + enter_loop(Mod, Options, State, ServerName, Timeout, undefined). + +enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) -> Name = get_proc_name(ServerName), Parent = get_parent(), Debug = debug_options(Name, Options), - Queue = queue:new(), - loop(Parent, Name, State, Mod, Timeout, Queue, Debug). + Queue = priority_queue:new(), + Backoff1 = extend_backoff(Backoff), + loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug). %%%======================================================================== %%% Gen-callback functions @@ -292,23 +401,37 @@ enter_loop(Mod, Options, State, ServerName, Timeout) -> %%% --------------------------------------------------- init_it(Starter, self, Name, Mod, Args, Options) -> init_it(Starter, self(), Name, Mod, Args, Options); -init_it(Starter, Parent, Name, Mod, Args, Options) -> +init_it(Starter, Parent, Name0, Mod, Args, Options) -> + Name = name(Name0), Debug = debug_options(Name, Options), - Queue = queue:new(), + Queue = priority_queue:new(), case catch Mod:init(Args) of {ok, State} -> proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, infinity, Queue, Debug); + loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug); {ok, State, Timeout} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, Timeout, Queue, Debug); + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug); + {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> + Backoff1 = extend_backoff(Backoff), + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug); {stop, Reason} -> + %% For consistency, we must make sure that the + %% registered name (if any) is unregistered before + %% the parent process is notified about the failure. + %% (Otherwise, the parent process could get + %% an 'already_started' error if it immediately + %% tried starting the process again.) + unregister_name(Name0), proc_lib:init_ack(Starter, {error, Reason}), exit(Reason); ignore -> + unregister_name(Name0), proc_lib:init_ack(Starter, ignore), exit(normal); {'EXIT', Reason} -> + unregister_name(Name0), proc_lib:init_ack(Starter, {error, Reason}), exit(Reason); Else -> @@ -317,46 +440,188 @@ init_it(Starter, Parent, Name, Mod, Args, Options) -> exit(Error) end. +name({local,Name}) -> Name; +name({global,Name}) -> Name; +%% name(Pid) when is_pid(Pid) -> Pid; +%% when R11 goes away, drop the line beneath and uncomment the line above +name(Name) -> Name. + +unregister_name({local,Name}) -> + _ = (catch unregister(Name)); +unregister_name({global,Name}) -> + _ = global:unregister_name(Name); +unregister_name(Pid) when is_pid(Pid) -> + Pid; +% Under R12 let's just ignore it, as we have a single term as Name. +% On R13 it will never get here, as we get tuple with 'local/global' atom. +unregister_name(_Name) -> ok. + +extend_backoff(undefined) -> + undefined; +extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) -> + {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, now()}. + %%%======================================================================== %%% Internal functions %%%======================================================================== %%% --------------------------------------------------- %%% The MAIN loop. %%% --------------------------------------------------- -loop(Parent, Name, State, Mod, Time, Queue, Debug) -> +loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) -> + pre_hibernate(Parent, Name, State, Mod, undefined, Queue, Debug); +loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> + process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, + drain(Queue), Debug). + +drain(Queue) -> receive - Input -> loop(Parent, Name, State, Mod, - Time, queue:in(Input, Queue), Debug) - after 0 -> - case queue:out(Queue) of - {{value, Msg}, Queue1} -> - process_msg(Parent, Name, State, Mod, - Time, Queue1, Debug, Msg); - {empty, Queue1} -> - receive - Input -> - loop(Parent, Name, State, Mod, - Time, queue:in(Input, Queue1), Debug) - after Time -> - process_msg(Parent, Name, State, Mod, - Time, Queue1, Debug, timeout) + Input -> drain(in(Input, Queue)) + after 0 -> Queue + end. + +process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> + case priority_queue:out(Queue) of + {{value, Msg}, Queue1} -> + process_msg(Parent, Name, State, Mod, + Time, TimeoutState, Queue1, Debug, Msg); + {empty, Queue1} -> + {Time1, HibOnTimeout} + = case {Time, TimeoutState} of + {hibernate, {backoff, Current, _Min, _Desired, _RSt}} -> + {Current, true}; + {hibernate, _} -> + %% wake_hib/7 will set Time to hibernate. If + %% we were woken and didn't receive a msg + %% then we will get here and need a sensible + %% value for Time1, otherwise we crash. + %% R13B1 always waits infinitely when waking + %% from hibernation, so that's what we do + %% here too. + {infinity, false}; + _ -> {Time, false} + end, + receive + Input -> + %% Time could be 'hibernate' here, so *don't* call loop + process_next_msg( + Parent, Name, State, Mod, Time, TimeoutState, + drain(in(Input, Queue1)), Debug) + after Time1 -> + case HibOnTimeout of + true -> + pre_hibernate( + Parent, Name, State, Mod, TimeoutState, Queue1, + Debug); + false -> + process_msg( + Parent, Name, State, Mod, Time, TimeoutState, + Queue1, Debug, timeout) end end end. - -process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) -> + +wake_hib(Parent, Name, State, Mod, TS, Queue, Debug) -> + TimeoutState1 = case TS of + undefined -> + undefined; + {SleptAt, TimeoutState} -> + adjust_timeout_state(SleptAt, now(), TimeoutState) + end, + post_hibernate(Parent, Name, State, Mod, TimeoutState1, + drain(Queue), Debug). + +hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> + TS = case TimeoutState of + undefined -> undefined; + {backoff, _, _, _, _} -> {now(), TimeoutState} + end, + proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, + TS, Queue, Debug]). + +pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> + case erlang:function_exported(Mod, handle_pre_hibernate, 1) of + true -> + case catch Mod:handle_pre_hibernate(State) of + {hibernate, NState} -> + hibernate(Parent, Name, NState, Mod, TimeoutState, Queue, + Debug); + Reply -> + handle_common_termination(Reply, Name, pre_hibernate, + Mod, State, Debug) + end; + false -> + hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) + end. + +post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> + case erlang:function_exported(Mod, handle_post_hibernate, 1) of + true -> + case catch Mod:handle_post_hibernate(State) of + {noreply, NState} -> + process_next_msg(Parent, Name, NState, Mod, infinity, + TimeoutState, Queue, Debug); + {noreply, NState, Time} -> + process_next_msg(Parent, Name, NState, Mod, Time, + TimeoutState, Queue, Debug); + Reply -> + handle_common_termination(Reply, Name, post_hibernate, + Mod, State, Debug) + end; + false -> + %% use hibernate here, not infinity. This matches + %% R13B. The key is that we should be able to get through + %% to process_msg calling sys:handle_system_msg with Time + %% still set to hibernate, iff that msg is the very msg + %% that woke us up (or the first msg we receive after + %% waking up). + process_next_msg(Parent, Name, State, Mod, hibernate, + TimeoutState, Queue, Debug) + end. + +adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, + DesiredHibPeriod, RandomState}) -> + NapLengthMicros = timer:now_diff(AwokeAt, SleptAt), + CurrentMicros = CurrentTO * 1000, + MinimumMicros = MinimumTO * 1000, + DesiredHibMicros = DesiredHibPeriod * 1000, + GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros, + Base = + %% If enough time has passed between the last two messages then we + %% should consider sleeping sooner. Otherwise stay awake longer. + case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of + true -> lists:max([MinimumTO, CurrentTO div 2]); + false -> CurrentTO + end, + {Extra, RandomState1} = random:uniform_s(Base, RandomState), + CurrentTO1 = Base + Extra, + {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}. + +in({'$gen_pcast', {Priority, Msg}}, Queue) -> + priority_queue:in({'$gen_cast', Msg}, Priority, Queue); +in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> + priority_queue:in({'$gen_call', From, Msg}, Priority, Queue); +in(Input, Queue) -> + priority_queue:in(Input, Queue). + +process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, + Debug, Msg) -> case Msg of {system, From, Req} -> - sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, - [Name, State, Mod, Time, Queue]); + sys:handle_system_msg( + Req, From, Parent, ?MODULE, Debug, + [Name, State, Mod, Time, TimeoutState, Queue]); + %% gen_server puts Hib on the end as the 7th arg, but that + %% version of the function seems not to be documented so + %% leaving out for now. {'EXIT', Parent, Reason} -> terminate(Reason, Name, Msg, Mod, State, Debug); _Msg when Debug =:= [] -> - handle_msg(Msg, Parent, Name, State, Mod, Time, Queue); + handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue); _Msg -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {in, Msg}), - handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1) + handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue, + Debug1) end. %%% --------------------------------------------------- @@ -554,87 +819,95 @@ dispatch(Info, Mod, State) -> Mod:handle_info(Info, State). handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, _Time, Queue) -> + Parent, Name, State, Mod, TimeoutState, Queue) -> case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> reply(From, Reply), - loop(Parent, Name, NState, Mod, infinity, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {reply, Reply, NState, Time1} -> reply(From, Reply), - loop(Parent, Name, NState, Mod, Time1, Queue, []); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, Queue, []); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); {stop, Reason, Reply, NState} -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, NState, [])), reply(From, Reply), exit(R); - Other -> handle_common_reply(Other, - Parent, Name, Msg, Mod, State, Queue) + Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State, + TimeoutState, Queue) end; handle_msg(Msg, - Parent, Name, State, Mod, _Time, Queue) -> + Parent, Name, State, Mod, TimeoutState, Queue) -> Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue). + handle_common_reply(Reply, Parent, Name, Msg, Mod, State, + TimeoutState, Queue). handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, _Time, Queue, Debug) -> + Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, + Debug1); {reply, Reply, NState, Time1} -> Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); {noreply, NState} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, + Debug1); {noreply, NState, Time1} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); {stop, Reason, Reply, NState} -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, NState, Debug)), reply(Name, From, Reply, NState, Debug), exit(R); Other -> - handle_common_reply(Other, - Parent, Name, Msg, Mod, State, Queue, Debug) + handle_common_reply(Other, Parent, Name, Msg, Mod, State, + TimeoutState, Queue, Debug) end; handle_msg(Msg, - Parent, Name, State, Mod, _Time, Queue, Debug) -> + Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, - Parent, Name, Msg, Mod, State, Queue, Debug). + handle_common_reply(Reply, Parent, Name, Msg, Mod, State, + TimeoutState, Queue, Debug). -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) -> +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, + TimeoutState, Queue) -> case Reply of {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, Queue, []); - {stop, Reason, NState} -> - terminate(Reason, Name, Msg, Mod, NState, []); - {'EXIT', What} -> - terminate(What, Name, Msg, Mod, State, []); - _ -> - terminate({bad_return_value, Reply}, Name, Msg, Mod, State, []) + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); + _ -> + handle_common_termination(Reply, Name, Msg, Mod, State, []) end. -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) -> +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue, + Debug) -> case Reply of {noreply, NState} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, + Debug1); {noreply, NState, Time1} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); + _ -> + handle_common_termination(Reply, Name, Msg, Mod, State, Debug) + end. + +handle_common_termination(Reply, Name, Msg, Mod, State, Debug) -> + case Reply of {stop, Reason, NState} -> terminate(Reason, Name, Msg, Mod, NState, Debug); {'EXIT', What} -> @@ -652,16 +925,24 @@ reply(Name, {To, Tag}, Reply, State, Debug) -> %%----------------------------------------------------------------- %% Callback functions for system messages handling. %%----------------------------------------------------------------- -system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) -> - loop(Parent, Name, State, Mod, Time, Queue, Debug). +system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) -> + loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug). + +-ifdef(use_specs). +-spec system_terminate(_, _, _, [_]) -> no_return(). +-endif. -system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) -> +system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, + _TimeoutState, _Queue]) -> terminate(Reason, Name, [], Mod, State, Debug). -system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) -> +system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module, + OldVsn, Extra) -> case catch Mod:code_change(OldVsn, State, Extra) of - {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]}; - Else -> Else + {ok, NewState} -> + {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]}; + Else -> + Else end. %%----------------------------------------------------------------- @@ -703,6 +984,8 @@ terminate(Reason, Name, Msg, Mod, State, Debug) -> exit(normal); shutdown -> exit(shutdown); + {shutdown,_}=Shutdown -> + exit(Shutdown); _ -> error_info(Reason, Name, Msg, State, Debug), exit(Reason) @@ -827,8 +1110,8 @@ name_to_pid(Name) -> %% Status information %%----------------------------------------------------------------- format_status(Opt, StatusData) -> - [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] = - StatusData, + [PDict, SysState, Parent, Debug, + [Name, State, Mod, _Time, _TimeoutState, Queue]] = StatusData, NameTag = if is_pid(Name) -> pid_to_list(Name); is_atom(Name) -> @@ -850,5 +1133,5 @@ format_status(Opt, StatusData) -> {data, [{"Status", SysState}, {"Parent", Parent}, {"Logged events", Log}, - {"Queued messages", queue:to_list(Queue)}]} | + {"Queued messages", priority_queue:to_list(Queue)}]} | Specfic]. diff --git a/src/local_pg2.erl b/src/local_pg2.erl deleted file mode 100644 index 48b8b72..0000000 --- a/src/local_pg2.erl +++ /dev/null @@ -1,153 +0,0 @@ --module(local_pg2). - -%% Basically the same functionality than pg2, but process groups are local rather than global. --export([create/1, delete/1, join/2, leave/2, get_members/1, get_closest_pid/1, which_groups/0]). - --export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). - --define(TABLE, local_pg2_table). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -start() -> - ensure_started(). - -create(Name) -> - ensure_started(), - case ets:lookup(?TABLE, Name) of - [] -> - gen_server:call(?MODULE, {create, Name}); - _ -> - ok - end. -delete(Name) -> - ensure_started(), - gen_server:call(?MODULE, {delete, Name}). - -join(Name, Pid) when is_pid(Pid) -> - ensure_started(), - case ets:lookup(?TABLE, Name) of - [] -> - {error, {no_such_group, Name}}; - _ -> - gen_server:call(?MODULE, {join, Name, Pid}) - end. -leave(Name, Pid) when is_pid(Pid) -> - ensure_started(), - case ets:lookup(?TABLE, Name) of - [] -> - {error, {no_such_group, Name}}; - _ -> - gen_server:call(?MODULE, {leave, Name, Pid}) - end. - -get_members(Name) -> - ensure_started(), - case ets:lookup(?TABLE, Name) of - [] -> {error, {no_such_group, Name}}; - [{Name, Members}] -> Members - end. -which_groups() -> - ensure_started(), - [K || {K, _Members} <- ets:tab2list(?TABLE)]. - - -get_closest_pid(Name) -> - ensure_started(), - case ets:lookup(?TABLE, Name) of - [] -> - {error, {no_process, Name}}; - [{Name, Members}] -> - %% TODO: we can get more inteligent, check queue size, reductions, etc. - %% http://lethain.com/entry/2009/sep/12/load-balancing-across-erlang-process-groups/ - {_, _, X} = erlang:now(), - lists:nth((X rem length(Members)) +1, Members) - end. - - -init([]) -> - process_flag(trap_exit, true), - ets:new(?TABLE, [set, protected, named_table]), - {ok, []}. -handle_call({create, Name}, _From, S) -> - case ets:lookup(?TABLE, Name) of - [] -> - ets:insert(?TABLE, {Name, []}); - _ -> - ok - end, - {reply, ok, S}; - -handle_call({join, Name, Pid}, _From, S) -> - case ets:lookup(?TABLE, Name) of - [] -> - {reply, no_such_group, S}; - [{Name, Members}] -> - ets:insert(?TABLE, {Name, [Pid | Members]}), - link(Pid), - %%TODO: add pid to linked ones on state.. - {reply, ok, S} - end; - -handle_call({leave, Name, Pid}, _From, S) -> - case ets:lookup(?TABLE, Name) of - [] -> - {reply, no_such_group, S}; - [{Name, Members}] -> - case lists:delete(Pid, Members) of - [] -> - ets:delete(?TABLE, Name); - NewMembers -> - ets:insert(?TABLE, {Name, NewMembers}) - end, - unlink(Pid), - {reply, ok, S} - end; - -handle_call({delete, Name}, _From, S) -> - ets:delete(?TABLE, Name), - {reply, ok, S}. - -handle_cast(_Cast, S) -> - {noreply, S}. - -handle_info({'EXIT', Pid, _} , S) -> - del_member(Pid), - {noreply, S}; -handle_info(_Info, S) -> - {noreply, S}. - -terminate(_Reason, _S) -> - ets:delete(?TABLE), - %%do not unlink, if this fails, dangling processes should be killed - ok. -%%%----------------------------------------------------------------- -%%% Internal functions -%%%----------------------------------------------------------------- -del_member(Pid) -> - L = ets:tab2list(?TABLE), - lists:foreach(fun({Name, Members}) -> - case lists:member(Pid, Members) of - true -> - case lists:delete(Pid, Members) of - [] -> - ets:delete(?TABLE, Name); - NewMembers -> - ets:insert(?TABLE, {Name, NewMembers}) - end; - false -> - ok - end - end, L). - -ensure_started() -> - case whereis(?MODULE) of - undefined -> - C = {local_pg2, {?MODULE, start_link, []}, permanent, - 1000, worker, [?MODULE]}, - supervisor:start_child(kernel_safe_sup, C); - Pg2Pid -> - {ok, Pg2Pid} - end. - diff --git a/src/merle.app.src b/src/merle.app.src new file mode 100644 index 0000000..36de123 --- /dev/null +++ b/src/merle.app.src @@ -0,0 +1,13 @@ +{application, merle, + [ + {description, "Erlang Memcached Client"}, + {vsn, "1"}, + {modules, []}, + {registered, []}, + {applications, [ + kernel, + stdlib + ]}, + {mod, { merle_app, []}}, + {env, []} + ]}. diff --git a/src/merle.erl b/src/merle.erl index 83a19a7..0ba12ab 100644 --- a/src/merle.erl +++ b/src/merle.erl @@ -38,7 +38,7 @@ -author("Joe Williams "). -version("Version: 0.3"). --define(TIMEOUT, 5000). +-define(DEFAULT_TIMEOUT, 5000). -define(RANDOM_MAX, 65535). -define(DEFAULT_HOST, "localhost"). -define(DEFAULT_PORT, 11211). @@ -53,9 +53,16 @@ %% gen_server API -export([ - stats/1, stats/2, version/1, getkey/2, getkeys/2, delete/3, set/5, add/5, replace/3, - replace/5, cas/6, set/3, flushall/1, flushall/2, verbosity/2, add/3, - cas/4, getskey/2, connect/0, connect/2, delete/2, disconnect/1 + stats/1, stats/2, version/1, + getkey/3, getkeys/3, getskey/3, + delete/3, delete/4, + replace/4, replace/6, + set/6, set/4, set/5, + cas/5, cas/7, + add/4, add/6, + incr/4, decr/4, incr_counter/4, incr_counter/5, getcounter/3, + flushall/1, flushall/2, + verbosity/2, connect/0, connect/2, disconnect/1 ]). %% gen_server callbacks @@ -95,13 +102,13 @@ flushall(Ref, Delay) -> gen_server2:call(Ref, {flushall, {Delay}}). %% @doc retrieve value based off of key -getkey(Ref, Key) when is_atom(Key) -> - getkey(Ref, atom_to_list(Key)); -getkey(Ref, Key) -> - gen_server2:call(Ref, {getkey,{Key}}). - +getkey(Ref, Key, Timeout) when is_atom(Key) -> + getkey(Ref, atom_to_list(Key), Timeout); +getkey(Ref, Key, Timeout) -> + gen_server2:call(Ref, {getkey, {Key, Timeout}}). + %% @doc retrieve multiple values based on keys -getkeys(Ref, Keys) when is_list(Keys) -> +getkeys(Ref, Keys, Timeout) when is_list(Keys) -> StringKeys = lists:map(fun (A) when is_atom(A) -> atom_to_list(A); @@ -109,25 +116,31 @@ getkeys(Ref, Keys) when is_list(Keys) -> S end, Keys), - gen_server2:call(Ref, {getkeys,{join_by(StringKeys, " ")}}). + gen_server2:call(Ref, {getkeys,{join_by(StringKeys, " "), Timeout}}). +%% @doc used in conjunction with incr_counter to retrieve an integer value from cache +getcounter(Ref, Key, Timeout) -> + case getkey(Ref, Key, Timeout) of + {error, Error} -> {error, Error}; + {ok, NumberBin} -> {ok, list_to_integer(string:strip(binary_to_list(NumberBin)))} + end. %% @doc retrieve value based off of key for use with cas -getskey(Ref, Key) when is_atom(Key) -> - getskey(Ref, atom_to_list(Key)); -getskey(Ref, Key) -> - gen_server2:call(Ref, {getskey,{Key}}). +getskey(Ref, Key, Timeout) when is_atom(Key) -> + getskey(Ref, atom_to_list(Key), Timeout); +getskey(Ref, Key, Timeout) -> + gen_server2:call(Ref, {getskey,{Key, Timeout}}). %% @doc delete a key -delete(Ref, Key) -> - delete(Ref, Key, "0"). +delete(Ref, Key, Timeout) -> + delete(Ref, Key, "0", Timeout). -delete(Ref, Key, Time) when is_atom(Key) -> - delete(Ref, atom_to_list(Key), Time); -delete(Ref, Key, Time) when is_integer(Time) -> - delete(Ref, Key, integer_to_list(Time)); -delete(Ref, Key, Time) -> - gen_server2:call(Ref, {delete, {Key, Time}}). +delete(Ref, Key, Time, Timeout) when is_atom(Key) -> + delete(Ref, atom_to_list(Key), Time, Timeout); +delete(Ref, Key, Time, Timeout) when is_integer(Time) -> + delete(Ref, Key, integer_to_list(Time), Timeout); +delete(Ref, Key, Time, Timeout) -> + gen_server2:call(Ref, {delete, {Key, Time, Timeout}}). %% Time is the amount of time in seconds %% the client wishes the server to refuse @@ -152,62 +165,147 @@ delete(Ref, Key, Time) -> %% *Value* is the value you want to store. %% @doc Store a key/value pair. -set(Ref, Key, Value) -> +set(Ref, Key, Value, Timeout) -> + set(Ref, Key, "0", Value, Timeout). + +set(Ref, Key, ExpTime, Value, Timeout) -> Flag = random:uniform(?RANDOM_MAX), - set(Ref, Key, integer_to_list(Flag), "0", Value). + set(Ref, Key, integer_to_list(Flag), ExpTime, Value, Timeout). -set(Ref, Key, Flag, ExpTime, Value) when is_atom(Key) -> - set(Ref, atom_to_list(Key), Flag, ExpTime, Value); -set(Ref, Key, Flag, ExpTime, Value) when is_integer(Flag) -> - set(Ref, Key, integer_to_list(Flag), ExpTime, Value); -set(Ref, Key, Flag, ExpTime, Value) when is_integer(ExpTime) -> - set(Ref, Key, Flag, integer_to_list(ExpTime), Value); -set(Ref, Key, Flag, ExpTime, Value) -> - gen_server2:call(Ref, {set, {Key, Flag, ExpTime, Value}}). +set(Ref, Key, Flag, ExpTime, Value, Timeout) when is_atom(Key) -> + set(Ref, atom_to_list(Key), Flag, ExpTime, Value, Timeout); +set(Ref, Key, Flag, ExpTime, Value, Timeout) when is_integer(Flag) -> + set(Ref, Key, integer_to_list(Flag), ExpTime, Value, Timeout); +set(Ref, Key, Flag, ExpTime, Value, Timeout) when is_integer(ExpTime) -> + set(Ref, Key, Flag, integer_to_list(ExpTime), Value, Timeout); +set(Ref, Key, Flag, ExpTime, Value, Timeout) -> + gen_server2:call(Ref, {set, {Key, Flag, ExpTime, Value, Timeout}}). %% @doc Store a key/value pair if it doesn't already exist. -add(Ref, Key, Value) -> +add(Ref, Key, Value, Timeout) -> Flag = random:uniform(?RANDOM_MAX), - add(Ref, Key, integer_to_list(Flag), "0", Value). + add(Ref, Key, integer_to_list(Flag), "0", Value, Timeout). -add(Ref, Key, Flag, ExpTime, Value) when is_atom(Key) -> - add(Ref, atom_to_list(Key), Flag, ExpTime, Value); -add(Ref, Key, Flag, ExpTime, Value) when is_integer(Flag) -> - add(Ref, Key, integer_to_list(Flag), ExpTime, Value); -add(Ref, Key, Flag, ExpTime, Value) when is_integer(ExpTime) -> - add(Ref, Key, Flag, integer_to_list(ExpTime), Value); -add(Ref, Key, Flag, ExpTime, Value) -> - gen_server2:call(Ref, {add, {Key, Flag, ExpTime, Value}}). +add(Ref, Key, Flag, ExpTime, Value, Timeout) when is_atom(Key) -> + add(Ref, atom_to_list(Key), Flag, ExpTime, Value, Timeout); +add(Ref, Key, Flag, ExpTime, Value, Timeout) when is_integer(Flag) -> + add(Ref, Key, integer_to_list(Flag), ExpTime, Value, Timeout); +add(Ref, Key, Flag, ExpTime, Value, Timeout) when is_integer(ExpTime) -> + add(Ref, Key, Flag, integer_to_list(ExpTime), Value, Timeout); +add(Ref, Key, Flag, ExpTime, Value, Timeout) -> + gen_server2:call(Ref, {add, {Key, Flag, ExpTime, Value, Timeout}}). %% @doc Replace an existing key/value pair. -replace(Ref, Key, Value) -> +replace(Ref, Key, Value, Timeout) -> Flag = random:uniform(?RANDOM_MAX), - replace(Ref, Key, integer_to_list(Flag), "0", Value). + replace(Ref, Key, integer_to_list(Flag), "0", Value, Timeout). -replace(Ref, Key, Flag, ExpTime, Value) when is_atom(Key) -> - replace(Ref, atom_to_list(Key), Flag, ExpTime, Value); -replace(Ref, Key, Flag, ExpTime, Value) when is_integer(Flag) -> - replace(Ref, Key, integer_to_list(Flag), ExpTime, Value); -replace(Ref, Key, Flag, ExpTime, Value) when is_integer(ExpTime) -> - replace(Ref, Key, Flag, integer_to_list(ExpTime), Value); -replace(Ref, Key, Flag, ExpTime, Value) -> - gen_server2:call(Ref, {replace, {Key, Flag, ExpTime, Value}}). +replace(Ref, Key, Flag, ExpTime, Value, Timeout) when is_atom(Key) -> + replace(Ref, atom_to_list(Key), Flag, ExpTime, Value, Timeout); +replace(Ref, Key, Flag, ExpTime, Value, Timeout) when is_integer(Flag) -> + replace(Ref, Key, integer_to_list(Flag), ExpTime, Value, Timeout); +replace(Ref, Key, Flag, ExpTime, Value, Timeout) when is_integer(ExpTime) -> + replace(Ref, Key, Flag, integer_to_list(ExpTime), Value, Timeout); +replace(Ref, Key, Flag, ExpTime, Value, Timeout) -> + gen_server2:call(Ref, {replace, {Key, Flag, ExpTime, Value, Timeout}}). %% @doc Store a key/value pair if possible. -cas(Ref, Key, CasUniq, Value) -> +cas(Ref, Key, CasUniq, Value, Timeout) -> Flag = random:uniform(?RANDOM_MAX), - cas(Ref, Key, integer_to_list(Flag), "0", CasUniq, Value). - -cas(Ref, Key, Flag, ExpTime, CasUniq, Value) when is_atom(Key) -> - cas(Ref, atom_to_list(Key), Flag, ExpTime, CasUniq, Value); -cas(Ref, Key, Flag, ExpTime, CasUniq, Value) when is_integer(Flag) -> - cas(Ref, Key, integer_to_list(Flag), ExpTime, CasUniq, Value); -cas(Ref, Key, Flag, ExpTime, CasUniq, Value) when is_integer(ExpTime) -> - cas(Ref, Key, Flag, integer_to_list(ExpTime), CasUniq, Value); -cas(Ref, Key, Flag, ExpTime, CasUniq, Value) when is_integer(CasUniq) -> - cas(Ref, Key, Flag, ExpTime, integer_to_list(CasUniq), Value); -cas(Ref, Key, Flag, ExpTime, CasUniq, Value) -> - gen_server2:call(Ref, {cas, {Key, Flag, ExpTime, CasUniq, Value}}). + cas(Ref, Key, integer_to_list(Flag), "0", CasUniq, Value, Timeout). + +cas(Ref, Key, Flag, ExpTime, CasUniq, Value, Timeout) when is_atom(Key) -> + cas(Ref, atom_to_list(Key), Flag, ExpTime, CasUniq, Value, Timeout); +cas(Ref, Key, Flag, ExpTime, CasUniq, Value, Timeout) when is_integer(Flag) -> + cas(Ref, Key, integer_to_list(Flag), ExpTime, CasUniq, Value, Timeout); +cas(Ref, Key, Flag, ExpTime, CasUniq, Value, Timeout) when is_integer(ExpTime) -> + cas(Ref, Key, Flag, integer_to_list(ExpTime), CasUniq, Value, Timeout); +cas(Ref, Key, Flag, ExpTime, CasUniq, Value, Timeout) when is_integer(CasUniq) -> + cas(Ref, Key, Flag, ExpTime, integer_to_list(CasUniq), Value, Timeout); +cas(Ref, Key, Flag, ExpTime, CasUniq, Value, Timeout) -> + gen_server2:call(Ref, {cas, {Key, Flag, ExpTime, CasUniq, Value, Timeout}}). + +%% @doc Add a key to memcached which can be used as a counter via incr/decr. +%% Currently, incr_counter/2 checks via incr to see if a counter exists before +%% creating it. This is a subsitute for using a cas operation to initialize the +%% counter. +%% +%% To this effect, incr_counter/2 uses a new clause merle:handle_call/2 which +%% sends +%% +%% ``` +%% set 0 4\r\n0000\r\n''' +%% +%% to memcached via send_storage_cmd/2 (which adds the terminal CRLF and +%% generates the Flag parameter), thus creating a key with name Key with 64-bits +%% of space allocated for an integer value. FFFFFFFF is a negative value, not +%% acceptable by memcached, and so is coerced to zero. Hey presto! A counter. +%% + +-define(MAX_INCR_TRIES, 2). + +incr_counter(Ref, Key, Value, Timeout) -> + incr_counter(Ref, Key, Value, "0", Timeout). + +incr_counter(Ref, Key, Value, ExpTime, Timeout) when is_integer(ExpTime) -> + incr_counter(Ref, Key, Value, integer_to_list(ExpTime), Timeout); + +incr_counter(Ref, Key, Value, ExpTime, Timeout) -> + incr_counter(Ref, Key, Value, ExpTime, Timeout, 0). + +incr_counter(_Ref, _Key, _Value, _ExpTime, _Timeout, ?MAX_INCR_TRIES) -> + {error, not_stored}; +incr_counter(Ref, Key, Value, ExpTime, Timeout, NumTry) -> + Flag = random:uniform(?RANDOM_MAX), + case incr(Ref, Key, Value, Timeout) of + not_found -> + case gen_server2:call(Ref, {addcounter, {Key, integer_to_list(Flag), ExpTime, Timeout}}) of + {ok, stored} -> + incr(Ref, Key, Value, Timeout); + {error, _} -> + incr_counter(Ref, Key, Value, ExpTime, Timeout, NumTry+1); + X -> X + end; + {error, _} -> + incr_counter(Ref, Key, Value, ExpTime, Timeout, NumTry+1); + Result -> Result + end. + + +%% @doc Interface to the incr method in memcached's protocol. +%% +%% incr/2 and decr/2 both use erlang:interger_to_list/1 to convert their integer +%% arguments into a decimal string representation, which is then submitted to +%% memcached. +%% +%% @spec incr(Key::list(),Value::integer()) -> (not_found | {ok,NewValue::integer()}) +%% @see merle:decr/2 +incr(Ref, Key, Value, Timeout) when is_integer(Value) -> + case gen_server2:call(Ref, {incr, {Key, integer_to_list(Value), Timeout}}) of + {error, not_found} -> not_found; + {error, Error} -> {error, Error}; + Line -> + {ok, [IntegerString], []} = io_lib:fread("~s\r\n", binary_to_list(Line)), + {ok, list_to_integer(IntegerString)} + end. + +%% @doc Interface to the decr method in memcached's protocol. +%% +%% incr/2 and decr/2 both use erlang:interger_to_list/1 to convert their integer +%% arguments into a decimal string representation, which is then submitted to +%% memcached. +%% +%% Since incr and decr in memcached are defined to operate on the binary +%% representations of 64-bit unsigned integers, it is not possible to decrement +%% a value in memcached to below zero. +%% +%% @spec decr(Key::list(),Value::integer()) -> (not_found | {ok,NewValue::integer()}) +%% @see merle:incr/2 +decr(Ref, Key, Value, Timeout) when is_integer(Value) -> + case gen_server2:call(Ref, {decr, {Key, integer_to_list(Value), Timeout}}) of + {error, not_found} -> not_found; + [Str] -> {ok, list_to_integer(Str)} + end. %% @doc connect to memcached with defaults connect() -> @@ -228,54 +326,56 @@ start_link(Host, Port) -> %% @private init([Host, Port]) -> - gen_tcp:connect(Host, Port, ?TCP_OPTS_ACTIVE). + log4erl:info("Socket initialized!"), + + erlang:process_flag(trap_exit, true), -handle_call({stop}, _From, Socket) -> - {stop, requested_disconnect, Socket}; + gen_tcp:connect(Host, Port, ?TCP_OPTS_ACTIVE). handle_call({stats}, _From, Socket) -> - Reply = send_stats_cmd(Socket, iolist_to_binary([<<"stats">>])), + Reply = send_stats_cmd(Socket, iolist_to_binary([<<"stats">>]), ?DEFAULT_TIMEOUT), {reply, Reply, Socket}; handle_call({stats, {Args}}, _From, Socket) -> - Reply = send_stats_cmd(Socket, iolist_to_binary([<<"stats ">>, Args])), + Reply = send_stats_cmd(Socket, iolist_to_binary([<<"stats ">>, Args]), ?DEFAULT_TIMEOUT), {reply, Reply, Socket}; handle_call({version}, _From, Socket) -> - Reply = send_generic_cmd(Socket, iolist_to_binary([<<"version">>])), + Reply = send_generic_cmd(Socket, iolist_to_binary([<<"version">>]), ?DEFAULT_TIMEOUT), {reply, Reply, Socket}; handle_call({verbosity, {Args}}, _From, Socket) -> - Reply = send_generic_cmd(Socket, iolist_to_binary([<<"verbosity ">>, Args])), + Reply = send_generic_cmd(Socket, iolist_to_binary([<<"verbosity ">>, Args]), ?DEFAULT_TIMEOUT), {reply, Reply, Socket}; handle_call({flushall}, _From, Socket) -> - Reply = send_generic_cmd(Socket, iolist_to_binary([<<"flush_all">>])), + Reply = send_generic_cmd(Socket, iolist_to_binary([<<"flush_all">>]), ?DEFAULT_TIMEOUT), {reply, Reply, Socket}; handle_call({flushall, {Delay}}, _From, Socket) -> - Reply = send_generic_cmd(Socket, iolist_to_binary([<<"flush_all ">>, Delay])), + Reply = send_generic_cmd(Socket, iolist_to_binary([<<"flush_all ">>, Delay]), ?DEFAULT_TIMEOUT), {reply, Reply, Socket}; -handle_call({getkey, {Key}}, _From, Socket) -> - Reply = send_get_cmd(Socket, iolist_to_binary([<<"get ">>, Key])), +handle_call({getkey, {Key, Timeout}}, _From, Socket) -> + Reply = send_get_cmd(Socket, iolist_to_binary([<<"get ">>, Key]), Timeout), {reply, Reply, Socket}; -handle_call({getkeys, {Keys}}, _From, Socket) -> - Reply = send_multi_get_cmd(Socket, iolist_to_binary([<<"get ">>, Keys])), +handle_call({getkeys, {Keys, Timeout}}, _From, Socket) -> + Reply = send_multi_get_cmd(Socket, iolist_to_binary([<<"get ">>, Keys]), Timeout), {reply, Reply, Socket}; -handle_call({getskey, {Key}}, _From, Socket) -> - Reply = send_gets_cmd(Socket, iolist_to_binary([<<"gets ">>, Key])), +handle_call({getskey, {Key, Timeout}}, _From, Socket) -> + Reply = send_gets_cmd(Socket, iolist_to_binary([<<"gets ">>, Key]), Timeout), {reply, [Reply], Socket}; -handle_call({delete, {Key, Time}}, _From, Socket) -> +handle_call({delete, {Key, Time, Timeout}}, _From, Socket) -> Reply = send_generic_cmd( Socket, - iolist_to_binary([<<"delete ">>, Key, <<" ">>, Time]) + iolist_to_binary([<<"delete ">>, Key, <<" ">>, Time]), + Timeout ), {reply, Reply, Socket}; -handle_call({set, {Key, Flag, ExpTime, Value}}, _From, Socket) -> +handle_call({set, {Key, Flag, ExpTime, Value, Timeout}}, _From, Socket) -> Bin = term_to_binary(Value), Bytes = integer_to_list(size(Bin)), Reply = send_storage_cmd( @@ -283,11 +383,28 @@ handle_call({set, {Key, Flag, ExpTime, Value}}, _From, Socket) -> iolist_to_binary([ <<"set ">>, Key, <<" ">>, Flag, <<" ">>, ExpTime, <<" ">>, Bytes ]), - Bin + Bin, + Timeout ), {reply, Reply, Socket}; -handle_call({add, {Key, Flag, ExpTime, Value}}, _From, Socket) -> +%% special clause to add a counter to memcached instead of serialized +%% erlang data (literal 0xFFFFFFFF instead of the erlang bitstring +%% <<131,98,255,255,255,255,255,255,255,255,0>>) +handle_call({addcounter, {Key, Flag, ExpTime, Timeout}}, _From, Socket) -> + Bin = <<"0000">>, + Bytes = <<"4">>, + Reply = send_storage_cmd( + Socket, + iolist_to_binary([ + <<"set ">>, Key, <<" ">>, Flag, <<" ">>, ExpTime, <<" ">>, Bytes + ]), + Bin, + Timeout + ), + {reply, Reply, Socket}; + +handle_call({add, {Key, Flag, ExpTime, Value, Timeout}}, _From, Socket) -> Bin = term_to_binary(Value), Bytes = integer_to_list(size(Bin)), Reply = send_storage_cmd( @@ -295,11 +412,12 @@ handle_call({add, {Key, Flag, ExpTime, Value}}, _From, Socket) -> iolist_to_binary([ <<"add ">>, Key, <<" ">>, Flag, <<" ">>, ExpTime, <<" ">>, Bytes ]), - Bin + Bin, + Timeout ), {reply, Reply, Socket}; -handle_call({replace, {Key, Flag, ExpTime, Value}}, _From, Socket) -> +handle_call({replace, {Key, Flag, ExpTime, Value, Timeout}}, _From, Socket) -> Bin = term_to_binary(Value), Bytes = integer_to_list(size(Bin)), Reply = send_storage_cmd( @@ -308,11 +426,12 @@ handle_call({replace, {Key, Flag, ExpTime, Value}}, _From, Socket) -> <<"replace ">>, Key, <<" ">>, Flag, <<" ">>, ExpTime, <<" ">>, Bytes ]), - Bin + Bin, + Timeout ), {reply, Reply, Socket}; -handle_call({cas, {Key, Flag, ExpTime, CasUniq, Value}}, _From, Socket) -> +handle_call({cas, {Key, Flag, ExpTime, CasUniq, Value, Timeout}}, _From, Socket) -> Bin = term_to_binary(Value), Bytes = integer_to_list(size(Bin)), Reply = send_storage_cmd( @@ -321,11 +440,26 @@ handle_call({cas, {Key, Flag, ExpTime, CasUniq, Value}}, _From, Socket) -> <<"cas ">>, Key, <<" ">>, Flag, <<" ">>, ExpTime, <<" ">>, Bytes, <<" ">>, CasUniq ]), - Bin + Bin, + Timeout ), + {reply, Reply, Socket}; + +%% Added by Jeremy D. Acord, March 2012 +handle_call({incr, {Key, Value, Timeout}}, _From, Socket) when is_list(Value) -> + CMD = iolist_to_binary([<<"incr ">>,Key,<<" ">>,Value]), + Reply = send_generic_cmd(Socket, CMD, Timeout), + {reply, Reply, Socket}; + +handle_call({decr, {Key, Value, Timeout}}, _From, Socket) when is_list(Value) -> + CMD = iolist_to_binary([<<"decr ">>,Key,<<" ">>,Value]), + Reply = send_generic_cmd(Socket, CMD, Timeout), {reply, Reply, Socket}. %% @private +handle_cast(stop, State) -> + {stop, normal, State}; + handle_cast(_Msg, State) -> {noreply, State}. %% @private @@ -333,6 +467,11 @@ handle_info({tcp_closed, Socket}, Socket) -> {stop, {error, tcp_closed}, Socket}; handle_info({tcp_error, Socket, Reason}, Socket) -> {stop, {error, {tcp_error, Reason}}, Socket}; + +handle_info({'EXIT', _, Reason}, Socket) -> + log4erl:warn("Exiting merle connection ~p", [Reason]), + {stop, normal, Socket}; + handle_info(_Info, State) -> {noreply, State}. %% @private @@ -341,48 +480,52 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %% @private %% @doc Closes the socket terminate(_Reason, Socket) -> + log4erl:info("Socket terminated!"), gen_tcp:close(Socket), ok. %% @private %% @doc send_stats_cmd/2 function for stats get -send_stats_cmd(Socket, Cmd) -> +send_stats_cmd(Socket, Cmd, Timeout) -> gen_tcp:send(Socket, <>), - Reply = recv_stats(), + Reply = recv_stats(Timeout), Reply. %% @private %% @doc send_generic_cmd/2 function for simple informational and deletion commands -send_generic_cmd(Socket, Cmd) -> +send_generic_cmd(Socket, Cmd, Timeout) -> gen_tcp:send(Socket, <>), - Reply = recv_simple_reply(), + Reply = recv_simple_reply(Timeout), Reply. %% @private %% @doc send_storage_cmd/3 funtion for storage commands -send_storage_cmd(Socket, Cmd, Value) -> +send_storage_cmd(Socket, Cmd, Value, Timeout) -> gen_tcp:send(Socket, <>), gen_tcp:send(Socket, <>), - Reply = recv_simple_reply(), + Reply = recv_simple_reply(Timeout), Reply. %% @private %% @doc send_get_cmd/2 function for retreival commands -send_get_cmd(Socket, Cmd) -> +send_get_cmd(Socket, Cmd, Timeout) -> inet:setopts(Socket, ?TCP_OPTS_LINE), gen_tcp:send(Socket, <>), - Reply = case recv_complex_get_reply(Socket) of + Reply = case recv_complex_get_reply(Socket, Timeout) of [{_, Value}] -> {ok, Value}; [] -> {error, not_found}; - {error, Error} -> {error, Error} - end, + {error, Error} -> + log4erl:warn("Encountered error from memcache; killing connection now: ~p", [Error]), + erlang:exit(self(), Error), + {error, Error} + end, inet:setopts(Socket, ?TCP_OPTS_ACTIVE), Reply. -send_multi_get_cmd(Socket, Cmd) -> +send_multi_get_cmd(Socket, Cmd, Timeout) -> inet:setopts(Socket, ?TCP_OPTS_LINE), gen_tcp:send(Socket, <>), - Reply = case recv_complex_get_reply(Socket) of + Reply = case recv_complex_get_reply(Socket, Timeout) of {error, Error} -> {error, Error}; R -> {ok, R} end, @@ -391,21 +534,21 @@ send_multi_get_cmd(Socket, Cmd) -> %% @private %% @doc send_gets_cmd/2 function for cas retreival commands -send_gets_cmd(Socket, Cmd) -> +send_gets_cmd(Socket, Cmd, Timeout) -> gen_tcp:send(Socket, <>), - Reply = recv_complex_gets_reply(Socket), + Reply = recv_complex_gets_reply(Socket, Timeout), Reply. %% @private %% {active, once} is overkill here, but don't worry to much on optimize this method -recv_stats() -> - case do_recv_stats() of +recv_stats(Timeout) -> + case do_recv_stats(Timeout) of timeout -> {error, timeout}; Stats -> {ok, Stats} end. -do_recv_stats() -> +do_recv_stats(Timeout) -> receive {tcp, Socket, <<"END\r\n">>} -> inet:setopts(Socket, ?TCP_OPTS_ACTIVE), @@ -413,61 +556,72 @@ do_recv_stats() -> {tcp, Socket, Data} -> {ok, [Field, Value], []} = io_lib:fread("STAT ~s ~s \r\n", binary_to_list(Data)), inet:setopts(Socket, ?TCP_OPTS_ACTIVE), - [{Field, Value} | do_recv_stats()] - after ?TIMEOUT -> - timeout + [{Field, Value} | do_recv_stats(Timeout)] + after Timeout -> + timeout end. %% @doc receive function for simple responses (not containing VALUEs) -recv_simple_reply() -> +recv_simple_reply(Timeout) -> receive {tcp, Socket, Data} -> inet:setopts(Socket, ?TCP_OPTS_ACTIVE), parse_simple_response_line(Data); {error, closed} -> + log4erl:warn("Encountered error while receiving simple reply from memcache; killing connection now."), + erlang:exit(self(), connection_closed), connection_closed - after ?TIMEOUT -> {error, timeout} + after Timeout -> + log4erl:warn("Encountered timeout while receiving simple reply from memcache; killing connection now."), + erlang:exit(self(), timeout), + {error, timeout} end. parse_simple_response_line(<<"OK", _B/binary>>) -> ok; parse_simple_response_line(<<"ERROR", _B/binary>> =L ) -> {error, L}; parse_simple_response_line(<<"CLIENT_ERROR", _B/binary>> =L ) -> {error, L}; parse_simple_response_line(<<"SERVER_ERROR", _B/binary>> =L) -> {error, L}; -parse_simple_response_line(<<"STORED", _B/binary>>) -> ok; -parse_simple_response_line(<<"NOT_STORED", _B/binary>> ) -> ok; +parse_simple_response_line(<<"STORED", _B/binary>>) -> {ok, stored}; +parse_simple_response_line(<<"NOT_STORED", _B/binary>> ) -> {error, not_stored}; parse_simple_response_line(<<"EXISTS", _B/binary>> ) -> {error, exists}; parse_simple_response_line(<<"NOT_FOUND", _B/binary>> ) -> {error, not_found}; parse_simple_response_line(<<"DELETED", _B/binary>> ) -> ok; parse_simple_response_line(<<"VERSION", _B/binary>> =L) -> {ok, L}; -parse_simple_response_line(Line) -> {error, {unknown_response, Line}}. +parse_simple_response_line(Line) -> Line. %% @private %% @doc receive function for respones containing VALUEs -recv_complex_get_reply(Socket) -> - recv_complex_get_reply(Socket, []). -recv_complex_get_reply(Socket, Accum) -> - case gen_tcp:recv(Socket, 0, ?TIMEOUT) of +recv_complex_get_reply(Socket, Timeout) -> + recv_complex_get_reply(Socket, Timeout, []). +recv_complex_get_reply(Socket, Timeout, Accum) -> + case gen_tcp:recv(Socket, 0, Timeout) of {ok, <<"END\r\n">>} -> Accum; {ok, Data} -> - {ok,[_,Key,_,Bytes], []} = - io_lib:fread("~s ~s ~u ~u\r\n", binary_to_list(Data)), - inet:setopts(Socket, ?TCP_OPTS_RAW), - case gen_tcp:recv(Socket, Bytes+2, ?TIMEOUT) of + {ok,[_,Key,_,Bytes], []} = io_lib:fread("~s ~s ~u ~u\r\n", binary_to_list(Data)), + inet:setopts(Socket, ?TCP_OPTS_RAW), + case gen_tcp:recv(Socket, Bytes+2, Timeout) of {ok, <>} -> inet:setopts(Socket, ?TCP_OPTS_LINE), - recv_complex_get_reply(Socket, - [{Key, binary_to_term(Value)}|Accum]); + + FinalValue = + try binary_to_term(Value) + catch + _:_ -> Value + end, + + recv_complex_get_reply(Socket, Timeout, [{Key, FinalValue}|Accum]); + {error, Error} -> {error, Error} end; {error, Error} -> {error, Error} end. - + %% @private %% @doc receive function for cas responses containing VALUEs -recv_complex_gets_reply(Socket) -> +recv_complex_gets_reply(Socket, Timeout) -> receive %% For receiving get responses where the key does not exist {tcp, Socket, <<"END\r\n">>} -> @@ -478,18 +632,18 @@ recv_complex_gets_reply(Socket) -> %% Reply format <<"VALUE SOMEKEY FLAG BYTES\r\nSOMEVALUE\r\nEND\r\n">> Parse = io_lib:fread("~s ~s ~u ~u ~u\r\n", binary_to_list(Data)), {ok,[_,_,_,Bytes,CasUniq], []} = Parse, - Reply = get_data(Socket, Bytes), + Reply = get_data(Socket, Timeout, Bytes), {ok, [CasUniq, Reply]}; {error, closed} -> {error, connection_closed} - after ?TIMEOUT -> {error, timeout} + after Timeout -> {error, timeout} end. %% @private %% @doc recieve loop to get all data -get_data(Socket, Bytes) -> +get_data(Socket, Timeout, Bytes) -> inet:setopts(Socket, ?TCP_OPTS_RAW), - {ok, Data} = gen_tcp:recv(Socket, Bytes+7, ?TIMEOUT), + {ok, Data} = gen_tcp:recv(Socket, Bytes+7, Timeout), <> = Data, inet:setopts(Socket, ?TCP_OPTS_ACTIVE), binary_to_term(Value). @@ -501,4 +655,3 @@ join_by([A|[]], _) -> [A]; join_by([A|Rest], J) -> [A, J | join_by(Rest, J)]. - diff --git a/src/merle_client.erl b/src/merle_client.erl new file mode 100644 index 0000000..6dbcd6f --- /dev/null +++ b/src/merle_client.erl @@ -0,0 +1,207 @@ +-module(merle_client). + +-export([start_link/1, init/1, handle_call/3, handle_info/2, handle_cast/2, terminate/2]). + +-export([checkout/3, checkin/1, get_checkout_state/1, get_socket/1]). + +-define(RESTART_INTERVAL, 5000). %% retry each 5 seconds. + +-record(state, { + host, + port, + + socket, % memcached connection socket + + monitor, % represents a monitor bw checking out process and me + + checked_out, % boolean indicating whether this connection is checked out or not + check_out_time % timestamp marking when this connection was checked out +}). + + +start_link([Host, Port]) -> + gen_server:start_link(?MODULE, [Host, Port], []). + + +init([Host, Port]) -> + log4erl:info("Merle watcher initialized!"), + erlang:process_flag(trap_exit, true), + + merle_pool:create({Host, Port}), + merle_pool:join({Host, Port}, self()), + + { + ok, + check_in_state( + #state{ + host = Host, + port = Port + } + ) + }. + + +%% +%% API +%% + + +checkout(Pid, BorrowerPid, CheckoutTime) -> + gen_server:call(Pid, {checkout, BorrowerPid, CheckoutTime}). + + +checkin(Pid) -> + gen_server:call(Pid, checkin). + + +get_checkout_state(Pid) -> + gen_server:call(Pid, get_checkout_state). + + +get_socket(Pid) -> + gen_server:call(Pid, get_socket). + + +%% +%% SERVER CALL HANDLERS +%% + + +%% +%% Handle checkout events. Mark this server as used, and note the time. +%% Bind a monitor with the checking out process. +%% +handle_call({checkout, _, _}, _From, State = #state{checked_out = true}) -> + {reply, busy, State}; +handle_call({checkout, _, _}, _From, State = #state{socket = undefined}) -> + % NOTE: initializes socket when none found + {reply, no_socket, connect_socket(State)}; +handle_call({checkout, BorrowerPid, CheckoutTime}, _From, State = #state{socket = Socket, monitor = PrevMonitor}) -> + % handle any previously existing monitors + case PrevMonitor of + undefined -> + ok; + _ -> + true = erlang:demonitor(PrevMonitor) + end, + + Monitor = erlang:monitor(process, BorrowerPid), + + {reply, Socket, check_out_state(State#state{monitor = Monitor}, CheckoutTime)}; + + +%% +%% Handle checkin events. Demonitor perviously monitored process, and mark as checked in +%% +handle_call(checkin, _From, State = #state{monitor = PrevMonitor}) -> + case PrevMonitor of + undefined -> ok; + _ -> + true = erlang:demonitor(PrevMonitor) + end, + + {reply, ok, check_in_state(State#state{monitor = undefined})}; + + +%% +%% Returns checkout state for the client in question +%% +handle_call(get_checkout_state, _From, State = #state{checked_out = CheckedOut, check_out_time = CheckOutTime}) -> + {reply, {CheckedOut, CheckOutTime}, State}; + + +%% +%% Returns socket for the client in question +%% +handle_call(get_socket, _From, State = #state{socket = Socket}) -> + {reply, Socket, State}; + + +handle_call(_Call, _From, S) -> + {reply, ok, S}. + + +%% +%% Handles 'connect' messages -> initializes socket on host/port, saving a reference +%% +handle_info('connect', #state{host = Host, port = Port, checked_out = true, socket = undefined} = State) -> + case merle:connect(Host, Port) of + {ok, Socket} -> + + {noreply, check_in_state(State#state{socket = Socket})}; + + {error, Reason} -> + error_logger:error_report([memcached_connection_error, + {reason, Reason}, + {host, Host}, + {port, Port}, + {restarting_in, ?RESTART_INTERVAL}] + ), + + timer:send_after(?RESTART_INTERVAL, self(), 'connect'), + + {noreply, State} + end; + + +%% +%% Handles down events from monitored process. Need to check back in if this happens. +%% +handle_info({'DOWN', MonitorRef, _, _, _}, #state{monitor=MonitorRef} = S) -> + log4erl:info("merle_watcher caught a DOWN event"), + + true = erlang:demonitor(MonitorRef), + + {noreply, check_in_state(S#state{monitor = undefined})}; + + +%% +%% Handles exit events on the memcached socket. If this occurs need to reconnect. +%% +handle_info({'EXIT', Socket, _}, S = #state{socket = Socket}) -> + {noreply, connect_socket(S), ?RESTART_INTERVAL}; + + +handle_info(_Info, S) -> + error_logger:warning_report([{merle_watcher, self()}, {unknown_info, _Info}]), + {noreply, S}. + + +handle_cast(_Cast, S) -> + {noreply, S}. + + +terminate(_Reason, #state{socket = undefined}) -> + log4erl:error("Merle watcher terminated, socket is empty!"), + ok; + +terminate(_Reason, #state{socket = Socket}) -> + log4erl:error("Merle watcher terminated, killing socket!"), + erlang:exit(Socket, watcher_died), + ok. + +%% +%% HELPER FUNCTIONS +%% + +connect_socket(State = #state{}) -> + self() ! 'connect', + check_out_state_indefinitely(State#state{socket = undefined}). + + +check_out_state_indefinitely(State = #state{}) -> + check_out_state(State, indefinite). + + +check_out_state(State = #state{}, CheckOutTime) -> + State#state{ + checked_out = true, + check_out_time = CheckOutTime + }. + + +check_in_state(State = #state{}) -> + State#state{ + checked_out = false, + check_out_time = undefined + }. \ No newline at end of file diff --git a/src/merle_client_sup.erl b/src/merle_client_sup.erl new file mode 100644 index 0000000..fcb022f --- /dev/null +++ b/src/merle_client_sup.erl @@ -0,0 +1,22 @@ +-module(merle_client_sup). + +-export([start_link/2, init/1]). + +-export([start_child/1]). + +-behaviour(supervisor). + +start_link(Instances, ConnectionsPerInstance) -> + {ok, Pid} = supervisor:start_link({local, ?MODULE}, ?MODULE, []), + + merle_cluster:configure(Instances, ConnectionsPerInstance), + + {ok, Pid}. + +start_child(N) -> + supervisor:start_child(?MODULE, [N]). + +init([]) -> + MCDSpec = {mcd, {merle_client, start_link, []}, + permanent, 5000, worker, dynamic}, + {ok, {{simple_one_for_one, 10, 10}, [MCDSpec]}}. diff --git a/src/merle_cluster.erl b/src/merle_cluster.erl index 3242291..55ad4de 100644 --- a/src/merle_cluster.erl +++ b/src/merle_cluster.erl @@ -1,6 +1,6 @@ -module(merle_cluster). --export([configure/2]). +-export([configure/2, exec/4]). index_map(F, List) -> {Map, _} = lists:mapfoldl(fun(X, Iter) -> {F(X, Iter), Iter +1} end, 1, List), @@ -9,23 +9,73 @@ index_map(F, List) -> configure(MemcachedHosts, ConnectionsPerHost) -> SortedMemcachedHosts = lists:sort(MemcachedHosts), DynModuleBegin = "-module(merle_cluster_dynamic). - -export([get_server/1]). - get_server(ClusterKey) -> N = erlang:phash2(ClusterKey, ~p), - do_get_server(N). - ", - DynModuleMap = "do_get_server(~p) -> {\"~s\", ~p}; ", - DynModuleEnd = "do_get_server(_N) -> throw({invalid_server_slot, _N}).\n", - ModuleString = lists:flatten([ - io_lib:format(DynModuleBegin, [length(SortedMemcachedHosts)]), - index_map(fun([Host, Port], I) -> - io_lib:format(DynModuleMap, [I-1, Host, Port]) - end, SortedMemcachedHosts), - DynModuleEnd - ]), - {M, B} = dynamic_compile:from_string(ModuleString), - code:load_binary(M, "", B), - lists:foreach(fun([Host, Port]) -> - lists:foreach(fun(_) -> - supervisor:start_child(merle_sup, [[Host, Port]]) - end, lists:seq(1, ConnectionsPerHost)) - end, SortedMemcachedHosts). + -export([get_server/1]). + get_server(ClusterKey) -> N = erlang:phash2(ClusterKey, ~p), + do_get_server(N).\n", + DynModuleMap = "do_get_server(~p) -> {\"~s\", ~p}; ", + DynModuleEnd = "do_get_server(_N) -> throw({invalid_server_slot, _N}).\n", + + ModuleString = lists:flatten([ + io_lib:format(DynModuleBegin, [length(SortedMemcachedHosts)]), + index_map(fun([Host, Port], I) -> io_lib:format(DynModuleMap, [I-1, Host, Port]) end, SortedMemcachedHosts), + DynModuleEnd + ]), + + log4erl:error("dyn module str ~p", [ModuleString]), + + {M, B} = dynamic_compile:from_string(ModuleString), + code:load_binary(M, "", B), + + % start all merle watchers + lists:foreach( + fun([Host, Port]) -> + lists:foreach( + fun(_) -> + merle_client_sup:start_child([Host, Port]) + end, + lists:seq(1, ConnectionsPerHost) + ) + end, + SortedMemcachedHosts + ). + + +exec(Key, Fun, Default, Now) -> + S = merle_cluster_dynamic:get_server(Key), + exec_on_client( + merle_pool:get_client(round_robin, S), + Key, + Fun, + Default, + Now + ). + +exec_on_client({error, Error}, _Key, _Fun, Default, _Now) -> + log4erl:error("Error finding merle client: ~r~n, returning default value", [Error]), + {error_finding_client, Default}; +exec_on_client(undefined, _Key, _Fun, Default, _Now) -> + log4erl:error("Undefined merle client, returning default value"), + {undefined_client, Default}; +exec_on_client(Client, Key, Fun, Default, Now) -> + exec_on_socket(merle_client:checkout(Client, self(), Now), Client, Key, Fun, Default). + + +exec_on_socket(no_socket, _Client, _Key, _Fun, Default) -> + log4erl:error("Designated merle connection has no socket, returning default value"), + {no_socket, Default}; +exec_on_socket(busy, _Client, _Key, _Fun, Default) -> + log4erl:error("Designated merle connection is in use, returning default value"), + {in_use, Default}; +exec_on_socket(Socket, Client, Key, Fun, Default) -> + FinalValue = case Fun(Socket, Key) of + {error, Error} -> + log4erl:info("Merle encountered error ~p, returning default value", [Error]), + {Error, Default}; + {ok, Value} -> + {ok, Value} + end, + + merle_client:checkin(Client), + + FinalValue. + diff --git a/src/merle_pool.erl b/src/merle_pool.erl new file mode 100644 index 0000000..7997758 --- /dev/null +++ b/src/merle_pool.erl @@ -0,0 +1,280 @@ +-module(merle_pool). + +%% Basically the same functionality than pg2, but process groups are local rather than global. +-export([create/1, delete/1, join/2, leave/2, + get_members/1, count_available/1, clean_locks/0, + get_client/2, + which_groups/0]). + +-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). + +-define(PIDS_TABLE, merle_pool_pids). +-define(INDICES_TABLE, merle_pool_indices). + +-define(CLEAN_LOCKS_INTERVAL, 10000). % every 10 seconds + +-record(server_state, { + periodic_lock_clean +}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +create(Name) -> + case ets:lookup(?PIDS_TABLE, Name) of + [] -> + gen_server:call(?MODULE, {create, Name}); + _ -> + ok + end. + +delete(Name) -> + gen_server:call(?MODULE, {delete, Name}). + +join(Name, Pid) when is_pid(Pid) -> + case ets:lookup(?PIDS_TABLE, Name) of + [] -> + {error, {no_such_group, Name}}; + _ -> + gen_server:call(?MODULE, {join, Name, Pid}) + end. + +leave(Name, Pid) when is_pid(Pid) -> + case ets:lookup(?PIDS_TABLE, Name) of + [] -> + {error, {no_such_group, Name}}; + _ -> + gen_server:call(?MODULE, {leave, Name, Pid}) + end. + +get_members(Name) -> + case ets:lookup(?PIDS_TABLE, Name) of + [] -> {error, {no_such_group, Name}}; + [{Name, Members}] -> Members + end. + +which_groups() -> + [K || {K, _Members} <- ets:tab2list(?PIDS_TABLE)]. + +count_available(Name) -> + case ets:lookup(?PIDS_TABLE, Name) of + + [] -> {error, {no_such_group, Name}}; + + [{Name, Members}] -> + NumAvail = lists:foldl( + fun(Member, Acc) -> + case merle_client:get_checkout_state(Member) of + {false, _} -> + Acc + 1; + _ -> + Acc + end + end, + 0, + Members + ), + + {length(Members), NumAvail} + end. + +clean_locks() -> + L = ets:tab2list(?PIDS_TABLE), + + {Cleaned, Connections} = lists:foldl( + fun({_, Pids}, {C, V}) -> + {C2, V2} = clean_locks(Pids), + {C + C2, V + V2} + end, + {0, 0}, + L + ), + + log4erl:error("Cleaned ~p merle locks on ~p valid connections", [Cleaned, Connections]), + + {Cleaned, Connections}. + +clean_locks(Clients) -> + NowSecs = now_secs(), + CleanLocksIntervalSecs = ?CLEAN_LOCKS_INTERVAL div 1000, + + Acc = lists:foldl( + fun(Client, {NumCleaned, ValidConnections}) -> + + % clear locks with stale last_unlocked time + NumCleaned2 = case merle_client:get_checkout_state(Client) of + {false, _} -> + NumCleaned; + + {true, indefinite} -> + NumCleaned; + + {true, CheckOutTime} -> + case (CheckOutTime + CleanLocksIntervalSecs) < NowSecs of + true -> + merle_client:checkin(Client), + NumCleaned + 1; + false -> + NumCleaned + end + end, + + % maintain a count of the number of valid connections + ValidConnections2 = case merle_client:get_socket(Client) of + undefined -> ValidConnections; + _ -> ValidConnections + 1 + end, + + {NumCleaned2, ValidConnections2} + end, + + {0, 0}, + + Clients + ), + + Acc. + + +shift_rr_index(Name, MembersLen) -> + ets:update_counter(?INDICES_TABLE, {Name, rr_index}, {2, 1, MembersLen, 1}). + + +get_client(random, Name) -> + case ets:lookup(?PIDS_TABLE, Name) of + [] -> + {error, {no_process, Name}}; + [{Name, Members}] -> + %% TODO: we can get more inteligent, check queue size, reductions, etc. + %% http://lethain.com/entry/2009/sep/12/load-balancing-across-erlang-process-groups/ + {_, _, X} = erlang:now(), + + Pid = lists:nth((X rem length(Members)) +1, Members), + + Pid + end; + +get_client(round_robin, Name) -> + case ets:lookup(?PIDS_TABLE, Name) of + [] -> + {error, {no_process, Name}}; + [{Name, Members}] -> + + MembersLen = length(Members), + + % Get the round robin index + RRIndex = shift_rr_index(Name, MembersLen), + + lists:nth(RRIndex, Members) + end. + +%% +%% SERVER FUNCTIONS +%% + +init([]) -> + process_flag(trap_exit, true), + ets:new(?PIDS_TABLE, [set, public, named_table, {read_concurrency, true}]), + ets:new(?INDICES_TABLE, [set, public, named_table, {write_concurrency, true}]), + + PLC = timer:apply_interval(?CLEAN_LOCKS_INTERVAL, merle_pool, clean_locks, []), + + State = #server_state { + periodic_lock_clean = PLC + }, + + {ok, State}. + +handle_call({create, Name}, _From, S) -> + case ets:lookup(?PIDS_TABLE, Name) of + [] -> + ets:insert(?INDICES_TABLE, {{Name, rr_index}, 1}), + ets:insert(?PIDS_TABLE, {Name, []}); + _ -> + ok + end, + {reply, ok, S}; + +handle_call({join, Name, Pid}, _From, S) -> + case ets:lookup(?PIDS_TABLE, Name) of + [] -> + {reply, no_such_group, S}; + [{Name, Members}] -> + + % NOTE: skip one index since we are about to grow the list, this prevents collisions + shift_rr_index(Name, length(Members)), + + % insert new pid into the table + ets:insert(?PIDS_TABLE, {Name, [Pid | Members]}), + + % NOTE: link processes on join, so that if client dies, we remove it from the pool + link(Pid), + + {reply, ok, S} + end; + +handle_call({leave, Name, Pid}, _From, S) -> + case ets:lookup(?PIDS_TABLE, Name) of + [] -> + {reply, no_such_group, S}; + [{Name, Members}] -> + case lists:delete(Pid, Members) of + [] -> + ets:delete(?PIDS_TABLE, Name); + NewMembers -> + ets:insert(?PIDS_TABLE, {Name, NewMembers}) + end, + unlink(Pid), + {reply, ok, S} + end; + +handle_call({delete, Name}, _From, S) -> + ets:delete(?PIDS_TABLE, Name), + {reply, ok, S}. + +handle_cast(_Cast, S) -> + {noreply, S}. + +handle_info({'EXIT', Pid, _} , S) -> + log4erl:error("Caught local_pg2 EXIT... leaving pg"), + del_member(Pid), + {noreply, S}; + +handle_info(_Info, S) -> + {noreply, S}. + +terminate(_Reason, #server_state{ periodic_lock_clean=PLC }) -> + ets:delete(?PIDS_TABLE), + ets:delete(?INDICES_TABLE), + + timer:cancel(PLC), + + %%do not unlink, if this fails, dangling processes should be killed + ok. + + + +%% +%% HELPER FUNCTIONS +%% + +now_secs() -> + {NowMegaSecs, NowSecs, _} = erlang:now(), + (1.0e+6 * NowMegaSecs) + NowSecs. + +del_member(Pid) -> + L = ets:tab2list(?PIDS_TABLE), + lists:foreach(fun(Elem) -> del_member_func(Elem, Pid) end, L). + +del_member_func({Name, Members}, Pid) -> + case lists:member(Pid, Members) of + true -> + case lists:delete(Pid, Members) of + [] -> + ets:delete(?PIDS_TABLE, Name); + NewMembers -> + ets:insert(?PIDS_TABLE, {Name, NewMembers}) + end; + false -> + ok + end. \ No newline at end of file diff --git a/src/merle_sup.erl b/src/merle_sup.erl index d8ea373..c48add1 100644 --- a/src/merle_sup.erl +++ b/src/merle_sup.erl @@ -2,19 +2,22 @@ -export([start_link/2, init/1]). --export([start_child/1]). - -behaviour(supervisor). start_link(Instances, ConnectionsPerInstance) -> - {ok, Pid} = supervisor:start_link({local, ?MODULE}, ?MODULE, []), - merle_cluster:configure(Instances, ConnectionsPerInstance), - {ok, Pid}. + supervisor:start_link({local, ?MODULE}, ?MODULE, [Instances, ConnectionsPerInstance]). + +init([Instances, ConnectionsPerInstance]) -> + MerlePool = + {merle_pool, + {merle_pool, start_link, []}, + permanent, 5000, worker, dynamic + }, -start_child(N) -> - supervisor:start_child(?MODULE, [N]). + MerleWatcherSup = + {merle_client_sup, + {merle_client_sup, start_link, [Instances, ConnectionsPerInstance]}, + permanent, 5000, supervisor, dynamic + }, -init([]) -> - MCDSpec = {mcd, {merle_watcher, start_link, []}, - permanent, 5000, worker, dynamic}, - {ok, {{simple_one_for_one, 10, 10}, [MCDSpec]}}. + {ok, {{one_for_all, 10, 10}, [MerlePool, MerleWatcherSup]}}. \ No newline at end of file diff --git a/src/merle_watcher.erl b/src/merle_watcher.erl deleted file mode 100644 index de8a553..0000000 --- a/src/merle_watcher.erl +++ /dev/null @@ -1,65 +0,0 @@ --module(merle_watcher). - --export([start_link/1, init/1, handle_call/3, handle_info/2, handle_cast/2, terminate/2]). - --define(RESTART_INTERVAL, 15 * 1000). %% retry each 5 seconds. - --record(state, {mcd_pid, - host, - port}). - -start_link([Host, Port]) -> - gen_server:start_link(?MODULE, [Host, Port], []). - -init([Host, Port]) -> - erlang:process_flag(trap_exit, true), - self() ! timeout, - {ok, #state{mcd_pid = undefined, host = Host, port = Port}}. - -handle_call(_Call, _From, S) -> - {reply, ok, S}. -handle_info('timeout', #state{mcd_pid = undefined, host = Host, port = Port} = State) -> - error_logger:info_report([{memcached, connecting}, {host, Host}, {port, Port}]), - case merle:connect(Host, Port) of - {ok, Pid} -> - local_pg2:create({Host, Port}), - local_pg2:join({Host, Port}, Pid), - {noreply, State#state{mcd_pid = Pid}}; - {error, Reason} -> - receive - {'EXIT', _ , _} -> - ok - after - 2000 -> - ok - end, - error_logger:error_report([memcached_not_started, - {reason, Reason}, - {host, Host}, - {port, Port}, - {restarting_in, ?RESTART_INTERVAL}]), - {noreply, State, ?RESTART_INTERVAL} - end; - -handle_info({'EXIT', Pid, Reason}, #state{mcd_pid = Pid} = S) -> - error_logger:error_report([{memcached_crashed, Pid}, - {reason, Reason}, - {host, S#state.host}, - {port, S#state.port}, - {restarting_in, ?RESTART_INTERVAL}]), - {noreply, S#state{mcd_pid = undefined}, ?RESTART_INTERVAL}; -handle_info(_Info, S) -> - error_logger:warning_report([{merle_watcher, self()}, {unknown_info, _Info}]), - case S#state.mcd_pid of - undefined -> - {noreply, S, ?RESTART_INTERVAL}; - _ -> - {noreply, S} - end. -handle_cast(_Cast, S) -> - {noreply, S}. -terminate(_Reason, _S) -> - ok. - - - diff --git a/src/priority_queue.erl b/src/priority_queue.erl new file mode 100644 index 0000000..1e481ca --- /dev/null +++ b/src/priority_queue.erl @@ -0,0 +1,191 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +%% Priority queues have essentially the same interface as ordinary +%% queues, except that a) there is an in/3 that takes a priority, and +%% b) we have only implemented the core API we need. +%% +%% Priorities should be integers - the higher the value the higher the +%% priority - but we don't actually check that. +%% +%% in/2 inserts items with priority 0. +%% +%% We optimise the case where a priority queue is being used just like +%% an ordinary queue. When that is the case we represent the priority +%% queue as an ordinary queue. We could just call into the 'queue' +%% module for that, but for efficiency we implement the relevant +%% functions directly in here, thus saving on inter-module calls and +%% eliminating a level of boxing. +%% +%% When the queue contains items with non-zero priorities, it is +%% represented as a sorted kv list with the inverted Priority as the +%% key and an ordinary queue as the value. Here again we use our own +%% ordinary queue implemention for efficiency, often making recursive +%% calls into the same function knowing that ordinary queues represent +%% a base case. + + +-module(priority_queue). + +-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, + out/1, join/2]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(priority() :: integer()). +-type(squeue() :: {queue, [any()], [any()]}). +-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}). + +-spec(new/0 :: () -> pqueue()). +-spec(is_queue/1 :: (any()) -> boolean()). +-spec(is_empty/1 :: (pqueue()) -> boolean()). +-spec(len/1 :: (pqueue()) -> non_neg_integer()). +-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]). +-spec(in/2 :: (any(), pqueue()) -> pqueue()). +-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). +-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}). +-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()). + +-endif. + +%%---------------------------------------------------------------------------- + +new() -> + {queue, [], []}. + +is_queue({queue, R, F}) when is_list(R), is_list(F) -> + true; +is_queue({pqueue, Queues}) when is_list(Queues) -> + lists:all(fun ({P, Q}) -> is_integer(P) andalso is_queue(Q) end, + Queues); +is_queue(_) -> + false. + +is_empty({queue, [], []}) -> + true; +is_empty(_) -> + false. + +len({queue, R, F}) when is_list(R), is_list(F) -> + length(R) + length(F); +len({pqueue, Queues}) -> + lists:sum([len(Q) || {_, Q} <- Queues]). + +to_list({queue, In, Out}) when is_list(In), is_list(Out) -> + [{0, V} || V <- Out ++ lists:reverse(In, [])]; +to_list({pqueue, Queues}) -> + [{-P, V} || {P, Q} <- Queues, {0, V} <- to_list(Q)]. + +in(Item, Q) -> + in(Item, 0, Q). + +in(X, 0, {queue, [_] = In, []}) -> + {queue, [X], In}; +in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) -> + {queue, [X|In], Out}; +in(X, Priority, _Q = {queue, [], []}) -> + in(X, Priority, {pqueue, []}); +in(X, Priority, Q = {queue, _, _}) -> + in(X, Priority, {pqueue, [{0, Q}]}); +in(X, Priority, {pqueue, Queues}) -> + P = -Priority, + {pqueue, case lists:keysearch(P, 1, Queues) of + {value, {_, Q}} -> + lists:keyreplace(P, 1, Queues, {P, in(X, Q)}); + false -> + lists:keysort(1, [{P, {queue, [X], []}} | Queues]) + end}. + +out({queue, [], []} = Q) -> + {empty, Q}; +out({queue, [V], []}) -> + {{value, V}, {queue, [], []}}; +out({queue, [Y|In], []}) -> + [V|Out] = lists:reverse(In, []), + {{value, V}, {queue, [Y], Out}}; +out({queue, In, [V]}) when is_list(In) -> + {{value,V}, r2f(In)}; +out({queue, In,[V|Out]}) when is_list(In) -> + {{value, V}, {queue, In, Out}}; +out({pqueue, [{P, Q} | Queues]}) -> + {R, Q1} = out(Q), + NewQ = case is_empty(Q1) of + true -> case Queues of + [] -> {queue, [], []}; + [{0, OnlyQ}] -> OnlyQ; + [_|_] -> {pqueue, Queues} + end; + false -> {pqueue, [{P, Q1} | Queues]} + end, + {R, NewQ}. + +join(A, {queue, [], []}) -> + A; +join({queue, [], []}, B) -> + B; +join({queue, AIn, AOut}, {queue, BIn, BOut}) -> + {queue, BIn, AOut ++ lists:reverse(AIn, BOut)}; +join(A = {queue, _, _}, {pqueue, BPQ}) -> + {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ), + Post1 = case Post of + [] -> [ {0, A} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ]; + _ -> [ {0, A} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, B = {queue, _, _}) -> + {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ), + Post1 = case Post of + [] -> [ {0, B} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ]; + _ -> [ {0, B} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, {pqueue, BPQ}) -> + {pqueue, merge(APQ, BPQ, [])}. + +merge([], BPQ, Acc) -> + lists:reverse(Acc, BPQ); +merge(APQ, [], Acc) -> + lists:reverse(Acc, APQ); +merge([{P, A}|As], [{P, B}|Bs], Acc) -> + merge(As, Bs, [ {P, join(A, B)} | Acc ]); +merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB -> + merge(As, Bs, [ {PA, A} | Acc ]); +merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> + merge(As, Bs, [ {PB, B} | Acc ]). + +r2f([]) -> {queue, [], []}; +r2f([_] = R) -> {queue, [], R}; +r2f([X,Y]) -> {queue, [X], [Y]}; +r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}. diff --git a/t/merle_t02.t b/t/merle_t02.t index 5662d31..9f98e30 100644 --- a/t/merle_t02.t +++ b/t/merle_t02.t @@ -10,10 +10,10 @@ main(_) -> Key = rnd_key(), Key2 = rnd_key(), etap:is(merle:set(Key, "1", "0", "bar"), ok, "Set data"), - etap:is(merle:get(Key), "bar", "Get data"), - etap:is(merle:get(rnd_key()), undefined, "Get invalid data"), + etap:is(merle:getkey(Key), "bar", "Get data"), + etap:is(merle:getkey(rnd_key()), undefined, "Get invalid data"), etap:is(merle:set(Key2, "1", "0", {foo, bar}), ok, "Set data"), - etap:is(merle:get(Key2), {foo, bar}, "Get data"), + etap:is(merle:getkey(Key2), {foo, bar}, "Get data"), etap:end_tests(). rnd_key() -> @@ -23,4 +23,4 @@ rnd_key() -> [[random:uniform(25) + 96] || _ <-lists:seq(1,5)], [[random:uniform(9) + 47] || _ <-lists:seq(1,3)] ]). - \ No newline at end of file +