Read repair - configure to repair primary only #1844

Merged (10 commits) on Jan 19, 2023
32 changes: 30 additions & 2 deletions priv/riak_kv.schema
@@ -1461,7 +1461,35 @@
%% converted to an overflow queue as part of this release. Instead use
%% `replrtq_overflow_limit` to control the queue size, including on-disk size.
{mapping, "replrtq_srcqueuelimit", "riak_kv.replrtq_srcqueuelimit", [
{datatype, integer},
{default, 300000},
hidden
]}.

%% @doc Choose to read repair to primary vnodes only
%% When fallback vnodes are elected, read repair will by default repair any
%% data missing from the fallback vnode - i.e. every GET while the fallback
%% is in play will lead to a PUT to add the requested object to the fallback
%% vnode, as the fallback by default starts empty.
%% If the expectation is that failed vnodes are replaced quickly, as would be
%% possible in a Cloud scenario, this may not be desirable. Read repair to
%% fallbacks reduces throughput in failure scenarios, and the hinted handoffs
%% following recovery are then inflated by historic data which is already
%% present on the recovered node, but has to be handed off along with the
%% fresh updates received since the failure.
%% When fallback vnodes are expected to be in place for a long period, the
%% default setting of read repairing fallbacks may be preferred, as it will
%% provide additional data resilience, and potentially improved performance
%% where the same objects are repeatedly fetched.
{mapping, "read_repair_primaryonly", "riak_kv.read_repair_primaryonly", [
{datatype, {flag, enabled, disabled}},
{default, disabled}
]}.

%% @doc If a read discovers keys to be repaired, should each repaired key
%% be logged
{mapping, "read_repair_log", "riak_kv.read_repair_log", [
{datatype, {flag, enabled, disabled}},
{default, disabled},
hidden
]}.
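%% A minimal riak.conf fragment exercising both of the new flags (a sketch;
%% both default to disabled per the mappings above):
%%
%%     read_repair_primaryonly = enabled
%%     read_repair_log = enabled
%%
%% With read_repair_primaryonly enabled, GETs answered while a fallback is in
%% play no longer trigger repair PUTs to that fallback; with read_repair_log
%% enabled, every repaired key is recorded in the log.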
3 changes: 2 additions & 1 deletion rebar.config
@@ -42,7 +42,7 @@
]}.

{deps, [
{riak_core, {git, "https://github.com/basho/riak_core.git", {tag, "riak_kv-3.0.12"}}},
{riak_core, {git, "https://github.com/basho/riak_core.git", {branch, "mas-i994-handoffsync"}}},
{sidejob, {git, "https://github.com/basho/sidejob.git", {tag, "2.1.0"}}},
{bitcask, {git, "https://github.com/basho/bitcask.git", {tag, "2.1.0"}}},
{redbug, {git, "https://github.com/massemanet/redbug", {tag, "1.2.2"}}},
@@ -52,6 +52,7 @@
{riak_dt, {git, "https://github.com/basho/riak_dt.git", {tag, "riak_kv-3.0.0"}}},
{riak_api, {git, "https://github.com/basho/riak_api.git", {tag, "riak_kv-3.0.12"}}},
{hyper, {git, "https://github.com/basho/hyper", {tag, "1.1.0"}}},
{leveled, {git, "https://github.com/martinsumner/leveled.git", {branch, "mas-i389-rebuildledger"}}},
{kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {tag, "1.0.4"}}},
{riakhttpc, {git, "https://github.com/basho/riak-erlang-http-client", {tag, "3.0.10"}}}
]}.
54 changes: 38 additions & 16 deletions src/riak_client.erl
@@ -50,7 +50,9 @@
-export([ensemble/1]).
-export([fetch/2, push/4]).
-export([membership_request/1, replrtq_reset_all_peers/1, replrtq_reset_all_workercounts/2]).
-export([tictacaae_suspend_node/0, tictacaae_resume_node/0]).
-export([remove_node_from_coverage/0, reset_node_for_coverage/0]).
-export([repair_node/0]).

-compile({no_auto_import,[put/2]}).
%% @type default_timeout() = 60000
@@ -901,13 +903,14 @@ aae_fold(Query) ->
aae_fold(Query, {?MODULE, [Node, _ClientId]}) ->
Me = self(),
ReqId = mk_reqid(),
TimeOut = ?DEFAULT_FOLD_TIMEOUT,
TimeOut =
app_helper:get_env(
riak_kv, riak_client_aaefold_timeout, ?DEFAULT_FOLD_TIMEOUT),
Q0 = riak_kv_clusteraae_fsm:convert_fold(Query),
case riak_kv_clusteraae_fsm:is_valid_fold(Q0) of
true ->
riak_kv_clusteraae_fsm_sup:start_clusteraae_fsm(Node,
[{raw, ReqId, Me},
[Q0, TimeOut]]),
riak_kv_clusteraae_fsm_sup:start_clusteraae_fsm(
Node, [{raw, ReqId, Me}, [Q0, TimeOut]]),
wait_for_fold_results(ReqId, TimeOut);
false ->
{error, "Invalid AAE fold definition"}
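%% The fold timeout is no longer hard-coded: it can now be overridden via the
%% riak_kv application environment. A hedged console sketch (the key
%% riak_client_aaefold_timeout comes from the diff above; the one-hour value,
%% in milliseconds, is purely illustrative):
%%
%%     1> application:set_env(riak_kv, riak_client_aaefold_timeout, 3600000).
%%     ok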
@@ -929,9 +932,8 @@ ttaaefs_fullsync(WorkItem) ->
-spec ttaaefs_fullsync(riak_kv_ttaaefs_manager:work_item(), integer()) -> ok.
ttaaefs_fullsync(WorkItem, SecsTimeout) ->
ReqId = mk_reqid(),
riak_kv_ttaaefs_manager:process_workitem(WorkItem,
ReqId,
os:timestamp()),
riak_kv_ttaaefs_manager:process_workitem(
WorkItem, ReqId, os:timestamp()),
wait_for_reqid(ReqId, SecsTimeout * 1000).

%% @doc
@@ -941,22 +943,42 @@
erlang:timestamp()) -> ok.
ttaaefs_fullsync(WorkItem, SecsTimeout, Now) ->
ReqId = mk_reqid(),
riak_kv_ttaaefs_manager:process_workitem(WorkItem,
ReqId,
Now),
riak_kv_ttaaefs_manager:process_workitem(WorkItem, ReqId, Now),
wait_for_reqid(ReqId, SecsTimeout * 1000).

-spec repair_node() -> ok.
repair_node() ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
NodeToRepair = node(),
PartitionsToRepair =
lists:filtermap(
fun({P, Node}) ->
case Node of
NodeToRepair ->
{true, P};
_ ->
false
end
end,
riak_core_ring:all_owners(Ring)),
[riak_kv_vnode:repair(P) || P <- PartitionsToRepair],
ok.
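%% repair_node/0 filters the ring's ownership list down to the partitions
%% owned by the local node and triggers riak_kv_vnode:repair/1 for each.
%% A usage sketch from a console attached to the node needing repair:
%%
%%     1> riak_client:repair_node().
%%     ok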

-spec tictacaae_suspend_node() -> ok.
tictacaae_suspend_node() ->
application:set_env(riak_kv, tictacaae_suspend, true).

-spec tictacaae_resume_node() -> ok.
tictacaae_resume_node() ->
application:set_env(riak_kv, tictacaae_suspend, false).
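%% Both switches only set the local node's application environment, so they
%% must be run on every node where Tictac AAE should pause. A usage sketch:
%%
%%     1> riak_client:tictacaae_suspend_node().
%%     ok
%%     ... perform maintenance that would otherwise skew AAE exchanges ...
%%     2> riak_client:tictacaae_resume_node().
%%     ok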

-spec participate_in_coverage(boolean()) -> ok.
participate_in_coverage(Participate) ->
F =
fun(Ring, _) ->
fun(R, _) ->
{new_ring,
riak_core_ring:update_member_meta(node(),
Ring,
node(),
participate_in_coverage,
Participate)}
riak_core_ring:update_member_meta(
node(), R, node(), participate_in_coverage, Participate)}
end,
{ok, _FinalRing} = riak_core_ring_manager:ring_trans(F, undefined),
ok.
13 changes: 9 additions & 4 deletions src/riak_kv_get_core.erl
@@ -37,10 +37,15 @@
{error, any()} |
{fetch, list()}.
-type repair_reason() :: notfound | outofdate.
-type final_action() :: nop |
{read_repair, [{non_neg_integer() | repair_reason()}], riak_object:riak_object()} |
{delete_repair, [{non_neg_integer() | repair_reason()}], riak_object:riak_object()} |
delete.
-type final_action() ::
nop |
{read_repair,
[{non_neg_integer(), repair_reason()}],
riak_object:riak_object()} |
{delete_repair,
[{non_neg_integer(), repair_reason()}],
riak_object:riak_object()} |
delete.
-type idxresult() :: {non_neg_integer(), result()}.
-type idx_type() :: [{non_neg_integer(), 'primary' | 'fallback'}].

50 changes: 45 additions & 5 deletions src/riak_kv_get_fsm.erl
@@ -689,24 +689,64 @@ roll_d100() ->
-endif.

%% Issue read repairs for any vnodes that are out of date
read_repair(Indices, RepairObj,
read_repair(GetCoreIndices, RepairObj,
#state{req_id = ReqId, starttime = StartTime,
preflist2 = Sent, bkey = BKey, crdt_op = CrdtOp,
bucket_props = BucketProps, trace = Trace}) ->
RepairPreflist = [{Idx, Node} || {{Idx, Node}, _Type} <- Sent,
get_option(Idx, Indices) /= undefined],
RepairPreflist =
lists:filtermap(
fun({{Idx, Node}, Type}) ->
read_repair_index({{Idx, Node}, Type}, GetCoreIndices)
end,
Sent),
DocIdxList =
lists:map(
fun({{Idx, Node}, _Type, Reason}) ->
case app_helper:get_env(riak_kv, read_repair_log, false) of
true ->
lager:info(
"Read repair of ~p on ~w ~w for reason ~w",
[BKey, Idx, Node, Reason]);
false ->
ok
end,
{Idx, Node}
end,
RepairPreflist),
case Trace of
true ->
Ps = preflist_for_tracing(RepairPreflist),
?DTRACE(?C_GET_FSM_RR, [], Ps);
_ ->
ok
end,
riak_kv_vnode:readrepair(RepairPreflist, BKey, RepairObj, ReqId,
riak_kv_vnode:readrepair(DocIdxList, BKey, RepairObj, ReqId,
StartTime, [{returnbody, false},
{bucket_props, BucketProps},
{crdt_op, CrdtOp}]),
ok = riak_kv_stat:update({read_repairs, Indices, Sent}).
ok = riak_kv_stat:update({read_repairs, RepairPreflist}).

-spec read_repair_index(
{{non_neg_integer(), node()}, primary|fallback},
list({non_neg_integer(), outofdate|notfound})) ->
boolean()|
{true,
{{non_neg_integer(), node()},
primary|fallback,
outofdate|notfound}}.
read_repair_index({{Idx, Node}, Type}, Indices) ->
case get_option(Idx, Indices) of
undefined ->
false;
Reason ->
RRP = app_helper:get_env(riak_kv, read_repair_primaryonly, false),
case {RRP, Type} of
{true, fallback} ->
false;
_ ->
{true, {{Idx, Node}, Type, Reason}}
end
end.
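%% A hypothetical in-module EUnit sketch of the filter above (not part of
%% this diff; it assumes eunit.hrl is included under TEST, and the index and
%% node values are invented):

-ifdef(TEST).
read_repair_primaryonly_filter_test() ->
    Indices =
        [{0, notfound},
         {365375409332725729550921208179070754913983135744, outofdate}],
    ok = application:set_env(riak_kv, read_repair_primaryonly, true),
    %% Primary with a repair reason - kept, annotated with that reason
    ?assertMatch(
        {true, {{0, n1}, primary, notfound}},
        read_repair_index({{0, n1}, primary}, Indices)),
    %% Fallback - filtered out when read_repair_primaryonly is set
    ?assertMatch(
        false,
        read_repair_index(
            {{365375409332725729550921208179070754913983135744, n2},
                fallback},
            Indices)),
    %% Nothing to repair at this index - filtered out regardless of type
    ?assertMatch(
        false,
        read_repair_index({{42, n3}, primary}, Indices)),
    ok = application:set_env(riak_kv, read_repair_primaryonly, false).
-endif.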

get_option(Name, Options) ->
get_option(Name, Options, undefined).
27 changes: 13 additions & 14 deletions src/riak_kv_stat.erl
@@ -268,9 +268,9 @@ do_update({index_fsm_time, Microsecs, ResultCount}) ->
ok = exometer:update([P, ?APP, index, fsm, complete], 1),
ok = exometer:update([P, ?APP, index, fsm, results], ResultCount),
ok = exometer:update([P, ?APP, index, fsm, time], Microsecs);
do_update({read_repairs, Indices, Preflist}) ->
do_update({read_repairs, Preflist}) ->
ok = exometer:update([?PFX, ?APP, node, gets, read_repairs], 1),
do_repairs(Indices, Preflist);
do_repairs(Preflist);
do_update({tictac_aae, ExchangeState}) ->
ok = exometer:update([?PFX, ?APP, node, tictacaae, ExchangeState], 1);
do_update({tictac_aae, ExchangeType, RepairCount}) ->
@@ -507,19 +507,18 @@ do_stages(Path, [{Stage, Time}|Stages]) ->
do_stages(Path, Stages).

%% create dimensioned stats for read repairs.
%% The indexes are from get core [{Index, Reason::notfound|outofdate}]
%% preflist is a preflist of [{{Index, Node}, Type::primary|fallback}]
do_repairs(Indices, Preflist) ->
%% The preflist has been filtered to remove those that will not be subject to
%% repair
do_repairs(Preflist) ->
Pfx = riak_core_stat:prefix(),
lists:foreach(fun({{Idx, Node}, Type}) ->
case proplists:get_value(Idx, Indices) of
undefined ->
ok;
Reason ->
create_or_update([Pfx, ?APP, node, gets, read_repairs, Node, Type, Reason], 1, spiral)
end
end,
Preflist).
lists:foreach(
fun({{_Idx, Node}, Type, Reason}) ->
create_or_update(
[Pfx, ?APP, node, gets, read_repairs, Node, Type, Reason],
1,
spiral)
end,
Preflist).
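%% Because Type and Reason now travel with each preflist entry, every repair
%% increments a spiral dimensioned by node, vnode type and repair reason.
%% For instance (hypothetical node name; assuming riak_core_stat:prefix()
%% returns the usual riak prefix), a notfound repair on a fallback updates:
%%
%%     [riak, riak_kv, node, gets, read_repairs, 'dev2@127.0.0.1',
%%         fallback, notfound]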

%% for dynamically created / dimensioned stats
%% that can't be registered at start up