Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
yosukehara committed Dec 31, 2015
1 parent d6f0f20 commit ff88a6d
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 23 deletions.
12 changes: 6 additions & 6 deletions src/leo_mq_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ prop_list_to_mq_properties(Id, Mod, Props) ->
KeyBin::binary(),
MessageBin::binary()).
publish(Id, KeyBin, MessageBin) ->
leo_mq_publisher:publish(Id, KeyBin, MessageBin).
leo_mq_server:publish(Id, KeyBin, MessageBin).


%% @doc Suspend consumption of messages in the queue
Expand Down Expand Up @@ -148,7 +148,7 @@ resume(Id, Seq) ->
-spec(status(Id) ->
{ok, [{atom(), any()}]} when Id::atom()).
status(Id) ->
leo_mq_publisher:status(Id).
leo_mq_server:status(Id).


%% @doc Retrieve registered consumers
Expand All @@ -167,7 +167,7 @@ consumers() ->
Consumers = [ #mq_state{id = ?publisher_id(Worker)} ||
{Worker,_,worker,[leo_mq_consumer]} <- Children ],
Consumers_1 = lists:map(fun(Id) ->
{ok, State} = leo_mq_publisher:status(Id),
{ok, State} = leo_mq_server:status(Id),
#mq_state{id = Id, state = State}
end, consumers_1(Consumers, sets:new())),
{ok, Consumers_1}
Expand Down Expand Up @@ -215,12 +215,12 @@ decrease(Id, Seq) ->
%%--------------------------------------------------------------------
%% INNTERNAL FUNCTIONS
%%--------------------------------------------------------------------
%% @private Start 'leo_mq_publisher'
%% @private Start 'leo_mq_server'
start_child_1(RefSup, #mq_properties{publisher_id = PublisherId} = Props) ->
case supervisor:start_child(
RefSup, {PublisherId,
{leo_mq_publisher, start_link, [PublisherId, Props]},
permanent, 2000, worker, [leo_mq_publisher]}) of
{leo_mq_server, start_link, [PublisherId, Props]},
permanent, 2000, worker, [leo_mq_server]}) of
{ok, _Pid} ->
ok;
{error,Reason} ->
Expand Down
18 changes: 9 additions & 9 deletions src/leo_mq_consumer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ idling(#event_info{event = ?EVENT_RUN}, From, #state{id = Id,
start_datetime = leo_date:now()},
gen_fsm:reply(From, ok),
ok = run(Id),
ok = leo_mq_publisher:update_consumer_stats(PublisherId, NextStatus, BatchOfMsgs, Interval),
ok = leo_mq_server:update_consumer_stats(PublisherId, NextStatus, BatchOfMsgs, Interval),
{next_state, NextStatus, State_1};
idling(#event_info{event = ?EVENT_STATE}, From, #state{status = Status} = State) ->
gen_fsm:reply(From, {ok, Status}),
Expand All @@ -262,7 +262,7 @@ idling(#event_info{event = ?EVENT_RUN}, #state{id = Id,
interval = Interval} = State) ->
NextStatus = ?ST_RUNNING,
ok = run(Id),
ok = leo_mq_publisher:update_consumer_stats(PublisherId, NextStatus, BatchOfMsgs, Interval),
ok = leo_mq_server:update_consumer_stats(PublisherId, NextStatus, BatchOfMsgs, Interval),
{next_state, NextStatus, State#state{status = ?ST_IDLING}};

idling(#event_info{event = ?EVENT_INCR},
Expand Down Expand Up @@ -327,7 +327,7 @@ running(#event_info{event = ?EVENT_RUN,
{?ST_IDLING, State_1}
end,

ok = leo_mq_publisher:update_consumer_stats(
ok = leo_mq_server:update_consumer_stats(
PublisherId, NextStatus, BatchOfMsgs, Interval),
{next_state, NextStatus, State_2#state{status = NextStatus,
prev_proc_time = leo_date:clock()}};
Expand All @@ -336,7 +336,7 @@ running(#event_info{event = ?EVENT_SUSPEND}, #state{publisher_id = PublisherId,
batch_of_msgs = BatchOfMsgs,
interval = Interval} = State) ->
NextStatus = ?ST_SUSPENDING,
ok = leo_mq_publisher:update_consumer_stats(PublisherId, NextStatus, BatchOfMsgs, Interval),
ok = leo_mq_server:update_consumer_stats(PublisherId, NextStatus, BatchOfMsgs, Interval),
{next_state, NextStatus, State#state{status = NextStatus}};


Expand All @@ -354,7 +354,7 @@ running(#event_info{event = ?EVENT_INCR},

%% Modify the items
NextStatus = ?ST_RUNNING,
ok = leo_mq_publisher:update_consumer_stats(
ok = leo_mq_server:update_consumer_stats(
PublisherId, NextStatus, BatchOfMsgs_1, Interval_1),
{next_state, NextStatus, State#state{status = NextStatus,
batch_of_msgs = BatchOfMsgs_1,
Expand All @@ -380,7 +380,7 @@ running(#event_info{event = ?EVENT_DECR},
decr_batch_procs_fun(BatchOfMsgs, StepBatchOfMsgs)}
end,

ok = leo_mq_publisher:update_consumer_stats(
ok = leo_mq_server:update_consumer_stats(
PublisherId, NextStatus, BatchOfMsgs_1, Interval_1),
{next_state, NextStatus, State#state{batch_of_msgs = BatchOfMsgs_1,
interval = Interval_1,
Expand Down Expand Up @@ -426,7 +426,7 @@ suspending(#event_info{event = ?EVENT_INCR},
%% To the next status
timer:apply_after(timer:seconds(1), ?MODULE, run, [Id]),
NextStatus = ?ST_RUNNING,
ok = leo_mq_publisher:update_consumer_stats(
ok = leo_mq_server:update_consumer_stats(
PublisherId, NextStatus, BatchOfMsgs_1, Interval_1),
{next_state, NextStatus, State#state{status = NextStatus,
batch_of_msgs = BatchOfMsgs_1,
Expand All @@ -445,7 +445,7 @@ suspending(#event_info{event = ?EVENT_RESUME}, From, #state{id = Id,
interval = Interval} = State) ->
gen_fsm:reply(From, ok),
ok = run(Id),
ok = leo_mq_publisher:update_consumer_stats(PublisherId, ?ST_RUNNING, BatchOfMsgs, Interval),
ok = leo_mq_server:update_consumer_stats(PublisherId, ?ST_RUNNING, BatchOfMsgs, Interval),
{next_state, ?ST_RUNNING, State#state{status = ?ST_RUNNING}};
suspending(#event_info{event = ?EVENT_STATE}, From, #state{status = Status} = State) ->
gen_fsm:reply(From, {ok, Status}),
Expand Down Expand Up @@ -498,7 +498,7 @@ consume(#state{mq_properties = #mq_properties{
consume(_Id,_,_,0) ->
ok;
consume(Id, Mod, NamedMqDbPid, NumOfBatchProcs) ->
case leo_mq_publisher:dequeue(Id, NamedMqDbPid) of
case leo_mq_server:dequeue(Id, NamedMqDbPid) of
{ok, MsgBin} ->
try
erlang:apply(Mod, handle_call, [{consume, Id, MsgBin}])
Expand Down
15 changes: 8 additions & 7 deletions src/leo_mq_publisher.erl → src/leo_mq_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
%% ---------------------------------------------------------------------
%% Leo MQ - Server
%% @doc The gen_server process for the process of a mq's publisher as part of a supervision tree
%% @reference https://github.com/leo-project/leo_mq/blob/master/src/leo_mq_publisher.erl
%% @reference https://github.com/leo-project/leo_mq/blob/master/src/leo_mq_server.erl
%% @end
%%======================================================================
-module(leo_mq_publisher).
-module(leo_mq_server).

-behaviour(gen_server).

Expand Down Expand Up @@ -134,15 +134,16 @@ close(Id) ->
init([Id, #mq_properties{db_name = DBName,
db_procs = DBProcs,
mqdb_id = MQDBMessageId,
mqdb_path = MQDBMessagePath
mqdb_path = MQDBMessagePath,
root_path = RootPath
} = MQProps]) ->
case application:get_env(leo_mq, backend_db_sup_ref) of
{ok, Pid} ->
ok = leo_backend_db_sup:start_child(
Pid, MQDBMessageId,
DBProcs, DBName, MQDBMessagePath),
%% Retrieve total num of message from the local state file
StateFilePath = lists:append([MQDBMessagePath, "_", atom_to_list(Id)]),
StateFilePath = filename:join([RootPath, atom_to_list(Id)]),
Count = case file:consult(StateFilePath) of
{ok, Props} ->
leo_misc:get_value('count', Props, 0);
Expand Down Expand Up @@ -302,9 +303,9 @@ put_message(MsgKeyBin, MsgBin, #state{mq_properties = MQProps}) ->
StateFilePath::string()).
close_db(InstanseName, Count, StateFilePath) ->
%% Output the current state
_ = leo_file:file_unconsult(StateFilePath, [{id, InstanseName},
{count, Count}
]),
catch leo_file:file_unconsult(StateFilePath, [{id, InstanseName},
{count, Count}
]),
%% Close the backend-db
case whereis(leo_backend_db_sup) of
Pid when is_pid(Pid) ->
Expand Down
2 changes: 1 addition & 1 deletion src/leo_mq_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ after_proc(Error) ->
%% @private
close_db([]) ->
ok;
close_db([{Id,_Pid, worker, ['leo_mq_publisher' = Mod|_]}|T]) ->
close_db([{Id,_Pid, worker, ['leo_mq_server' = Mod|_]}|T]) ->
ok = Mod:close(Id),
close_db(T);
close_db([_|T]) ->
Expand Down

0 comments on commit ff88a6d

Please sign in to comment.