Skip to content

Commit

Permalink
storage: Fix for leo-project#553
Browse files Browse the repository at this point in the history
  • Loading branch information
windkit committed Dec 6, 2017
1 parent b4d2ecc commit a24b423
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 7 deletions.
31 changes: 24 additions & 7 deletions apps/leo_storage/src/leo_storage_handler_object.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1180,9 +1180,14 @@ get_active_redundancies(_, []) ->
get_active_redundancies(Quorum, Redundancies) ->
AvailableNodes = [RedundantNode ||
#redundant_node{available = true} = RedundantNode <- Redundancies],
HasUnavailableNodes = lists:any(fun(#redundant_node{available = false}) ->
true;
(_) ->
false;
end, Redundancies),
case (Quorum =< erlang:length(AvailableNodes)) of
true ->
{ok, AvailableNodes};
{ok, AvailableNodes, HasUnavailableNodes};
false ->
{error, ?ERROR_NOT_SATISFY_QUORUM}
end.
Expand All @@ -1200,19 +1205,31 @@ get_active_redundancies(Quorum, Redundancies) ->
Cause::any()).
read_and_repair(#read_parameter{quorum = Q} = ReadParams, Redundancies) ->
case get_active_redundancies(Q, Redundancies) of
{ok, AvailableNodes} ->
read_and_repair_1(ReadParams, AvailableNodes, AvailableNodes, []);
{ok, AvailableNodes, HasUnavailableNodes} ->
read_and_repair_1(ReadParams, AvailableNodes, AvailableNodes, HasUnavailableNodes, []);
Error ->
Error
end.

%% @private
read_and_repair_1(_,[],_,[Error|_]) ->
{error, Error};
read_and_repair_1(ReadParams, [Node|Rest], AvailableNodes, Errors) ->
read_and_repair_1(_,[],_, false, Errors) ->
case lists:all(fun(not_found) ->
true;
(_) ->
false
end, Errors) of
true ->
{error, not_found};
_ ->
%% If some nodes are missing, reply the state is uncertain
{error, ?ERROR_RECOVER_FAILURE}
end;
read_and_repair_1(_,[],_,_,_) ->
{error, ?ERROR_RECOVER_FAILURE};
read_and_repair_1(ReadParams, [Node|Rest], AvailableNodes, HasUnavailableNodes, Errors) ->
case read_and_repair_2(ReadParams, Node, AvailableNodes) of
{error, Cause} ->
read_and_repair_1(ReadParams, Rest, AvailableNodes, [Cause|Errors]);
read_and_repair_1(ReadParams, Rest, AvailableNodes, HasUnavailableNodes, [Cause|Errors]);
Ret ->
Ret
end.
Expand Down
88 changes: 88 additions & 0 deletions apps/leo_storage/test/leo_storage_handler_object_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ object_handler_test_() ->
{foreach, fun setup/0, fun teardown/1,
[{with, [T]} || T <- [fun get_a0_/1,
fun get_a1_/1,
fun get_nodedown/1,
fun get_unavailablenode/1,
%% fun get_b0_/1,
%% fun get_b1_/1,
%% fun get_b2_/1,
Expand Down Expand Up @@ -170,6 +172,92 @@ get_a1_({Node0, Node1}) ->
?assertEqual({ok, Ref, ?TEST_META_0, <<>>}, Res),
ok.

%% @doc Get Test when One Node is up but another is down, up node does not hold the object
%% @@ private
get_nodedown({Node0, Node1}) ->
%% leo_redundant_manager_api
meck:new(leo_redundant_manager_api, [non_strict]),
meck:expect(leo_redundant_manager_api, get_redundancies_by_addr_id,
fun(get, _AddrId) ->
{ok, #redundancies{id = 0,
nodes = [#redundant_node{node = Node0,
available = true},
#redundant_node{node = Node1,
available = true}],
n = 2, r = 1, w = 1, d = 1}}
end),

%% leo_object_storage_api
meck:new(leo_object_storage_api, [non_strict]),
meck:expect(leo_object_storage_api, get,
fun(_Key, _StartPos, _EndPos) ->
{error, not_found}
end),
meck:expect(leo_object_storage_api, get,
fun(_Key, _StartPos, _EndPos, _IsForcedCheck) ->
{error, not_found}
end),

meck:new(rpc, [unstick, passthrough]),
meck:expect(rpc, nb_yield,
fun(_, _) ->
{value, {badrpc, nodedown}}
end),

meck:new(leo_metrics_req, [non_strict]),
meck:expect(leo_metrics_req, notify, fun(_) -> ok end),

meck:new(leo_watchdog_state, [non_strict]),
meck:expect(leo_watchdog_state, find_not_safe_items, fun(_) -> not_found end),

meck:new(leo_storage_watchdog_error, [non_strict]),
meck:expect(leo_storage_watchdog_error, push, fun(_) -> ok end),

AddrId = 0,
ReqId = 0,
Res = leo_storage_handler_object:get(AddrId, ?TEST_KEY_0, ReqId),
?assertNotEqual({error, not_found}, Res),
ok.

get_unavailablenode({Node0, Node1}) ->
%% leo_redundant_manager_api
meck:new(leo_redundant_manager_api, [non_strict]),
meck:expect(leo_redundant_manager_api, get_redundancies_by_addr_id,
fun(get, _AddrId) ->
{ok, #redundancies{id = 0,
nodes = [#redundant_node{node = Node0,
available = true},
#redundant_node{node = Node1,
available = false}],
n = 2, r = 1, w = 1, d = 1}}
end),

%% leo_object_storage_api
meck:new(leo_object_storage_api, [non_strict]),
meck:expect(leo_object_storage_api, get,
fun(_Key, _StartPos, _EndPos) ->
{error, not_found}
end),
meck:expect(leo_object_storage_api, get,
fun(_Key, _StartPos, _EndPos, _IsForcedCheck) ->
{error, not_found}
end),

meck:new(leo_metrics_req, [non_strict]),
meck:expect(leo_metrics_req, notify, fun(_) -> ok end),

meck:new(leo_watchdog_state, [non_strict]),
meck:expect(leo_watchdog_state, find_not_safe_items, fun(_) -> not_found end),

meck:new(leo_storage_watchdog_error, [non_strict]),
meck:expect(leo_storage_watchdog_error, push, fun(_) -> ok end),

AddrId = 0,
ReqId = 0,
Res = leo_storage_handler_object:get(AddrId, ?TEST_KEY_0, ReqId),
?assertNotEqual({error, not_found}, Res),
ok.

%%--------------------------------------------------------------------
%% PUT
%%--------------------------------------------------------------------
Expand Down

0 comments on commit a24b423

Please sign in to comment.