From 730a6e29228db40d80b61ee36037b6fdcb5314fe Mon Sep 17 00:00:00 2001 From: Yosuke Hara Date: Wed, 30 Dec 2015 23:10:56 +0900 Subject: [PATCH] To change the backend-db from bitcask to leveldb#1 for fixing leo-project/leofs/issues/437 --- include/leo_mq.hrl | 135 +++++++++++++++++++-------------------- src/leo_mq_consumer.erl | 100 +++++++++++------------------ src/leo_mq_publisher.erl | 89 +++++++++++++++++--------- 3 files changed, 163 insertions(+), 161 deletions(-) diff --git a/include/leo_mq.hrl b/include/leo_mq.hrl index 3fca180..4e7acf3 100644 --- a/include/leo_mq.hrl +++ b/include/leo_mq.hrl @@ -23,109 +23,108 @@ %% @doc %% @end %%====================================================================== --define(MQ_LOG_ID, 'queue'). --define(MQ_LOG_FILE_NAME, "queue"). --define(MQ_PROP_MOD, 'module'). --define(MQ_PROP_FUN, 'function'). --define(MQ_PROP_DB_NAME, 'db_name'). --define(MQ_PROP_DB_PROCS, 'db_procs'). --define(MQ_PROP_ROOT_PATH, 'root_path'). +-define(MQ_LOG_ID, 'queue'). +-define(MQ_LOG_FILE_NAME, "queue"). +-define(MQ_PROP_MOD, 'module'). +-define(MQ_PROP_FUN, 'function'). +-define(MQ_PROP_DB_NAME, 'db_name'). +-define(MQ_PROP_DB_PROCS, 'db_procs'). +-define(MQ_PROP_ROOT_PATH, 'root_path'). -define(MQ_PROP_NUM_OF_BATCH_PROC, 'num_of_batch_processes'). --define(MQ_SUBSCRIBE_FUN, 'subscribe'). +-define(MQ_SUBSCRIBE_FUN, 'subscribe'). --define(MQ_PROP_INTERVAL_MAX, 'interval_max'). --define(MQ_PROP_INTERVAL_REG, 'interval_reg'). --define(MQ_PROP_BATCH_MSGS_MAX, 'batch_of_msgs_max'). --define(MQ_PROP_BATCH_MSGS_REG, 'batch_of_msgs_reg'). --define(MQ_PROP_NUM_OF_STEPS, 'num_of_steps'). +-define(MQ_PROP_INTERVAL_MAX, 'interval_max'). +-define(MQ_PROP_INTERVAL_REG, 'interval_reg'). +-define(MQ_PROP_BATCH_MSGS_MAX, 'batch_of_msgs_max'). +-define(MQ_PROP_BATCH_MSGS_REG, 'batch_of_msgs_reg'). +-define(MQ_PROP_NUM_OF_STEPS, 'num_of_steps'). -define(DEF_BACKEND_DB_PROCS, 3). --define(DEF_BACKEND_DB, 'bitcask'). +-define(DEF_BACKEND_DB, 'leveldb'). +%% -define(DEF_BACKEND_DB, 'bitcask'). -define(DEF_DB_ROOT_PATH, "mq" ). -ifdef(TEST). --define(DEF_CONSUME_MAX_INTERVAL, 1000). --define(DEF_CONSUME_MIN_INTERVAL, 100). --define(DEF_CONSUME_REG_INTERVAL, 300). --define(DEF_CONSUME_MAX_BATCH_MSGS, 10). --define(DEF_CONSUME_MIN_BATCH_MSGS, 1). --define(DEF_CONSUME_REG_BATCH_MSGS, 5). --define(DEF_CONSUME_NUM_OF_STEPS, 5). +-define(DEF_CONSUME_MAX_INTERVAL, 1000). +-define(DEF_CONSUME_MIN_INTERVAL, 100). +-define(DEF_CONSUME_REG_INTERVAL, 300). +-define(DEF_CONSUME_MAX_BATCH_MSGS, 10). +-define(DEF_CONSUME_MIN_BATCH_MSGS, 1). +-define(DEF_CONSUME_REG_BATCH_MSGS, 5). +-define(DEF_CONSUME_NUM_OF_STEPS, 5). -else. --define(DEF_CONSUME_MAX_INTERVAL, 3000). --define(DEF_CONSUME_MIN_INTERVAL, 100). --define(DEF_CONSUME_REG_INTERVAL, 300). --define(DEF_CONSUME_MAX_BATCH_MSGS, 1000). --define(DEF_CONSUME_MIN_BATCH_MSGS, 100). --define(DEF_CONSUME_REG_BATCH_MSGS, 300). --define(DEF_CONSUME_NUM_OF_STEPS, 10). +-define(DEF_CONSUME_MAX_INTERVAL, 3000). +-define(DEF_CONSUME_MIN_INTERVAL, 100). +-define(DEF_CONSUME_REG_INTERVAL, 300). +-define(DEF_CONSUME_MAX_BATCH_MSGS, 1000). +-define(DEF_CONSUME_MIN_BATCH_MSGS, 100). +-define(DEF_CONSUME_REG_BATCH_MSGS, 300). +-define(DEF_CONSUME_NUM_OF_STEPS, 10). -endif. --define(MQ_CNS_PROP_NUM_OF_MSGS, 'consumer_num_of_msgs'). --define(MQ_CNS_PROP_STATUS, 'consumer_status'). +-define(MQ_CNS_PROP_NUM_OF_MSGS, 'consumer_num_of_msgs'). +-define(MQ_CNS_PROP_STATUS, 'consumer_status'). -define(MQ_CNS_PROP_BATCH_OF_MSGS, 'consumer_batch_of_msgs'). --define(MQ_CNS_PROP_INTERVAL, 'consumer_interval'). +-define(MQ_CNS_PROP_INTERVAL, 'consumer_interval'). -record(mq_properties, { - publisher_id :: atom(), %% publisher-id - consumer_id :: atom(), %% consumer-id - mod_callback :: module(), %% callback module name - db_name :: atom(), %% db's id - db_procs = 1 :: integer(), %% db's processes - root_path = [] :: string(), %% db's path - mqdb_id :: atom(), %% mqdb's id - mqdb_path = [] :: string(), %% mqdb's path + publisher_id :: atom(), %% publisher-id + consumer_id :: atom(), %% consumer-id + mod_callback :: module(), %% callback module name + db_name :: atom(), %% db's id + db_procs = 1 :: integer(), %% db's processes + root_path = [] :: string(), %% db's path + mqdb_id :: atom(), %% mqdb's id + mqdb_path = [] :: string(), %% mqdb's path %% interval between batch-procs - max_interval = 1000 :: pos_integer(), %% max waiting time (default: 1000msec (1sec)) + max_interval = 1000 :: pos_integer(), %% max waiting time (default: 1000msec (1sec)) regular_interval = 300 :: pos_integer(), %% regular waiting time (default: 300msec) %% num of batch procs - max_batch_of_msgs = 1000 :: pos_integer(), %% max num of batch of messages + max_batch_of_msgs = 1000 :: pos_integer(), %% max num of batch of messages regular_batch_of_msgs = 300 :: pos_integer(), %% regular num of batch of messages %% num of steps num_of_steps = ?DEF_CONSUME_NUM_OF_STEPS :: pos_integer() }). -record(mq_log, { - type :: atom(), + type :: atom(), requested_at = 0 :: integer(), - format = [] :: string(), - message = [] :: string()}). + format = [] :: string(), + message = [] :: string()}). --define(ST_IDLING, 'idling'). --define(ST_RUNNING, 'running'). +-define(ST_IDLING, 'idling'). +-define(ST_RUNNING, 'running'). -define(ST_SUSPENDING, 'suspending'). --type(state_of_mq() :: ?ST_IDLING | - ?ST_RUNNING | +-type(state_of_mq() :: ?ST_IDLING | + ?ST_RUNNING | ?ST_SUSPENDING). -record(mq_state, { id :: atom(), desc = [] :: string(), - state :: [{atom(), any()}] + state :: [{atom(), any()}] }). --define(EVENT_RUN, 'run'). +-define(EVENT_RUN, 'run'). -define(EVENT_DIAGNOSE, 'diagnose'). --define(EVENT_LOCK, 'lock'). --define(EVENT_SUSPEND, 'suspend'). --define(EVENT_RESUME, 'resume'). --define(EVENT_FINISH, 'finish'). --define(EVENT_STATE, 'state'). --define(EVENT_INCR, 'increase'). --define(EVENT_DECR, 'decrease'). --type(event_of_compaction() ::?EVENT_RUN | +-define(EVENT_LOCK, 'lock'). +-define(EVENT_SUSPEND, 'suspend'). +-define(EVENT_RESUME, 'resume'). +-define(EVENT_FINISH, 'finish'). +-define(EVENT_STATE, 'state'). +-define(EVENT_INCR, 'increase'). +-define(EVENT_DECR, 'decrease'). +-type(event_of_compaction() ::?EVENT_RUN | ?EVENT_DIAGNOSE | - ?EVENT_LOCK | - ?EVENT_SUSPEND | - ?EVENT_RESUME | - ?EVENT_FINISH | - ?EVENT_STATE | - ?EVENT_INCR | - ?EVENT_DECR - ). - + ?EVENT_LOCK | + ?EVENT_SUSPEND | + ?EVENT_RESUME | + ?EVENT_FINISH | + ?EVENT_STATE | + ?EVENT_INCR | + ?EVENT_DECR). -define(DEF_CHECK_MAX_INTERVAL_1, timer:seconds(1)). -define(DEF_CHECK_MIN_INTERVAL_1, timer:seconds(0)). @@ -152,7 +151,7 @@ end). %% Retrieve the backend-db info --define(DEF_DB_PATH_INDEX, "index" ). +-define(DEF_DB_PATH_INDEX, "index"). -define(DEF_DB_PATH_MESSAGE, "message"). -define(backend_db_info(Id, RootPath), diff --git a/src/leo_mq_consumer.erl b/src/leo_mq_consumer.erl index 4954d11..18625f0 100644 --- a/src/leo_mq_consumer.erl +++ b/src/leo_mq_consumer.erl @@ -25,8 +25,6 @@ %%====================================================================== -module(leo_mq_consumer). --author('Yosuke Hara'). - -behaviour(gen_fsm). -include("leo_mq.hrl"). @@ -185,25 +183,25 @@ init([Id, PublisherId, worker_seq_num = WorkerSeqNum, interval = Interval, batch_of_msgs = BatchOfMsgs - }, ?DEF_TIMEOUT}. + }}. %% @doc Handle events handle_event(_Event, StateName, State) -> - {next_state, StateName, State, ?DEF_TIMEOUT}. + {next_state, StateName, State}. %% @doc Handle 'status' event handle_sync_event(state, _From, StateName, State) -> - {reply, {ok, StateName}, StateName, State, ?DEF_TIMEOUT}; + {reply, {ok, StateName}, StateName, State}; %% @doc Handle 'stop' event handle_sync_event(stop, _From, _StateName, State) -> - {stop, shutdown, ok, State, ?DEF_TIMEOUT}. + {stop, shutdown, ok, State}. %% @doc Handling all non call/cast messages handle_info(timeout, StateName, State) -> - {next_state, StateName, State, ?DEF_TIMEOUT}; + {next_state, StateName, State}; handle_info(_Info, StateName, State) -> - {next_state, StateName, State, ?DEF_TIMEOUT}. + {next_state, StateName, State}. %% @doc This function is called by a gen_server when it is about to @@ -247,17 +245,17 @@ idling(#event_info{event = ?EVENT_RUN}, From, #state{id = Id, gen_fsm:reply(From, ok), ok = run(Id), ok = leo_mq_publisher:update_consumer_stats(PublisherId, NextStatus, BatchOfMsgs, Interval), - {next_state, NextStatus, State_1, ?DEF_TIMEOUT}; + {next_state, NextStatus, State_1}; idling(#event_info{event = ?EVENT_STATE}, From, #state{status = Status} = State) -> gen_fsm:reply(From, {ok, Status}), - {next_state, ?ST_IDLING, State#state{status = Status}, ?DEF_TIMEOUT}; + {next_state, ?ST_IDLING, State#state{status = Status}}; idling(_, From, State) -> gen_fsm:reply(From, {error, badstate}), - {next_state, ?ST_IDLING, State#state{status = ?ST_IDLING}, ?DEF_TIMEOUT}. + {next_state, ?ST_IDLING, State#state{status = ?ST_IDLING}}. -spec(idling(EventInfo, State) -> {next_state, ?ST_IDLING, State, non_neg_integer()} when EventInfo::#event_info{}, - State::#state{}). + State::#state{}). idling(#event_info{event = ?EVENT_RUN}, #state{id = Id, publisher_id = PublisherId, batch_of_msgs = BatchOfMsgs, @@ -265,7 +263,7 @@ idling(#event_info{event = ?EVENT_RUN}, #state{id = Id, NextStatus = ?ST_RUNNING, ok = run(Id), ok = leo_mq_publisher:update_consumer_stats(PublisherId, NextStatus, BatchOfMsgs, Interval), - {next_state, NextStatus, State#state{status = ?ST_IDLING}, ?DEF_TIMEOUT}; + {next_state, NextStatus, State#state{status = ?ST_IDLING}}; idling(#event_info{event = ?EVENT_INCR}, #state{mq_properties = #mq_properties{regular_batch_of_msgs = BatchOfMsgs, @@ -273,7 +271,7 @@ idling(#event_info{event = ?EVENT_INCR}, NextStatus = ?ST_IDLING, {next_state, NextStatus, State#state{status = NextStatus, batch_of_msgs = BatchOfMsgs, - interval = Interval}, ?DEF_TIMEOUT}; + interval = Interval}}; idling(#event_info{event = ?EVENT_DECR}, #state{mq_properties = MQProps, @@ -285,9 +283,9 @@ idling(#event_info{event = ?EVENT_DECR}, Interval_1 = incr_interval_fun(Interval, MaxInterval, StepInterval), {next_state, ?ST_IDLING, State#state{status = ?ST_IDLING, batch_of_msgs = BatchOfMsgs_1, - interval = Interval_1}, ?DEF_TIMEOUT}; + interval = Interval_1}}; idling(_, State) -> - {next_state, ?ST_IDLING, State#state{status = ?ST_IDLING}, ?DEF_TIMEOUT}. + {next_state, ?ST_IDLING, State#state{status = ?ST_IDLING}}. %% @doc State of 'running' @@ -297,9 +295,9 @@ idling(_, State) -> State::#state{}). running(#event_info{event = ?EVENT_RUN, is_force_exec = IsForceExec}, #state{id = Id, - publisher_id = PublisherId, - batch_of_msgs = BatchOfMsgs, - interval = Interval} = State) -> + publisher_id = PublisherId, + batch_of_msgs = BatchOfMsgs, + interval = Interval} = State) -> {NextStatus, State_2} = case catch consume(State, IsForceExec) of %% Execute the data-compaction repeatedly @@ -332,14 +330,14 @@ running(#event_info{event = ?EVENT_RUN, ok = leo_mq_publisher:update_consumer_stats( PublisherId, NextStatus, BatchOfMsgs, Interval), {next_state, NextStatus, State_2#state{status = NextStatus, - prev_proc_time = leo_date:clock()}, ?DEF_TIMEOUT}; + prev_proc_time = leo_date:clock()}}; running(#event_info{event = ?EVENT_SUSPEND}, #state{publisher_id = PublisherId, batch_of_msgs = BatchOfMsgs, interval = Interval} = State) -> NextStatus = ?ST_SUSPENDING, ok = leo_mq_publisher:update_consumer_stats(PublisherId, NextStatus, BatchOfMsgs, Interval), - {next_state, NextStatus, State#state{status = NextStatus}, ?DEF_TIMEOUT}; + {next_state, NextStatus, State#state{status = NextStatus}}; running(#event_info{event = ?EVENT_INCR}, @@ -360,7 +358,7 @@ running(#event_info{event = ?EVENT_INCR}, PublisherId, NextStatus, BatchOfMsgs_1, Interval_1), {next_state, NextStatus, State#state{status = NextStatus, batch_of_msgs = BatchOfMsgs_1, - interval = Interval_1}, ?DEF_TIMEOUT}; + interval = Interval_1}}; running(#event_info{event = ?EVENT_DECR}, #state{publisher_id = PublisherId, @@ -386,19 +384,19 @@ running(#event_info{event = ?EVENT_DECR}, PublisherId, NextStatus, BatchOfMsgs_1, Interval_1), {next_state, NextStatus, State#state{batch_of_msgs = BatchOfMsgs_1, interval = Interval_1, - status = NextStatus}, ?DEF_TIMEOUT}; + status = NextStatus}}; running(_, State) -> - {next_state, ?ST_RUNNING, State#state{status = ?ST_RUNNING}, ?DEF_TIMEOUT}. + {next_state, ?ST_RUNNING, State#state{status = ?ST_RUNNING}}. -spec(running( _, _, #state{}) -> {next_state, ?ST_RUNNING, #state{}, non_neg_integer()}). running(#event_info{event = ?EVENT_STATE}, From, #state{status = Status} = State) -> gen_fsm:reply(From, {ok, Status}), - {next_state, ?ST_RUNNING, State#state{status = ?ST_RUNNING}, ?DEF_TIMEOUT}; + {next_state, ?ST_RUNNING, State#state{status = ?ST_RUNNING}}; running(_, From, State) -> gen_fsm:reply(From, {error, badstate}), - {next_state, ?ST_RUNNING, State#state{status = ?ST_RUNNING}, ?DEF_TIMEOUT}. + {next_state, ?ST_RUNNING, State#state{status = ?ST_RUNNING}}. %% @doc State of 'suspend' @@ -408,10 +406,10 @@ running(_, From, State) -> when EventInfo::#event_info{}, State::#state{}). suspending(#event_info{event = ?EVENT_RUN}, State) -> - {next_state, ?ST_SUSPENDING, State#state{status = ?ST_SUSPENDING}, ?DEF_TIMEOUT}; + {next_state, ?ST_SUSPENDING, State#state{status = ?ST_SUSPENDING}}; suspending(#event_info{event = ?EVENT_STATE}, State) -> - {next_state, ?ST_SUSPENDING, State#state{status = ?ST_SUSPENDING}, ?DEF_TIMEOUT}; + {next_state, ?ST_SUSPENDING, State#state{status = ?ST_SUSPENDING}}; suspending(#event_info{event = ?EVENT_INCR}, #state{id = Id, @@ -432,9 +430,9 @@ suspending(#event_info{event = ?EVENT_INCR}, PublisherId, NextStatus, BatchOfMsgs_1, Interval_1), {next_state, NextStatus, State#state{status = NextStatus, batch_of_msgs = BatchOfMsgs_1, - interval = Interval_1}, ?DEF_TIMEOUT}; + interval = Interval_1}}; suspending(_, State) -> - {next_state, ?ST_SUSPENDING, State#state{status = ?ST_SUSPENDING}, ?DEF_TIMEOUT}. + {next_state, ?ST_SUSPENDING, State#state{status = ?ST_SUSPENDING}}. -spec(suspending(EventInfo, From, State) -> {next_state, ?ST_SUSPENDING | ?ST_RUNNING, State, non_neg_integer()} @@ -448,13 +446,13 @@ suspending(#event_info{event = ?EVENT_RESUME}, From, #state{id = Id, gen_fsm:reply(From, ok), ok = run(Id), ok = leo_mq_publisher:update_consumer_stats(PublisherId, ?ST_RUNNING, BatchOfMsgs, Interval), - {next_state, ?ST_RUNNING, State#state{status = ?ST_RUNNING}, ?DEF_TIMEOUT}; + {next_state, ?ST_RUNNING, State#state{status = ?ST_RUNNING}}; suspending(#event_info{event = ?EVENT_STATE}, From, #state{status = Status} = State) -> gen_fsm:reply(From, {ok, Status}), - {next_state, ?ST_SUSPENDING, State#state{status = ?ST_SUSPENDING}, ?DEF_TIMEOUT}; + {next_state, ?ST_SUSPENDING, State#state{status = ?ST_SUSPENDING}}; suspending(_Other, From, State) -> gen_fsm:reply(From, {error, badstate}), - {next_state, ?ST_SUSPENDING, State#state{status = ?ST_SUSPENDING}, ?DEF_TIMEOUT}. + {next_state, ?ST_SUSPENDING, State#state{status = ?ST_SUSPENDING}}. %%-------------------------------------------------------------------- @@ -500,21 +498,10 @@ consume(#state{mq_properties = #mq_properties{ consume(_Id,_,_,0) -> ok; consume(Id, Mod, NamedMqDbPid, NumOfBatchProcs) -> - case catch leo_backend_db_server:first(NamedMqDbPid) of - {ok, Key, Val} -> + case leo_mq_publisher:dequeue(Id, NamedMqDbPid) of + {ok, MsgBin} -> try - %% Taking measure of queue-msg migration - %% for previsous 1.2.0-pre1 - MsgTerm = binary_to_term(Val), - MsgBin = case is_tuple(MsgTerm) of - true when is_integer(element(1, MsgTerm)) andalso - is_binary(element(2, MsgTerm)) -> - element(2, MsgTerm); - _ -> - Val - end, - erlang:apply(Mod, handle_call, [{consume, Id, MsgBin}]), - ok + erlang:apply(Mod, handle_call, [{consume, Id, MsgBin}]) catch _:Reason -> error_logger:error_msg("~p,~p,~p,~p~n", @@ -525,25 +512,10 @@ consume(Id, Mod, NamedMqDbPid, NumOfBatchProcs) -> {cause, Reason} ]}]) after - %% Remove the message - %% and then retrieve the next message - case catch leo_backend_db_server:delete(NamedMqDbPid, Key) of - ok -> - ok; - {_, Why} -> - error_logger:error_msg("~p,~p,~p,~p~n", - [{module, ?MODULE_STRING}, - {function, "consume/4"}, - {line, ?LINE}, {body, Why}]) - end, consume(Id, Mod, NamedMqDbPid, NumOfBatchProcs - 1) end; - not_found = Cause -> - Cause; - {'EXIT', Cause} -> - {error, Cause}; - Error -> - Error + Other -> + Other end. diff --git a/src/leo_mq_publisher.erl b/src/leo_mq_publisher.erl index 1df7177..840b03a 100644 --- a/src/leo_mq_publisher.erl +++ b/src/leo_mq_publisher.erl @@ -26,8 +26,6 @@ %%====================================================================== -module(leo_mq_publisher). --author('Yosuke Hara'). - -behaviour(gen_server). -include("leo_mq.hrl"). @@ -43,7 +41,8 @@ terminate/2, code_change/3]). --export([publish/3, status/1, update_consumer_stats/4, close/1]). +-export([publish/3, dequeue/2, + status/1, update_consumer_stats/4, close/1]). -ifdef(TEST). -define(CURRENT_TIME, 65432100000). @@ -94,6 +93,12 @@ publish(Id, KeyBin, MessageBin) -> gen_server:call(Id, {publish, KeyBin, MessageBin}, ?DEF_TIMEOUT). +%% @doc Register a queuing data. +%% +dequeue(Id, NamedMqDbPid) -> + gen_server:call(Id, {dequeue, NamedMqDbPid}, ?DEF_TIMEOUT). + + %% @doc Retrieve the current state from the queue. %% -spec(status(Id) -> @@ -129,13 +134,12 @@ init([Id, #mq_properties{db_name = DBName, db_procs = DBProcs, mqdb_id = MQDBMessageId, mqdb_path = MQDBMessagePath} = MQProps]) -> - case application:get_env(leo_mq, backend_db_sup_ref) of {ok, Pid} -> ok = leo_backend_db_sup:start_child( Pid, MQDBMessageId, DBProcs, DBName, MQDBMessagePath), - %% @TODO: Retrieve total num of message from the backend-db + %% @TODO: Retrieve total num of message from the local mq's file Count = 0, {ok, #state{id = Id, mq_properties = MQProps, @@ -146,21 +150,53 @@ init([Id, #mq_properties{db_name = DBName, %% @doc gen_server callback - Module:handle_call(Request, From, State) -> Result -handle_call({publish, KeyBin, MessageBin}, _From, State) -> +handle_call({publish, KeyBin, MessageBin}, _From, #state{count = Count} = State) -> Reply = put_message(KeyBin, MessageBin, State), - {reply, Reply, State, ?DEF_TIMEOUT}; - -handle_call(status, _From, #state{mq_properties = MQProps, - consumer_status = ConsumerStatus, + {reply, Reply, State#state{count = Count + 1}, ?DEF_TIMEOUT}; + +handle_call({dequeue, NamedMqDbPid}, _From, #state{count = Count} = State) -> + {Reply, Count_1} = + case catch leo_backend_db_server:first(NamedMqDbPid) of + {ok, Key, Val} -> + %% Taking measure of queue-msg migration + %% for previsous 1.2.0-pre1 + MsgTerm = binary_to_term(Val), + MsgBin = case is_tuple(MsgTerm) of + true when is_integer(element(1, MsgTerm)) andalso + is_binary(element(2, MsgTerm)) -> + element(2, MsgTerm); + _ -> + Val + end, + + %% Remove the queue from the backend-db + case catch leo_backend_db_server:delete(NamedMqDbPid, Key) of + ok when Count > 0 -> + {{ok, MsgBin}, Count - 1}; + ok -> + {{ok, MsgBin}, Count}; + {_, Why} -> + error_logger:error_msg("~p,~p,~p,~p~n", + [{module, ?MODULE_STRING}, + {function, "handle_call/3"}, + {line, ?LINE}, {body, Why}]), + {{error, Why}, Count} + end; + not_found = Cause -> + {Cause, Count}; + {_, Cause} -> + error_logger:error_msg("~p,~p,~p,~p~n", + [{module, ?MODULE_STRING}, + {function, "handle_call/3"}, + {line, ?LINE}, {body, Cause}]), + {{error, Cause}, Count} + end, + {reply, Reply, State#state{count = Count_1}, ?DEF_TIMEOUT}; + +handle_call(status, _From, #state{consumer_status = ConsumerStatus, consumer_batch_of_msgs = BatchOfMsgs, - consumer_interval = Interval} = State) -> - MQDBMessageId = MQProps#mq_properties.mqdb_id, - Res = leo_backend_db_api:status(MQDBMessageId), - Count = lists:foldl(fun([{key_count, KC}, _], Acc) -> - Acc + KC; - (_, Acc) -> - Acc - end, 0, Res), + consumer_interval = Interval, + count = Count} = State) -> {reply, {ok, [{?MQ_CNS_PROP_NUM_OF_MSGS, Count}, {?MQ_CNS_PROP_STATUS, ConsumerStatus}, {?MQ_CNS_PROP_BATCH_OF_MSGS, BatchOfMsgs}, @@ -227,17 +263,12 @@ code_change(_OldVsn, State, _Extra) -> put_message(MsgKeyBin, MsgBin, #state{mq_properties = MQProps}) -> try BackendMessage = MQProps#mq_properties.mqdb_id, - case leo_backend_db_api:get(BackendMessage, MsgKeyBin) of - not_found -> - case leo_backend_db_api:put( - BackendMessage, MsgKeyBin, MsgBin) of - ok -> - ok; - Error -> - Error - end; - _Other -> - ok + case leo_backend_db_api:put( + BackendMessage, MsgKeyBin, MsgBin) of + ok -> + ok; + Error -> + Error end catch _ : Cause ->