Skip to content

Commit

Permalink
Fixes object consistency during consumption MQ's message for /issues/783
Browse files Browse the repository at this point in the history
  • Loading branch information
yosukehara authored and mocchira committed Jul 25, 2017
1 parent c3cdeea commit 4fc92d3
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 96 deletions.
29 changes: 29 additions & 0 deletions apps/leo_storage/include/leo_storage.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,35 @@
key :: any(),
timestamp = 0 :: non_neg_integer(),
times = 0 :: non_neg_integer()}).
-record(async_deletion_message_1, {
id = 0 :: non_neg_integer(),
addr_id = 0 :: non_neg_integer(),
key :: any(),
meta :: term(),
timestamp = 0 :: non_neg_integer(),
times = 0 :: non_neg_integer()}).
-define(MSG_ASYNC_DELETION, 'async_deletion_message_1').
-define(transform_async_deletion_message(_Msg),
begin
case _Msg of
#async_deletion_message{id = _Id,
addr_id = _AddrId,
key = _Key,
timestamp = _Timestamp,
times = _Time} ->
{ok, #async_deletion_message_1{
id = _Id,
addr_id = _AddrId,
key = _Key,
timestamp = _Timestamp,
times = _Time}};
#async_deletion_message_1{} ->
{ok,_Msg};
_ ->
{error, invalid_record}
end
end).


-record(recovery_node_message, {
id = 0 :: non_neg_integer(),
Expand Down
231 changes: 169 additions & 62 deletions apps/leo_storage/src/leo_storage_mq.erl
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,34 @@ publish(?QUEUE_ID_RECOVERY_NODE = Id, Node) ->
node = Node,
timestamp = leo_date:now()}),
leo_mq_api:publish(Id, KeyBin, MsgBin);

publish(?QUEUE_ID_ASYNC_DELETION = Id, #?METADATA{addr_id = AddrId,
key = Key} = Metadata) ->
KeyBin = term_to_binary({AddrId, Key}),
MessageBin = term_to_binary(
#?MSG_ASYNC_DELETION{id = leo_date:clock(),
addr_id = AddrId,
key = Key,
meta = Metadata,
timestamp = leo_date:now()}),
leo_mq_api:publish(Id, KeyBin, MessageBin);
publish(_,_) ->
{error, badarg}.

-spec(publish(mq_id(), any(), any()) ->
ok | {error, any()}).
publish(?QUEUE_ID_PER_OBJECT = Id, #?METADATA{addr_id = AddrId,
key = Key}= Metadata, ErrorType) ->
KeyBin = term_to_binary({ErrorType, Key}),
MessageBin = term_to_binary(
#?MSG_INCONSISTENT_DATA{id = leo_date:clock(),
type = ErrorType,
addr_id = AddrId,
key = Key,
meta = Metadata,
timestamp = leo_date:now()}),
leo_mq_api:publish(Id, KeyBin, MessageBin);

publish(?QUEUE_ID_SYNC_BY_VNODE_ID = Id, VNodeId, Node) ->
KeyBin = term_to_binary({VNodeId, Node}),
MessageBin = term_to_binary(
Expand All @@ -131,13 +154,17 @@ publish(?QUEUE_ID_SYNC_BY_VNODE_ID = Id, VNodeId, Node) ->
leo_mq_api:publish(Id, KeyBin, MessageBin);

publish(?QUEUE_ID_ASYNC_DELETION = Id, AddrId, Key) ->
KeyBin = term_to_binary({AddrId, Key}),
MessageBin = term_to_binary(
#async_deletion_message{id = leo_date:clock(),
addr_id = AddrId,
key = Key,
timestamp = leo_date:now()}),
leo_mq_api:publish(Id, KeyBin, MessageBin);
case leo_storage_handler_object:head(AddrId, Key, false) of
{ok, Metadata} ->
publish(Id, Metadata);
not_found ->
ok;
{error, Cause} ->
?error("publish/3",
[{addr_id, AddrId}, {key, Key},
{cause, Cause}]),
{error, ?ERROR_COULD_NOT_GET_META}
end;

publish(?QUEUE_ID_SYNC_OBJ_WITH_DC, AddrId, Key) ->
publish(?QUEUE_ID_SYNC_OBJ_WITH_DC, undefined, AddrId, Key);
Expand All @@ -158,7 +185,6 @@ publish(?QUEUE_ID_REQ_DEL_DIR = Id, Node, Directory) ->
node = Node,
dir = Directory,
timestamp = leo_date:now()}),
?debug("leo_mq delete directory", [{dir, Directory}]),
leo_mq_api:publish(Id, KeyBin, MsgBin);

publish({?QUEUE_ID_DEL_DIR, WorkerId}, AddrId, Key) ->
Expand All @@ -176,14 +202,17 @@ publish(_,_,_) ->
-spec(publish(mq_id(), any(), any(), any()) ->
ok | {error, any()}).
publish(?QUEUE_ID_PER_OBJECT = Id, AddrId, Key, ErrorType) ->
KeyBin = term_to_binary({ErrorType, Key}),
MessageBin = term_to_binary(
#?MSG_INCONSISTENT_DATA{id = leo_date:clock(),
type = ErrorType,
addr_id = AddrId,
key = Key,
timestamp = leo_date:now()}),
leo_mq_api:publish(Id, KeyBin, MessageBin);
case leo_storage_handler_object:head(AddrId, Key, false) of
{ok, Metadata} ->
publish(Id, Metadata, ErrorType);
not_found ->
ok;
{error, Cause} ->
?error("publish/4",
[{addr_id, AddrId}, {key, Key},
{cause, Cause}]),
{error, ?ERROR_COULD_NOT_GET_META}
end;

publish(?QUEUE_ID_SYNC_OBJ_WITH_DC = Id, ClusterId, AddrId, Key) ->
KeyBin = term_to_binary({ClusterId, AddrId, Key}),
Expand Down Expand Up @@ -242,16 +271,27 @@ publish(_,_,_,_,_) ->
{error, badarg}.

publish(?QUEUE_ID_PER_OBJECT = Id, AddrId, Key, SyncNode, IsForceSync, ErrorType) ->
KeyBin = term_to_binary({ErrorType, Key}),
MessageBin = term_to_binary(
#?MSG_INCONSISTENT_DATA{id = leo_date:clock(),
type = ErrorType,
addr_id = AddrId,
key = Key,
sync_node = SyncNode,
is_force_sync = IsForceSync,
timestamp = leo_date:now()}),
leo_mq_api:publish(Id, KeyBin, MessageBin);
case leo_storage_handler_object:head(AddrId, Key, false) of
{ok, Metadata} ->
KeyBin = term_to_binary({ErrorType, Key}),
MessageBin = term_to_binary(
#?MSG_INCONSISTENT_DATA{id = leo_date:clock(),
type = ErrorType,
addr_id = AddrId,
key = Key,
meta = Metadata,
sync_node = SyncNode,
is_force_sync = IsForceSync,
timestamp = leo_date:now()}),
leo_mq_api:publish(Id, KeyBin, MessageBin);
not_found ->
ok;
{error, Cause} ->
?error("publish/4",
[{addr_id, AddrId}, {key, Key},
{cause, Cause}]),
{error, ?ERROR_COULD_NOT_GET_META}
end;
publish(_,_,_,_,_,_) ->
{error, badarg}.

Expand Down Expand Up @@ -280,25 +320,32 @@ handle_call({consume, ?QUEUE_ID_PER_OBJECT, MessageBin}) ->
case ?transform_inconsistent_data_message(Term) of
{ok, #?MSG_INCONSISTENT_DATA{addr_id = AddrId,
key = Key,
meta = Metadata,
sync_node = SyncNode,
is_force_sync = true}} when SyncNode /= undefined ->
send_object_to_remote_node(SyncNode, AddrId, Key);
{ok, #?MSG_INCONSISTENT_DATA{addr_id = AddrId,
key = Key,
type = _ErrorType}} ->
case correct_redundancies(Key) of
ok ->
ok;
{error, Cause = not_found} ->
?warn("handle_call/1 - consume",
[{qid, ?QUEUE_ID_PER_OBJECT},
{addr_id, AddrId},
{key, Key}, {cause, Cause}]),
is_force_sync = IsForceSync}} ->
case compare_metadatas(Metadata) of
true when IsForceSync == true,
SyncNode /= undefined ->
send_object_to_remote_node(SyncNode, AddrId, Key);
true ->
case correct_redundancies(Key) of
ok ->
ok;
{error, Cause = not_found} ->
?warn("handle_call/1 - consume",
[{qid, ?QUEUE_ID_PER_OBJECT},
{addr_id, AddrId},
{key, Key}, {cause, Cause}]),
ok;
{error, Cause} ->
?debug("handle_call/1 - consume",
[{addr_id, AddrId},
{key, Key}, {cause, Cause}]),
{error, Cause}
end;
false ->
ok;
{error, Cause} ->
?debug("handle_call/1 - consume",
[{addr_id, AddrId},
{key, Key}, {cause, Cause}]),
{error, Cause}
end;
{error,_Error} ->
Expand Down Expand Up @@ -497,6 +544,11 @@ recover_node_callback_2([SrcNode|Rest], AddrId, Key, FixedNode) ->

%% @doc Send object to a remote-node
%% @private
-spec(send_object_to_remote_node(Node, AddrId, Key) ->
ok | {error, Cause} when Node::node(),
AddrId::non_neg_integer(),
Key::binary(),
Cause::any()).
send_object_to_remote_node(Node, AddrId, Key) ->
Ref = make_ref(),
case leo_storage_handler_object:get({Ref, Key}) of
Expand Down Expand Up @@ -575,7 +627,6 @@ sync_vnodes_callback(Node, FromAddrId, ToAddrId)->

%% @doc Remove a node from redundancies
%% @private

-spec(delete_node_from_redundancies(Redundancies, Node, AccRedundancies) ->
{ok, AccRedundancies} when Redundancies::[#redundant_node{}],
Node::node(),
Expand Down Expand Up @@ -842,6 +893,10 @@ rebalance_2({ok, Redundancies}, #rebalance_message{node = Node,

%% @doc Retrieve redundancies with a number of replicas
%% @private
-spec(get_redundancies_with_replicas(AddrId, Key, Redundancies) ->
Redundancies when AddrId::non_neg_integer(),
Key::binary(),
Redundancies::[#redundancies{}]).
get_redundancies_with_replicas(AddrId, Key, Redundancies) ->
%% Retrieve redundancies with a number of replicas
case leo_object_storage_api:head({AddrId, Key}) of
Expand Down Expand Up @@ -897,6 +952,9 @@ notify_rebalance_message_to_manager(VNodeId) ->

%% @doc Fix consistency of an object between a local-cluster and remote-cluster(s)
%% @private
-spec(fix_consistency_between_clusters(InconsistentData) ->
ok | {error, Cause} when InconsistentData::#inconsistent_data_with_dc{},
Cause::any()).
fix_consistency_between_clusters(#inconsistent_data_with_dc{
addr_id = AddrId,
key = Key,
Expand All @@ -923,34 +981,77 @@ fix_consistency_between_clusters(#inconsistent_data_with_dc{


%% @doc Remove objects under the directory/bucket
-spec(remove_objects_under_dir(MessageBin) ->
ok | {error, Cause} when MessageBin::binary(),
Cause::any()).
remove_objects_under_dir(MessageBin) ->
case catch binary_to_term(MessageBin) of
#async_deletion_message{addr_id = AddrId,
key = Key} ->
case catch leo_storage_handler_object:delete(
#?OBJECT{addr_id = AddrId,
key = Key,
clock = leo_date:clock(),
timestamp = leo_date:now(),
del = ?DEL_TRUE
}, 0, false, leo_mq) of
ok ->
ok;
{error, not_found} ->
ok;
{_, Cause} ->
{error, Cause}
end;
_ ->
{'EXIT',_Cause} ->
?warn("remove_objects_under_dir/1",
[{qid, ?QUEUE_ID_DEL_DIR},
{cause, invalid_data_format}]),
ok
ok;
AsyncDeletionMsg ->
case ?transform_async_deletion_message(AsyncDeletionMsg) of
{ok, #?MSG_ASYNC_DELETION{addr_id = AddrId,
key = Key,
meta = Metadata}} ->
case compare_metadatas(Metadata) of
true ->
case catch leo_storage_handler_object:delete(
#?OBJECT{addr_id = AddrId,
key = Key,
clock = leo_date:clock(),
timestamp = leo_date:now(),
del = ?DEL_TRUE}, 0, false, leo_mq) of
ok ->
ok;
{error, not_found} ->
ok;
{_, Cause} ->
{error, Cause}
end;
false ->
ok;
{error, Cause} ->
{error, Cause}
end;
_ ->
?warn("remove_objects_under_dir/1",
[{qid, ?QUEUE_ID_DEL_DIR},
{cause, invalid_data_format}]),
ok
end
end.


%% @doc Compare MQ's metadata with its latest metadata
-spec(compare_metadatas(Metadata) ->
Result | {error, Cause} when Metadata::#?METADATA{},
Result::boolean(),
Cause::any()).
compare_metadatas(undefined) ->
true;
compare_metadatas(#?METADATA{addr_id = AddrId,
key = Key,
clock = MsgClock}) ->
case leo_storage_handler_object:head(AddrId, Key, false) of
{ok, #?METADATA{clock = Clock}} when MsgClock > Clock ->
true;
{ok, _} ->
false;
not_found ->
false;
{error, Cause} ->
?error("compare_metadatas/1",
[{addr_id, AddrId}, {key, Key},
{cause, Cause}]),
{error, ?ERROR_COULD_NOT_GET_META}
end.


%%--------------------------------------------------------------------
%% INNTERNAL FUNCTIONS-2
%% INNTERNAL FUNCTIONS-2 ETS related functions
%%--------------------------------------------------------------------
%% @doc Lookup rebalance counter
%% @private
Expand All @@ -972,12 +1073,18 @@ ets_lookup(Table, Key) ->

%% @doc Increment rebalance counter
%% @private
-spec(increment_counter(Table, Key) ->
ok when Table::atom(),
Key::binary()).
increment_counter(Table, Key) ->
catch ets:update_counter(Table, Key, 1),
ok.

%% @doc Decrement rebalance counter
%% @private
-spec(decrement_counter(Table, Key) ->
ok when Table::atom(),
Key::binary()).
decrement_counter(Table, Key) ->
catch ets:update_counter(Table, Key, -1),
ok.
Loading

0 comments on commit 4fc92d3

Please sign in to comment.