Skip to content

Commit

Permalink
storage: Fix for leo-project#546
Browse files Browse the repository at this point in the history
  • Loading branch information
mocchira committed Dec 22, 2017
1 parent cfa5ad0 commit a3a3b15
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 27 deletions.
7 changes: 4 additions & 3 deletions apps/leo_storage/src/leo_storage_mq.erl
Original file line number Diff line number Diff line change
Expand Up @@ -771,11 +771,12 @@ correct_redundancies_1(Key, AddrId, [#redundant_node{node = Node}|T], Metadatas,
{ok, #member{state = ?STATE_RUNNING}} ->
%% Retrieve a metadata from remote-node
%% invoke head with NO retry option
RPCKey = rpc:async_call(Node, leo_storage_handler_object,
head, [AddrId, Key, false]),
RPCKey = rpc:async_call(Node, leo_object_storage_api,
head_with_check_avs, [{AddrId, Key}, check_header]),

case rpc:nb_yield(RPCKey, ?DEF_REQ_TIMEOUT) of
{value, {ok, Metadata}} ->
{value, {ok, MetaBin}} ->
Metadata = binary_to_term(MetaBin),
correct_redundancies_1(Key, AddrId, T,
[{Node, Metadata}|Metadatas], ErrorNodes);
_Error ->
Expand Down
57 changes: 33 additions & 24 deletions apps/leo_storage/src/leo_sync_local_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -251,32 +251,41 @@ slice_and_replicate(Objects, Errors) ->
slice_and_replicate_1(#?METADATA{addr_id = AddrId,
key = Key,
clock = Clock} = Metadata, Object, StackedObject, Errors) ->
case leo_storage_handler_object:head(AddrId, Key, false) of
{ok, #?METADATA{clock = Clock_1}} when Clock == Clock_1 ->
slice_and_replicate(StackedObject, Errors);
{ok, #?METADATA{clock = Clock_1} = Metadata} when Clock < Clock_1 ->
ok = leo_storage_mq:publish(?QUEUE_ID_PER_OBJECT, Metadata, ?ERR_TYPE_REPLICATE_DATA),
slice_and_replicate(StackedObject, Errors);
_ ->
case leo_misc:get_env(leo_redundant_manager, ?PROP_RING_HASH) of
{ok, RingHashCur} ->
case leo_object_storage_api:store(
Metadata#?METADATA{ring_hash = RingHashCur},
Object) of
ok ->
slice_and_replicate(StackedObject, Errors);
{error, Cause} ->
?warn("slice_and_replicate_1/4",
[{key, binary_to_list(Metadata#?METADATA.key)},
{cause, Cause}]),
slice_and_replicate(StackedObject, [Metadata|Errors])
end;
case leo_object_storage_api:head_with_check_avs({AddrId, Key}, check_header) of
{ok, MetaBin} ->
LocalMeta = binary_to_term(MetaBin),
case LocalMeta#?METADATA.clock of
Clock ->
slice_and_replicate(StackedObject, Errors);
Clock_1 when Clock < Clock_1 ->
ok = leo_storage_mq:publish(?QUEUE_ID_PER_OBJECT, Metadata, ?ERR_TYPE_REPLICATE_DATA),
slice_and_replicate(StackedObject, Errors);
_ ->
?warn("slice_and_replicate_1/4",
[{key, binary_to_list(Metadata#?METADATA.key)},
{cause, "Current ring-hash is not found"}]),
slice_and_replicate_2(Metadata, Object, StackedObject, Errors)
end;
_ ->
slice_and_replicate_2(Metadata, Object, StackedObject, Errors)
end.

slice_and_replicate_2(#?METADATA{key = Key} = Metadata, Object, StackedObject, Errors) ->
case leo_misc:get_env(leo_redundant_manager, ?PROP_RING_HASH) of
{ok, RingHashCur} ->
case leo_object_storage_api:store(
Metadata#?METADATA{ring_hash = RingHashCur},
Object) of
ok ->
slice_and_replicate(StackedObject, Errors);
{error, Cause} ->
?warn("slice_and_replicate_2/4",
[{key, binary_to_list(Key)},
{cause, Cause}]),
slice_and_replicate(StackedObject, [Metadata|Errors])
end
end;
_ ->
?warn("slice_and_replicate_2/4",
[{key, binary_to_list(Key)},
{cause, "Current ring-hash is not found"}]),
slice_and_replicate(StackedObject, [Metadata|Errors])
end.

%% @private
Expand Down
9 changes: 9 additions & 0 deletions apps/leo_storage/test/leo_storage_mq_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,11 @@ subscribe_1_({Test0Node, Test1Node}) ->
fun(_InconsistentNodes, _CorrectMetadata) ->
ok
end]),
ok = rpc:call(Test1Node, meck, new, [leo_object_storage_api, [no_link, non_strict]]),
ok = rpc:call(Test1Node, meck, expect, [leo_object_storage_api, head_with_check_avs,
fun(_AddrIdAndKey, _CheckMethod) ->
{error, invalid_object}
end]),
meck:expect(leo_redundant_manager_api, get_member_by_node,
fun(_Node) ->
{ok, #member{state = ?STATE_RUNNING}}
Expand All @@ -283,6 +288,10 @@ subscribe_1_({Test0Node, Test1Node}) ->
fun({_AddrId, _Key}) ->
{ok, term_to_binary(#?METADATA{num_of_replicas = 0})}
end),
meck:expect(leo_object_storage_api, head_with_check_avs,
fun({_AddrId, _Key}, _CheckMethod) ->
{ok, term_to_binary(#?METADATA{num_of_replicas = 0})}
end),

timer:sleep(100),
leo_storage_mq:handle_call({consume, ?QUEUE_ID_PER_OBJECT, ?TEST_MSG_1}),
Expand Down

0 comments on commit a3a3b15

Please sign in to comment.