diff --git a/src/leo_mq_publisher.erl b/src/leo_mq_publisher.erl index 840b03a..d214b44 100644 --- a/src/leo_mq_publisher.erl +++ b/src/leo_mq_publisher.erl @@ -58,8 +58,9 @@ mq_properties = #mq_properties{} :: #mq_properties{}, count = 0 :: non_neg_integer(), consumer_status = ?ST_IDLING :: state_of_mq(), - consumer_batch_of_msgs = 0 :: non_neg_integer(), - consumer_interval = 0 :: non_neg_integer() + consumer_batch_of_msgs = 0 :: non_neg_integer(), + consumer_interval = 0 :: non_neg_integer(), + state_filepath = [] :: string() }). @@ -133,17 +134,25 @@ close(Id) -> init([Id, #mq_properties{db_name = DBName, db_procs = DBProcs, mqdb_id = MQDBMessageId, - mqdb_path = MQDBMessagePath} = MQProps]) -> + mqdb_path = MQDBMessagePath + } = MQProps]) -> case application:get_env(leo_mq, backend_db_sup_ref) of {ok, Pid} -> ok = leo_backend_db_sup:start_child( Pid, MQDBMessageId, DBProcs, DBName, MQDBMessagePath), - %% @TODO: Retrieve total num of message from the local mq's file - Count = 0, + %% Retrieve total num of message from the local state file + StateFilePath = lists:append([MQDBMessagePath, "_", atom_to_list(Id)]), + Count = case file:consult(StateFilePath) of + {ok, Props} -> + leo_misc:get_value('count', Props, 0); + _ -> + 0 + end, {ok, #state{id = Id, mq_properties = MQProps, - count = Count}, ?DEF_TIMEOUT}; + count = Count, + state_filepath = StateFilePath}, ?DEF_TIMEOUT}; _Error -> {stop, 'not_initialized'} end. @@ -203,9 +212,11 @@ handle_call(status, _From, #state{consumer_status = ConsumerStatus, {?MQ_CNS_PROP_INTERVAL, Interval} ]}, State, ?DEF_TIMEOUT}; -handle_call(close, _From, #state{mq_properties = MQProps} = State) -> +handle_call(close, _From, #state{count = Count, + state_filepath = StateFilePath, + mq_properties = MQProps} = State) -> MQDBMessageId = MQProps#mq_properties.mqdb_id, - ok = close_db(MQDBMessageId), + ok = close_db(MQDBMessageId, Count, StateFilePath), {reply, ok, State, ?DEF_TIMEOUT}; handle_call(stop, _From, State) -> @@ -236,12 +247,15 @@ handle_info(_Info, State) -> %% gen_server callback - Module:terminate(Reason, State) %%

terminate(_Reason, #state{id = Id, + count = Count, + state_filepath = StateFilePath, mq_properties = MQProps}) -> error_logger:info_msg("~p,~p,~p,~p~n", [{module, ?MODULE_STRING}, {function, "terminate/1"}, {line, ?LINE}, {body, Id}]), + %% Close the backend_db MQDBMessageId = MQProps#mq_properties.mqdb_id, - ok = close_db(MQDBMessageId), + ok = close_db(MQDBMessageId, Count, StateFilePath), ok. @@ -282,9 +296,16 @@ put_message(MsgKeyBin, MsgBin, #state{mq_properties = MQProps}) -> %% @doc Close a db %% @private --spec(close_db(atom()) -> - ok). -close_db(InstanseName) -> +-spec(close_db(InstanceName, Count, StateFilePath) -> + ok when InstanceName::atom(), + Count::non_neg_integer(), + StateFilePath::string()). +close_db(InstanseName, Count, StateFilePath) -> + %% Output the current state + _ = leo_file:file_unconsult(StateFilePath, [{id, InstanseName}, + {count, Count} + ]), + %% Close the backend-db case whereis(leo_backend_db_sup) of Pid when is_pid(Pid) -> List = supervisor:which_children(Pid),