Skip to content

Commit

Permalink
To change the backend-db from bitcask to leveldb#2 for fixing leo-pro…
Browse files Browse the repository at this point in the history
  • Loading branch information
yosukehara committed Dec 30, 2015
1 parent b791f5f commit d6f0f20
Showing 1 changed file with 33 additions and 12 deletions.
45 changes: 33 additions & 12 deletions src/leo_mq_publisher.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@
mq_properties = #mq_properties{} :: #mq_properties{},
count = 0 :: non_neg_integer(),
consumer_status = ?ST_IDLING :: state_of_mq(),
consumer_batch_of_msgs = 0 :: non_neg_integer(),
consumer_interval = 0 :: non_neg_integer()
consumer_batch_of_msgs = 0 :: non_neg_integer(),
consumer_interval = 0 :: non_neg_integer(),
state_filepath = [] :: string()
}).


Expand Down Expand Up @@ -133,17 +134,25 @@ close(Id) ->
init([Id, #mq_properties{db_name = DBName,
db_procs = DBProcs,
mqdb_id = MQDBMessageId,
mqdb_path = MQDBMessagePath} = MQProps]) ->
mqdb_path = MQDBMessagePath
} = MQProps]) ->
case application:get_env(leo_mq, backend_db_sup_ref) of
{ok, Pid} ->
ok = leo_backend_db_sup:start_child(
Pid, MQDBMessageId,
DBProcs, DBName, MQDBMessagePath),
%% @TODO: Retrieve total num of message from the local mq's file
Count = 0,
%% Retrieve total num of message from the local state file
StateFilePath = lists:append([MQDBMessagePath, "_", atom_to_list(Id)]),
Count = case file:consult(StateFilePath) of
{ok, Props} ->
leo_misc:get_value('count', Props, 0);
_ ->
0
end,
{ok, #state{id = Id,
mq_properties = MQProps,
count = Count}, ?DEF_TIMEOUT};
count = Count,
state_filepath = StateFilePath}, ?DEF_TIMEOUT};
_Error ->
{stop, 'not_initialized'}
end.
Expand Down Expand Up @@ -203,9 +212,11 @@ handle_call(status, _From, #state{consumer_status = ConsumerStatus,
{?MQ_CNS_PROP_INTERVAL, Interval}
]}, State, ?DEF_TIMEOUT};

handle_call(close, _From, #state{mq_properties = MQProps} = State) ->
handle_call(close, _From, #state{count = Count,
state_filepath = StateFilePath,
mq_properties = MQProps} = State) ->
MQDBMessageId = MQProps#mq_properties.mqdb_id,
ok = close_db(MQDBMessageId),
ok = close_db(MQDBMessageId, Count, StateFilePath),
{reply, ok, State, ?DEF_TIMEOUT};

handle_call(stop, _From, State) ->
Expand Down Expand Up @@ -236,12 +247,15 @@ handle_info(_Info, State) ->
%% gen_server callback - Module:terminate(Reason, State)
%% </p>
terminate(_Reason, #state{id = Id,
count = Count,
state_filepath = StateFilePath,
mq_properties = MQProps}) ->
error_logger:info_msg("~p,~p,~p,~p~n",
[{module, ?MODULE_STRING}, {function, "terminate/1"},
{line, ?LINE}, {body, Id}]),
%% Close the backend_db
MQDBMessageId = MQProps#mq_properties.mqdb_id,
ok = close_db(MQDBMessageId),
ok = close_db(MQDBMessageId, Count, StateFilePath),
ok.


Expand Down Expand Up @@ -282,9 +296,16 @@ put_message(MsgKeyBin, MsgBin, #state{mq_properties = MQProps}) ->

%% @doc Close a db
%% @private
-spec(close_db(atom()) ->
ok).
close_db(InstanseName) ->
-spec(close_db(InstanceName, Count, StateFilePath) ->
ok when InstanceName::atom(),
Count::non_neg_integer(),
StateFilePath::string()).
close_db(InstanseName, Count, StateFilePath) ->
%% Output the current state
_ = leo_file:file_unconsult(StateFilePath, [{id, InstanseName},
{count, Count}
]),
%% Close the backend-db
case whereis(leo_backend_db_sup) of
Pid when is_pid(Pid) ->
List = supervisor:which_children(Pid),
Expand Down

1 comment on commit d6f0f20

@yosukehara
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Example:
## leo_stroage adopted leveldb for the mq's backend-db
./package/leo_storage/work/queue/
├── [ 646 Dec 31  0:35]  1
│   ├── [ 126 Dec 31  0:34]  message0 -> /Users/yosukehara/dev/leo-project/test/leofs-1.4-test/leofs-1.4.0-pre.3/package/leo_storage/work/queue/1/message0_63618708860/
│   ├── [ 476 Dec 31  0:34]  message0_63618708860
│   │   ├── [   0 Dec 31  0:35]  000003.log
│   │   ├── [  16 Dec 31  0:34]  CURRENT
│   │   ├── [   0 Dec 31  0:34]  LOCK
│   │   ├── [2.3K Dec 31  0:34]  LOG
│   │   ├── [  50 Dec 31  0:35]  MANIFEST-000002
│   │   ├── [  68 Dec 31  0:34]  sst_0
│   │   ├── [  68 Dec 31  0:34]  sst_1
│   │   ├── [  68 Dec 31  0:34]  sst_2
│   │   ├── [  68 Dec 31  0:34]  sst_3
│   │   ├── [  68 Dec 31  0:34]  sst_4
│   │   ├── [  68 Dec 31  0:34]  sst_5
│   │   └── [  68 Dec 31  0:34]  sst_6
.   .
.   .
.   .
│   ├── [ 126 Dec 31  0:34]  message7 -> /Users/yosukehara/dev/leo-project/test/leofs-1.4-test/leofs-1.4.0-pre.3/package/leo_storage/work/queue/1/message7_63618708859/
│   ├── [ 476 Dec 31  0:34]  message7_63618708859
│   │   ├── [   0 Dec 31  0:35]  000003.log
│   │   ├── [  16 Dec 31  0:34]  CURRENT
│   │   ├── [   0 Dec 31  0:34]  LOCK
│   │   ├── [2.3K Dec 31  0:34]  LOG
│   │   ├── [  50 Dec 31  0:35]  MANIFEST-000002
│   │   ├── [  68 Dec 31  0:34]  sst_0
│   │   ├── [  68 Dec 31  0:34]  sst_1
│   │   ├── [  68 Dec 31  0:34]  sst_2
│   │   ├── [  68 Dec 31  0:34]  sst_3
│   │   ├── [  68 Dec 31  0:34]  sst_4
│   │   ├── [  68 Dec 31  0:34]  sst_5
│   │   └── [  68 Dec 31  0:34]  sst_6
│   └── [  46 Dec 31  0:35]  message_leo_per_object_queue ## ma's state file

Please sign in to comment.