Skip to content

Commit

Permalink
To modified for leo-project/leofs/issues/437 more correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
yosukehara committed Jan 1, 2016
1 parent ff88a6d commit 28ac565
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 20 deletions.
16 changes: 7 additions & 9 deletions src/leo_mq_consumer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,7 @@ consume(#state{mq_properties = #mq_properties{
db_procs = NumOfProcs,
publisher_id = PublisherId,
mod_callback = Mod},
named_mqdb_pid = NamedMqDbPid,
batch_of_msgs = NumOfBatchProcs,
%% interval = Interval,
prev_proc_time = PrevProcTime} = _State, IsForceExec) ->
ThisTime = leo_date:clock(),
Diff = erlang:round((ThisTime - PrevProcTime) / 1000),
Expand All @@ -486,33 +484,33 @@ consume(#state{mq_properties = #mq_properties{
orelse IsForceExec) of
true ->
NumOfBatchProcs_1 = leo_math:ceiling(NumOfBatchProcs / NumOfProcs),
consume(PublisherId, Mod, NamedMqDbPid, NumOfBatchProcs_1);
consume(PublisherId, Mod, NumOfBatchProcs_1);
false ->
{error, short_interval}
end.

%% @doc Consume a message
%% @private
-spec(consume(atom(), atom(), atom(), non_neg_integer()) ->
-spec(consume(atom(), atom(), non_neg_integer()) ->
ok | not_found | {error, any()}).
consume(_Id,_,_,0) ->
consume(_Id,_,0) ->
ok;
consume(Id, Mod, NamedMqDbPid, NumOfBatchProcs) ->
case leo_mq_server:dequeue(Id, NamedMqDbPid) of
consume(Id, Mod, NumOfBatchProcs) ->
case leo_mq_server:dequeue(Id) of
{ok, MsgBin} ->
try
erlang:apply(Mod, handle_call, [{consume, Id, MsgBin}])
catch
_:Reason ->
error_logger:error_msg("~p,~p,~p,~p~n",
[{module, ?MODULE_STRING},
{function, "consume/4"},
{function, "consume/3"},
{line, ?LINE}, {body, [{module, Mod},
{id, Id},
{cause, Reason}
]}])
after
consume(Id, Mod, NamedMqDbPid, NumOfBatchProcs - 1)
consume(Id, Mod, NumOfBatchProcs - 1)
end;
Other ->
Other
Expand Down
24 changes: 13 additions & 11 deletions src/leo_mq_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
terminate/2,
code_change/3]).

-export([publish/3, dequeue/2,
-export([publish/3, dequeue/1,
status/1, update_consumer_stats/4, close/1]).

-ifdef(TEST).
Expand Down Expand Up @@ -96,8 +96,8 @@ publish(Id, KeyBin, MessageBin) ->

%% @doc Register a queuing data.
%%
dequeue(Id, NamedMqDbPid) ->
gen_server:call(Id, {dequeue, NamedMqDbPid}, ?DEF_TIMEOUT).
dequeue(Id) ->
gen_server:call(Id, dequeue, ?DEF_TIMEOUT).


%% @doc Retrieve the current state from the queue.
Expand Down Expand Up @@ -164,10 +164,12 @@ handle_call({publish, KeyBin, MessageBin}, _From, #state{count = Count} = State)
Reply = put_message(KeyBin, MessageBin, State),
{reply, Reply, State#state{count = Count + 1}, ?DEF_TIMEOUT};

handle_call({dequeue, NamedMqDbPid}, _From, #state{count = Count} = State) ->
handle_call(dequeue, _From, #state{count = Count,
mq_properties = MQProps} = State) ->
MQDBMessageId = MQProps#mq_properties.mqdb_id,
{Reply, Count_1} =
case catch leo_backend_db_server:first(NamedMqDbPid) of
{ok, Key, Val} ->
case catch leo_backend_db_api:first(MQDBMessageId) of
{ok, {Key, Val}} ->
%% Taking measure of queue-msg migration
%% for previsous 1.2.0-pre1
MsgTerm = binary_to_term(Val),
Expand All @@ -180,7 +182,7 @@ handle_call({dequeue, NamedMqDbPid}, _From, #state{count = Count} = State) ->
end,

%% Remove the queue from the backend-db
case catch leo_backend_db_server:delete(NamedMqDbPid, Key) of
case catch leo_backend_db_api:delete(MQDBMessageId, Key) of
ok when Count > 0 ->
{{ok, MsgBin}, Count - 1};
ok ->
Expand All @@ -193,7 +195,7 @@ handle_call({dequeue, NamedMqDbPid}, _From, #state{count = Count} = State) ->
{{error, Why}, Count}
end;
not_found = Cause ->
{Cause, Count};
{Cause, 0};
{_, Cause} ->
error_logger:error_msg("~p,~p,~p,~p~n",
[{module, ?MODULE_STRING},
Expand Down Expand Up @@ -277,16 +279,16 @@ code_change(_OldVsn, State, _Extra) ->
ok | {error, any()}).
put_message(MsgKeyBin, MsgBin, #state{mq_properties = MQProps}) ->
try
BackendMessage = MQProps#mq_properties.mqdb_id,
MQDBMessageId = MQProps#mq_properties.mqdb_id,
case leo_backend_db_api:put(
BackendMessage, MsgKeyBin, MsgBin) of
MQDBMessageId, MsgKeyBin, MsgBin) of
ok ->
ok;
Error ->
Error
end
catch
_ : Cause ->
_:Cause ->
error_logger:error_msg("~p,~p,~p,~p~n",
[{module, ?MODULE_STRING},
{function, "put_message/3"},
Expand Down

0 comments on commit 28ac565

Please sign in to comment.