Skip to content

Commit

Permalink
To avoid storing inconsistent objects during del-directory leo-projec…
Browse files Browse the repository at this point in the history
  • Loading branch information
yosukehara committed Aug 1, 2017
1 parent 9148c7e commit 6476436
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 42 deletions.
1 change: 1 addition & 0 deletions apps/leo_storage/include/leo_storage.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,7 @@
directory = <<>> :: binary(),
state = ?STATE_PENDING :: del_dir_state(),
is_notification_successful = false :: boolean(), %% for del-buclet (communication w/leo_manager)
enqueued_at = 1 :: pos_integer(),
timestamp = 1 :: pos_integer()
}).

Expand Down
34 changes: 18 additions & 16 deletions apps/leo_storage/src/leo_storage_handler_del_directory.erl
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ handle_call({enqueue, Type, Directory, IsClient},_From, #state{cached_items = Ca
directory = Directory,
type = Type,
state = ?STATE_PENDING,
timestamp = leo_date:now()}) of
enqueued_at = leo_date:clock(),
timestamp = leo_date:now()
}) of
ok when IsClient == false->
case run(Type, ?STATE_PENDING, null, Directory, State) of
{ok, State_1} ->
Expand All @@ -144,7 +146,7 @@ handle_call({enqueue, Type, Directory, IsClient},_From, #state{cached_items = Ca
{error, State}
end;
ok ->
?info("run/5", [{"msg: enqueued", Directory}]),
?info("handle_call/3 - enqueue", [{"msg: enqueued", Directory}]),
{ok, State};
{error,_Cause} ->
{{error, ?ERROR_ENQUEUE_FAILURE}, State}
Expand Down Expand Up @@ -290,11 +292,12 @@ dequeue_1(State, [{_, DelBucketStateBin}|DelBucketStateList], MQId) ->
case catch binary_to_term(DelBucketStateBin) of
#del_dir_state{type = Type,
directory = Directory,
state = ?STATE_PENDING} ->
state = ?STATE_PENDING,
enqueued_at = EnqueuedAt} ->
From = self(),
_Pid = spawn(
fun() ->
insert_messages(From, MQId, Type, Directory)
insert_messages(From, MQId, Type, Directory, EnqueuedAt)
end),
% receive 'enqueuing' message here to change the state from STATE_PENDING to STATE_ENQUEUING
% for preventing insert_messages from being invoked multiple times.
Expand All @@ -312,17 +315,18 @@ dequeue_1(State, [{_, DelBucketStateBin}|DelBucketStateList], MQId) ->


%% @private
-spec(insert_messages(From, MQId, Type, Directory) ->
-spec(insert_messages(From, MQId, Type, Directory, EnqueuedAt) ->
ok | {error, Cause} when From::pid(),
MQId::mq_id(),
Type::del_dir_type(),
Directory::binary(),
EnqueuedAt::pos_integer(),
Cause::any()).
insert_messages(From, MQId, Type, Directory) ->
insert_messages(From, MQId, Type, Directory, EnqueuedAt) ->
erlang:send(From, {enqueuing, MQId, Type, Directory}),

case leo_storage_handler_object:prefix_search_and_remove_objects(
MQId, Directory) of
MQId, Directory, EnqueuedAt) of
{ok,_TotalMsgs} ->
erlang:send(From, {enqueued, MQId, Type, Directory});
{error, Cause} ->
Expand Down Expand Up @@ -396,22 +400,20 @@ check_stats_1([_|DelBucketStateList], From, State) ->
check_stats_2(#del_dir_state{mq_id = MQId,
type = Type,
directory = Directory,
state = ?STATE_PENDING}, From,_State) when MQId /= null,
Type == ?TYPE_DEL_DIR ->
state = ?STATE_PENDING,
enqueued_at = EnqueuedAt} = CurState, From,_State) when MQId /= null,
Type == ?TYPE_DEL_DIR ->
case leo_mq_api:count(MQId) of
{ok, 0} ->
_Pid = spawn(
fun() ->
insert_messages(From, MQId, Type, Directory)
insert_messages(From, MQId, Type, Directory, EnqueuedAt)
end),
ok;
{ok,_Count} ->
update_state(#del_dir_state{
mq_id = MQId,
directory = Directory,
type = Type,
state = ?STATE_MONITORING,
timestamp = leo_date:now()});
update_state(CurState#del_dir_state{
state = ?STATE_MONITORING,
timestamp = leo_date:now()});
_ ->
ok
end;
Expand Down
11 changes: 6 additions & 5 deletions apps/leo_storage/src/leo_storage_handler_object.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
head/2, head/3,
head_with_calc_md5/3,
replicate/1, replicate/3,
prefix_search/3, prefix_search_and_remove_objects/2,
prefix_search/3, prefix_search_and_remove_objects/3,
find_uploaded_objects_by_key/1,
is_key_under_del_dir/1, can_compact_object/2
]).
Expand Down Expand Up @@ -1032,20 +1032,21 @@ prefix_search_2(ParentDir, Marker, Key, Meta, Acc) ->


%% @doc Retrieve object of deletion from object-storage by key
-spec(prefix_search_and_remove_objects(MQId, ParentDir) ->
-spec(prefix_search_and_remove_objects(MQId, ParentDir, EnqueuedAt) ->
{ok, [{Key, Value}]} |
not_found |
{error, Cause} when MQId::mq_id(),
ParentDir::binary(),
EnqueuedAt::pos_integer(),
Key::atom(),
Value::any(),
Cause::any()).
prefix_search_and_remove_objects(MQId, ParentDir) ->
prefix_search_and_remove_objects(MQId, ParentDir, EnqueuedAt) ->
Fun = fun(Key, V, Acc) ->
PreMeta = binary_to_term(V),
case leo_object_storage_transformer:transform_metadata(PreMeta) of
{error, Cause} ->
?error("prefix_search_and_remove_objects/1",
?error("prefix_search_and_remove_objects/3",
[{key, Key}, {error, Cause}]),
Acc;
Metadata ->
Expand All @@ -1059,7 +1060,7 @@ prefix_search_and_remove_objects(MQId, ParentDir) ->
case (Pos_1 == 0) of
true when Metadata#?METADATA.del == ?DEL_FALSE ->
case leo_storage_mq:publish(
{?QUEUE_ID_DEL_DIR, MQId}, AddrId, Key) of
{?QUEUE_ID_DEL_DIR, MQId}, AddrId, Key, EnqueuedAt) of
ok ->
void;
{error, Cause} ->
Expand Down
40 changes: 22 additions & 18 deletions apps/leo_storage/src/leo_storage_mq.erl
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,6 @@ publish(?QUEUE_ID_REQ_DEL_DIR = Id, Node, Directory) ->
dir = Directory,
timestamp = leo_date:now()}),
leo_mq_api:publish(Id, KeyBin, MsgBin);

publish({?QUEUE_ID_DEL_DIR, WorkerId}, AddrId, Key) ->
KeyBin = term_to_binary({AddrId, Key}),
MsgBin = term_to_binary(#async_deletion_message{
id = leo_date:clock(),
addr_id = AddrId,
key = Key,
timestamp = leo_date:now()}),
leo_mq_api:publish(WorkerId, KeyBin, MsgBin);

publish(_,_,_) ->
{error, badarg}.

Expand Down Expand Up @@ -225,6 +215,20 @@ publish(?QUEUE_ID_SYNC_OBJ_WITH_DC = Id, ClusterId, AddrId, Key) ->
timestamp = leo_date:now()}),
leo_mq_api:publish(Id, KeyBin, MessageBin);

publish({?QUEUE_ID_DEL_DIR, WorkerId}, AddrId, Key, EnqueuedAt) ->
KeyBin = term_to_binary({AddrId, Key}),
MsgBin = term_to_binary(#?MSG_ASYNC_DELETION{
id = leo_date:clock(),
addr_id = AddrId,
key = Key,
%% To compare it with its latest metadata
%% See `compare_between_metadatas/1`
meta = #?METADATA{addr_id = AddrId,
key = Key,
clock = EnqueuedAt},
timestamp = leo_date:now()}),

leo_mq_api:publish(WorkerId, KeyBin, MsgBin);
publish(_,_,_,_) ->
{error, badarg}.

Expand Down Expand Up @@ -323,7 +327,7 @@ handle_call({consume, ?QUEUE_ID_PER_OBJECT, MessageBin}) ->
meta = Metadata,
sync_node = SyncNode,
is_force_sync = IsForceSync}} ->
case compare_metadatas(Metadata) of
case compare_between_metadatas(Metadata) of
true when IsForceSync == true,
SyncNode /= undefined ->
send_object_to_remote_node(SyncNode, AddrId, Key);
Expand Down Expand Up @@ -996,7 +1000,7 @@ remove_objects_under_dir(MessageBin) ->
{ok, #?MSG_ASYNC_DELETION{addr_id = AddrId,
key = Key,
meta = Metadata}} ->
case compare_metadatas(Metadata) of
case compare_between_metadatas(Metadata) of
true ->
case catch leo_storage_handler_object:delete(
#?OBJECT{addr_id = AddrId,
Expand Down Expand Up @@ -1026,15 +1030,15 @@ remove_objects_under_dir(MessageBin) ->


%% @doc Compare MQ's metadata with its latest metadata
-spec(compare_metadatas(Metadata) ->
-spec(compare_between_metadatas(Metadata) ->
Result | {error, Cause} when Metadata::#?METADATA{},
Result::boolean(),
Cause::any()).
compare_metadatas(undefined) ->
compare_between_metadatas(undefined) ->
true;
compare_metadatas(#?METADATA{addr_id = AddrId,
key = Key,
clock = MsgClock}) ->
compare_between_metadatas(#?METADATA{addr_id = AddrId,
key = Key,
clock = MsgClock}) ->
case leo_storage_handler_object:head(AddrId, Key, false) of
{ok, #?METADATA{clock = Clock}} when MsgClock > Clock ->
true;
Expand All @@ -1043,7 +1047,7 @@ compare_metadatas(#?METADATA{addr_id = AddrId,
not_found ->
false;
{error, Cause} ->
?error("compare_metadatas/1",
?error("compare_between_metadatas/1",
[{addr_id, AddrId}, {key, Key},
{cause, Cause}]),
{error, ?ERROR_COULD_NOT_GET_META}
Expand Down
4 changes: 3 additions & 1 deletion apps/leo_storage/test/leo_storage_handler_object_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,10 @@ prefix_search_and_remove_objects_(_) ->

meck:new(leo_mq_api, [non_strict]),
meck:expect(leo_mq_api, publish, fun(_,_,_) -> ok end),
meck:expect(leo_mq_api, publish, fun(_,_,_,_) -> ok end),

Res = leo_storage_handler_object:prefix_search_and_remove_objects(mqid, ?TEST_BUCKET),
Res = leo_storage_handler_object:prefix_search_and_remove_objects(
mqid, ?TEST_BUCKET, leo_date:clock()),
?assertEqual(true, is_list(Res)),
ok.

Expand Down
2 changes: 1 addition & 1 deletion apps/leo_storage/test/leo_storage_mq_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ publish_({_, Test1Node}) ->
ok = leo_storage_mq:publish(?QUEUE_ID_SYNC_OBJ_WITH_DC, ?TEST_VNODE_ID, ?TEST_KEY_1),
ok = leo_storage_mq:publish(?QUEUE_ID_COMP_META_WITH_DC, 'leofs_1', [{?TEST_VNODE_ID, ?TEST_KEY_1}]),
ok = leo_storage_mq:publish(?QUEUE_ID_REQ_DEL_DIR, node(), <<"leofs/">>),
ok = leo_storage_mq:publish({?QUEUE_ID_DEL_DIR, 'worker_1'}, ?TEST_VNODE_ID, ?TEST_KEY_1),
ok = leo_storage_mq:publish({?QUEUE_ID_DEL_DIR, 'worker_1'}, ?TEST_VNODE_ID, ?TEST_KEY_1, leo_date:clock()),
ok = leo_storage_mq:publish(?QUEUE_ID_SYNC_OBJ_WITH_DC, 'leofs_1', ?TEST_VNODE_ID, ?TEST_KEY_1),

ok = leo_storage_mq:publish(
Expand Down
2 changes: 1 addition & 1 deletion apps/leo_storage/test/leo_storage_replicator_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ gen_mock_2(object, {_Test0Node, _Test1Node}, Case) ->
meck:new(leo_storage_mq, [non_strict]),
meck:expect(leo_storage_mq, publish,
fun(Type, #?METADATA{addr_id = AddrId,
key = Key} = Metadata, _ErrorType) ->
key = Key}, _ErrorType) ->
?assertEqual(?QUEUE_ID_PER_OBJECT, Type),
?assertEqual(?TEST_RING_ID_1, AddrId),
?assertEqual(?TEST_KEY_1, Key),
Expand Down

0 comments on commit 6476436

Please sign in to comment.