From d6acec0ccb95a2fb6bcd0e2bee376fb7303cd35b Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Thu, 8 Jun 2023 13:33:55 +0100 Subject: [PATCH] Handle reap and erase in batches Avoid overloading the eraser/reaper process mailbox by sending the requests in batches (as already happened with range_repl), and waiting for a response. When a job is used, not local, the batching is done from the clusteraae_fsm. This mechanism existed prior to this commit, and has not been changed, but has been extended to support the last-batch overflow --- src/riak_kv_clusteraae_fsm.erl | 26 +++++++++---- src/riak_kv_eraser.erl | 10 +++++ src/riak_kv_queue_manager.erl | 22 +++++++++-- src/riak_kv_reaper.erl | 10 +++++ src/riak_kv_vnode.erl | 71 ++++++++++++++++++++-------------- 5 files changed, 100 insertions(+), 39 deletions(-) diff --git a/src/riak_kv_clusteraae_fsm.erl b/src/riak_kv_clusteraae_fsm.erl index d1f45d8ec..d27dcaed1 100644 --- a/src/riak_kv_clusteraae_fsm.erl +++ b/src/riak_kv_clusteraae_fsm.erl @@ -484,14 +484,25 @@ process_results(Results, State) -> {siblings, merge_countinlists(A_SbL, R_SbL)}]; QT when QT == erase_keys; QT == reap_tombs -> case Results of - {[], Count, local} -> - {[], element(2, Acc) + Count, local}; + {BKDHL, Count, local} -> + UpdCount = element(2, Acc) + Count, + Mod = + case QT of + erase_keys -> + riak_kv_eraser; + reap_tombs -> + riak_kv_reaper + end, + handle_in_batches( + QT, lists:reverse(BKDHL), 0, Mod), + {[], UpdCount, local}; {[], Count, count} -> {[], element(2, Acc) + Count, count}; - {BKDHL, 0, Pid} -> + {BKDHL, Count, Pid} -> {[], AccCount, Pid} = Acc, - UpdCount = length(BKDHL) + AccCount, - handle_in_batches(QT, lists:reverse(BKDHL), 0, Pid), + UpdCount = Count + AccCount, + handle_in_batches( + QT, lists:reverse(BKDHL), 0, Pid), {[], UpdCount, Pid} end end @@ -764,12 +775,11 @@ hash_function({rehash, InitialisationVector}) -> -spec handle_in_batches(reap_tombs|erase_keys, list(riak_kv_reaper:reap_reference())| list(riak_kv_eraser:delete_reference()), - non_neg_integer(), pid()) -> ok. + non_neg_integer(), pid()|module()) -> ok. handle_in_batches(_Type, [], _BatchCount, _Worker) -> ok; handle_in_batches(Type, RefList, BatchCount, Worker) when BatchCount >= ?DELETE_BATCH_SIZE -> - case Type of reap_tombs -> _ = riak_kv_reaper:reap_stats(Worker); @@ -786,6 +796,8 @@ handle_in_batches(Type, [Ref|RestRefs], BatchCount, Worker) -> end, handle_in_batches(Type, RestRefs, BatchCount + 1, Worker). + + %% =================================================================== %% Internal functions %% =================================================================== diff --git a/src/riak_kv_eraser.erl b/src/riak_kv_eraser.erl index 090fd00b4..4eadeb7e7 100644 --- a/src/riak_kv_eraser.erl +++ b/src/riak_kv_eraser.erl @@ -38,6 +38,8 @@ start_job/1, request_delete/1, request_delete/2, + bulk_request_delete/1, + bulk_request_delete/2, delete_stats/0, delete_stats/1, override_redo/1, @@ -84,6 +86,14 @@ request_delete(DeleteReference) -> request_delete(Pid, DeleteReference) -> riak_kv_queue_manager:request(Pid, DeleteReference). +-spec bulk_request_delete(list(delete_reference())) -> ok. +bulk_request_delete(RefList) -> + bulk_request_delete(?MODULE, RefList). + +-spec bulk_request_delete(pid()|module(), list(delete_reference())) -> ok. +bulk_request_delete(Pid, RefList) -> + riak_kv_queue_manager:bulk_request(Pid, RefList). + -spec delete_stats() -> list({atom(), non_neg_integer()|riak_kv_overflow_queue:queue_stats()}). delete_stats() -> delete_stats(?MODULE). diff --git a/src/riak_kv_queue_manager.erl b/src/riak_kv_queue_manager.erl index bb436183c..dae1245d3 100644 --- a/src/riak_kv_queue_manager.erl +++ b/src/riak_kv_queue_manager.erl @@ -39,6 +39,7 @@ -export([start_link/2, start_job/3, request/2, + bulk_request/2, stats/1, immediate_action/2, override_redo/2, @@ -121,6 +122,10 @@ start_job(JobID, Module, RootPath) -> request(Pid, Reference) -> gen_server:cast(Pid, {request, Reference, ?REQUEST_PRIORITY}). +-spec bulk_request(pid()|module(), list()) -> ok. +bulk_request(Pid, RefList) -> + gen_server:call(Pid, {bulk_request, RefList, ?REQUEST_PRIORITY}, infinity). + -spec stats(pid()|module()) -> list({atom(), non_neg_integer()|riak_kv_overflow_queue:queue_stats()}). stats(Pid) -> @@ -180,14 +185,23 @@ handle_call(stop_job, _From, State) -> {reply, ok, State#state{pending_close = true}, 0}; handle_call({immediate_action, Reference}, _From, State) -> Mod = State#state.callback_mod, - {reply, Mod:action(Reference, false), State, 0}. + {reply, Mod:action(Reference, false), State, 0}; +handle_call({bulk_request, RefList, Priority}, From, State) -> + gen_server:reply(From, ok), + UpdOverflowQueue = + lists:foldr( + fun(Ref, AccQ) -> + riak_kv_overflow_queue:addto_queue(Priority, Ref, AccQ) + end, + State#state.queue, + RefList), + {noreply, State#state{queue = UpdOverflowQueue}, 0}. handle_cast({request, Reference, Priority}, State) -> UpdOverflowQueue = - riak_kv_overflow_queue:addto_queue(Priority, - Reference, - State#state.queue), + riak_kv_overflow_queue:addto_queue( + Priority, Reference, State#state.queue), {noreply, State#state{queue = UpdOverflowQueue}, 0}. diff --git a/src/riak_kv_reaper.erl b/src/riak_kv_reaper.erl index 305694d44..ef04ce582 100644 --- a/src/riak_kv_reaper.erl +++ b/src/riak_kv_reaper.erl @@ -49,6 +49,8 @@ start_job/1, request_reap/1, request_reap/2, + bulk_request_reap/1, + bulk_request_reap/2, direct_reap/1, reap_stats/0, reap_stats/1, @@ -94,6 +96,14 @@ request_reap(ReapReference) -> request_reap(Pid, ReapReference) -> riak_kv_queue_manager:request(Pid, ReapReference). +-spec bulk_request_reap(list(reap_reference())) -> ok. +bulk_request_reap(RefList) -> + bulk_request_reap(?MODULE, RefList). + +-spec bulk_request_reap(pid()|module(), list(reap_reference())) -> ok. +bulk_request_reap(Pid, RefList) -> + riak_kv_queue_manager:bulk_request(Pid, RefList). + -spec reap_stats() -> list({atom(), non_neg_integer()|riak_kv_overflow_queue:queue_stats()}). reap_stats() -> reap_stats(?MODULE). diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 37f3f885d..b240b3f5c 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -233,6 +233,10 @@ %% Best efforts (aka scavenger) pool. %% Parallel AAE store rebuilds + +-define(REAPER_BATCH_SIZE, 1024). +-define(ERASER_BATCH_SIZE, 1024). + %% Erlang's if Bool -> thing; true -> thang end. syntax hurts my %% brain. It scans as if true -> thing; true -> thang end. So, here is %% a macro, ?ELSE to use in if statements. You're welcome. @@ -1983,7 +1987,7 @@ handle_aaefold({find_tombs, handle_aaefold({reap_tombs, Bucket, KeyRange, SegmentFilter, ModifiedRange, - ReapMethod}, + _ReapMethod}, InitAcc, _Nval, IndexNs, Filtered, ReturnFun, Cntrl, Sender, State) -> @@ -1994,18 +1998,24 @@ handle_aaefold({reap_tombs, {true, undefined} -> {clock, VV} = lists:keyfind(clock, 1, EFs), DH = riak_object:delete_hash(VV), - case ReapMethod of - local -> - riak_kv_reaper:request_reap({{BF, KF}, DH}), - NewCount = element(2, TombHashAcc) + 1, - setelement(2, TombHashAcc, NewCount); - count -> - NewCount = element(2, TombHashAcc) + 1, - setelement(2, TombHashAcc, NewCount); - {job, _JobID} -> - {[{{BF, KF}, DH}|element(1, TombHashAcc)], - element(2, TombHashAcc), - element(3, TombHashAcc)} + case TombHashAcc of + {BatchList, Count, local} -> + NewCount = Count + 1, + case NewCount div ?REAPER_BATCH_SIZE of + 0 -> + riak_kv_reaper:bulk_request_reap( + [{{BF, KF}, DH}|BatchList] + ), + {[], NewCount, local}; + _ -> + {[{{BF, KF}, DH}|BatchList], + NewCount, + local} + end; + {BatchList, Count, count} -> + {BatchList, Count + 1, count}; + {BatchList, Count, Job} -> + {[{{BF, KF}, DH}|BatchList], Count + 1, Job} end; {false, undefined} -> TombHashAcc @@ -2027,7 +2037,7 @@ handle_aaefold({reap_tombs, handle_aaefold({erase_keys, Bucket, KeyRange, SegmentFilter, ModifiedRange, - DeleteMethod}, + _DeleteMethod}, InitAcc, _Nval, IndexNs, Filtered, ReturnFun, Cntrl, Sender, State) -> @@ -2044,18 +2054,23 @@ handle_aaefold({erase_keys, EraseKeyAcc; {false, undefined} -> {clock, VV} = lists:keyfind(clock, 1, EFs), - case DeleteMethod of - local -> - riak_kv_eraser:request_delete({{BF, KF}, VV}), - NewCount = element(2, EraseKeyAcc) + 1, - setelement(2, EraseKeyAcc, NewCount); - count -> - NewCount = element(2, EraseKeyAcc) + 1, - setelement(2, EraseKeyAcc, NewCount); - {job, _JobID} -> - {[{{BF, KF}, VV}|element(1, EraseKeyAcc)], - element(2, EraseKeyAcc), - element(3, EraseKeyAcc)} + case EraseKeyAcc of + {BatchList, Count, local} -> + NewCount = Count + 1, + case NewCount div ?ERASER_BATCH_SIZE of + 0 -> + riak_kv_eraser:bulk_request_delete( + [{{BF, KF}, VV}|BatchList]), + {[], NewCount, local}; + _ -> + {[{{BF, KF}, VV}|BatchList], + NewCount, + local} + end; + {BatchList, Count, count} -> + {BatchList, Count + 1, count}; + {BatchList, Count, Job} -> + {[{{BF, KF}, VV}|BatchList], Count + 1, Job} end end end, @@ -2754,7 +2769,7 @@ final_delete(BKey, DeleteHash, State = #state{mod=Mod, modstate=ModState}) -> [BKey, IsDeleted, DeleteHash, OtherHash]), State#state{modstate=ModState1} end; - {{error, _}, ModState1} -> + {{error, _R}, ModState1} -> State#state{modstate=ModState1} end. @@ -3489,7 +3504,7 @@ do_delete(BKey, State) -> ModState = State#state.modstate, Idx = State#state.idx, DeleteMode = State#state.delete_mode, - + %% Get the existing object. case do_get_term(BKey, Mod, ModState) of {{ok, RObj}, UpdModState} ->