From de7d2bc887ca5a2ec68284f3e7dd78c043b9b6b4 Mon Sep 17 00:00:00 2001
From: Fred Dushin
Date: Thu, 22 Dec 2016 11:32:24 -0500
Subject: [PATCH] Merge in cherry-pick from fd-remove-yz-dep

Passes xref and eunit tests.

Cherry-pick of https://github.com/basho/riak_kv/commit/32bcff513d92e228facd471cc2313a2623b1269d
---
 src/riak_kv_update_hook.erl |  72 +++++++++++++++++++++
 src/riak_kv_vnode.erl       | 125 +++++++++++++++++++++++++-----------
 2 files changed, 161 insertions(+), 36 deletions(-)
 create mode 100644 src/riak_kv_update_hook.erl

diff --git a/src/riak_kv_update_hook.erl b/src/riak_kv_update_hook.erl
new file mode 100644
index 0000000000..838bbca2dc
--- /dev/null
+++ b/src/riak_kv_update_hook.erl
@@ -0,0 +1,72 @@
+%%
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2016 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+%%
+-module(riak_kv_update_hook).
+
+-include_lib("riak_core/include/riak_core_vnode.hrl").
+
+-export_type([object_pair/0, update_reason/0, repair/0, partition/0, handoff_dest/0]).
+
+-type object_pair() :: {riak_object:riak_object(), riak_object:riak_object() | no_old_object}.
+-type repair() :: full_repair | tree_repair | failed_repair.
+-type update_reason() ::
+        delete
+      | handoff
+      | put
+      | anti_entropy
+      | {delete, repair()}
+      | {anti_entropy, repair()}
+      | {anti_entropy_delete, repair()}
+      | anti_entropy_delete.
+
+
+%% @doc Update a Riak object, given a reason and the partition under which
+%%      the object is being indexed. The object pair contains the new
+%%      and old objects, in the case where a read-before-write resulted
+%%      in an old object.
+-callback update(
+    object_pair(),
+    update_reason(),
+    partition()
+) ->
+    ok.
+
+%% @doc Update a Riak object encoded as an Erlang binary. This function
+%%      is typically called from the write-once path, where there is no
+%%      old object to pass.
+-callback update_binary(
+    riak_core_bucket:bucket(),
+    riak_object:key(),
+    binary(),
+    update_reason(),
+    partition()
+) ->
+    ok.
+
+%% @doc Determine whether a bucket requires an existing object,
+%%      based on its properties. If this function returns true,
+%%      this may result in a read-before-write in the vnode.
+-callback requires_existing_object(riak_kv_bucket:props()) ->
+    boolean().
+
+%% @doc Determine whether handoff should start.
+-callback should_handoff(handoff_dest()) ->
+    boolean().
diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl
index ac4cc1a562..3d324b4ebc 100644
--- a/src/riak_kv_vnode.erl
+++ b/src/riak_kv_vnode.erl
@@ -96,25 +96,6 @@
 -export([put_merge/6]). %% For fsm_eqc_vnode
 -endif.
 
-%% N.B. The ?INDEX macro should be called any time the object bytes on
-%% disk are modified.
--ifdef(TEST).
-%% Use values so that test compile doesn't give 'unused vars' warning.
--define(INDEX(A,B,C), _=element(1,{{_A1, _A2} = A,B,C}), ok).
--define(INDEX_BIN(A,B,C,D,E), _=element(1,{A,B,C,D,E}), ok). --define(IS_SEARCH_ENABLED_FOR_BUCKET(BProps), _=element(1, {BProps}), false). --else. --define(INDEX(Objects, Reason, Partition), yz_kv:index(Objects, Reason, Partition)). --define(INDEX_BIN(Bucket, Key, Obj, Reason, Partition), yz_kv:index_binary(Bucket, Key, Obj, Reason, Partition)). --define(IS_SEARCH_ENABLED_FOR_BUCKET(BProps), yz_kv:is_search_enabled_for_bucket(BProps)). --endif. - --ifdef(TEST). --define(YZ_SHOULD_HANDOFF(X), true). --else. --define(YZ_SHOULD_HANDOFF(X), yz_kv:should_handoff(X)). --endif. - -record(mrjob, {cachekey :: term(), bkey :: term(), reqid :: term(), @@ -138,6 +119,8 @@ leasing = false :: boolean() }). +-type update_hook() :: module() | undefined. + -record(state, {idx :: partition(), mod :: module(), async_put :: boolean(), @@ -168,7 +151,8 @@ tictac_startqueue = os:timestamp() :: erlang:timestamp(), tictac_rebuilding = false :: erlang:timestamp()|false, worker_pool_strategy = single :: none|single|dscp, - vnode_pool_pid :: undefined|pid() + vnode_pool_pid :: undefined|pid(), + update_hook :: update_hook() }). -type index_op() :: add | remove. @@ -784,7 +768,8 @@ init([Index]) -> mrjobs=dict:new(), md_cache=MDCache, md_cache_size=MDCacheSize, - worker_pool_strategy=WorkerPoolStrategy}, + worker_pool_strategy=WorkerPoolStrategy, + update_hook=update_hook()}, try_set_vnode_lock_limit(Index), case AsyncFolding of true -> @@ -1380,7 +1365,7 @@ handle_request(kv_w1c_put_request, Req, Sender, State=#state{async_put=true}) -> {error, Reason, UpModState} -> {reply, ?KV_W1C_PUT_REPLY{reply={error, Reason}, type=ReplicaType}, State#state{modstate=UpModState}} end; -handle_request(kv_w1c_put_request, Req, _Sender, State=#state{async_put=false}) -> +handle_request(kv_w1c_put_request, Req, _Sender, State=#state{async_put=false, update_hook=UpdateHook}) -> {Bucket, Key} = riak_kv_requests:get_bucket_key(Req), EncodedVal = riak_kv_requests:get_encoded_obj(Req), ReplicaType = riak_kv_requests:get_replica_type(Req), @@ -1393,7 +1378,7 @@ handle_request(kv_w1c_put_request, Req, _Sender, State=#state{async_put=false}) aae_update(Bucket, Key, use_binary, assumed_no_old_object, EncodedVal, State), % Write once path - and so should be a new object. If not this % is an application fault - ?INDEX_BIN(Bucket, Key, EncodedVal, put, Idx), + maybe_update_binary(UpdateHook, Bucket, Key, EncodedVal, put, Idx), update_vnode_stats(vnode_put, Idx, StartTS), {reply, ?KV_W1C_PUT_REPLY{reply=ok, type=ReplicaType}, State#state{modstate=UpModState}}; {error, Reason, UpModState} -> @@ -1966,9 +1951,9 @@ do_request_hash(_, _) -> -handoff_starting({_HOType, TargetNode}=_X, State=#state{handoffs_rejected=RejectCount}) -> +handoff_starting({_HOType, TargetNode}=HandoffDest, State=#state{handoffs_rejected=RejectCount, update_hook=UpdateHook}) -> MaxRejects = app_helper:get_env(riak_kv, handoff_rejected_max, 6), - case MaxRejects =< RejectCount orelse ?YZ_SHOULD_HANDOFF(_X) of + case MaxRejects =< RejectCount orelse maybe_should_handoff(UpdateHook, HandoffDest) of true -> {true, State#state{in_handoff=true, handoff_target=TargetNode}}; false -> @@ -2083,11 +2068,11 @@ terminate(_Reason, #state{idx=Idx, ok. handle_info({{w1c_async_put, From, Type, Bucket, Key, EncodedVal, StartTS} = _Context, Reply}, - State=#state{idx=Idx}) -> + State=#state{idx=Idx, update_hook=UpdateHook}) -> aae_update(Bucket, Key, use_binary, assumed_no_old_object, EncodedVal, State), % Write once path - and so should be a new object. 
If not this % is an application fault - ?INDEX_BIN(Bucket, Key, EncodedVal, put, Idx), + maybe_update_binary(UpdateHook, Bucket, Key, EncodedVal, put, Idx), riak_core_vnode:reply(From, ?KV_W1C_PUT_REPLY{reply=Reply, type=Type}), update_vnode_stats(vnode_put, Idx, StartTS), {ok, State}; @@ -2311,9 +2296,14 @@ do_put(Sender, {Bucket,_Key}=BKey, RObj, ReqID, StartTime, Options, State) -> update_index_write_stats(UpdPutArgs#putargs.is_index, UpdPutArgs#putargs.index_specs), {Reply, UpdState}. +-spec do_backend_delete( + {riak_core_bucket:bucket(), riak_object:key()}, + riak_object:riak_object(), #state{} +) -> #state{}. do_backend_delete(BKey, RObj, State = #state{idx = Idx, mod = Mod, - modstate = ModState}) -> + modstate = ModState, + update_hook = UpdateHook}) -> %% object is a tombstone or all siblings are tombstones %% Calculate the index specs to remove... %% JDM: This should just be a tombstone by this point, but better @@ -2324,7 +2314,7 @@ do_backend_delete(BKey, RObj, State = #state{idx = Idx, {Bucket, Key} = BKey, case Mod:delete(Bucket, Key, IndexSpecs, ModState) of {ok, UpdModState} -> - ?INDEX({RObj, no_old_object}, delete, Idx), + maybe_update(UpdateHook, {RObj, no_old_object}, delete, Idx), aae_delete(Bucket, Key, RObj, State), maybe_cache_evict(BKey, State), update_index_delete_stats(IndexSpecs), @@ -2350,7 +2340,8 @@ delete_hash(RObj) -> putargs(), state()}. prepare_put(State=#state{vnodeid=VId, mod=Mod, - modstate=ModState}, + modstate=ModState, + update_hook=UpdateHook}, PutArgs=#putargs{bkey={Bucket, _Key}, lww=LWW, coord=Coord, @@ -2362,7 +2353,7 @@ prepare_put(State=#state{vnodeid=VId, %% no need to incur additional get. Otherwise, we need to read the %% old object to know how the indexes have changed. IndexBackend = is_indexed_backend(Mod, Bucket, ModState), - IsSearchable = ?IS_SEARCH_ENABLED_FOR_BUCKET(BProps), + IsSearchable = maybe_requires_existing_object(UpdateHook, BProps), SkipReadBeforeWrite = LWW andalso (not IndexBackend) andalso (not IsSearchable), case SkipReadBeforeWrite of true -> @@ -2595,13 +2586,14 @@ actual_put(BKey, {Obj, OldObj}, IndexSpecs, RB, ReqID, State) -> actual_put(BKey={Bucket, Key}, {Obj, OldObj}, IndexSpecs, RB, ReqID, MaxCheckFlag, State=#state{idx=Idx, mod=Mod, - modstate=ModState}) -> + modstate=ModState, + update_hook=UpdateHook}) -> case encode_and_put(Obj, Mod, Bucket, Key, IndexSpecs, ModState, MaxCheckFlag) of {{ok, UpdModState}, EncodedVal} -> aae_update(Bucket, Key, Obj, OldObj, EncodedVal, State), maybe_cache_object(BKey, Obj, State), - ?INDEX({Obj, maybe_old_object(OldObj)}, put, Idx), + maybe_update(UpdateHook, {Obj, maybe_old_object(OldObj)}, put, Idx), Reply = case RB of true -> {dw, Idx, Obj, ReqID}; @@ -3144,7 +3136,8 @@ do_get_vclock({Bucket, Key}, Mod, ModState) -> do_diffobj_put({Bucket, Key}=BKey, DiffObj, StateData=#state{mod=Mod, modstate=ModState, - idx=Idx}) -> + idx=Idx, + update_hook=UpdateHook}) -> StartTS = os:timestamp(), {ok, Capabilities} = Mod:capabilities(Bucket, ModState), IndexBackend = lists:member(indexes, Capabilities), @@ -3166,7 +3159,7 @@ do_diffobj_put({Bucket, Key}=BKey, DiffObj, StateData), update_index_write_stats(IndexBackend, IndexSpecs), update_vnode_stats(vnode_put, Idx, StartTS), - ?INDEX({DiffObj2, no_old_object}, handoff, Idx), + maybe_update(UpdateHook, {DiffObj, no_old_object}, handoff, Idx), {ok, State2#state{modstate=UpdModState}}; {{error, Reason, UpdModState}, _Val} -> {error, Reason, State2#state{modstate=UpdModState}} @@ -3194,7 +3187,9 @@ do_diffobj_put({Bucket, Key}=BKey, 
                                       DiffObj, StateData),
             update_index_write_stats(IndexBackend, IndexSpecs),
             update_vnode_stats(vnode_put, Idx, StartTS),
-            ?INDEX({AMObj, maybe_old_object(OldObj)}, handoff, Idx),
+            maybe_update(UpdateHook,
+                         {AMObj, maybe_old_object(OldObj)},
+                         handoff, Idx),
             {ok, State2#state{modstate=UpdModState}};
         {{error, Reason, UpdModState}, _Val} ->
             {error, Reason, State2#state{modstate=UpdModState}}
@@ -3998,6 +3993,64 @@ highest_actor(ActorBase, Obj) ->
     %% get the greatest event for the highest/latest actor
     {Actor, Epoch, riak_object:actor_counter(Actor, Obj)}.
 
+%%
+%% Technical note: The update_hook configuration parameter should contain
+%% the name of a module implementing the riak_kv_update_hook behaviour:
+%%
+%% - update(object_pair(), update_reason(), partition()) -> ok.
+%% - update_binary(bucket(), key(), binary(), update_reason(), partition()) -> ok.
+%% - requires_existing_object(riak_kv_bucket:props()) -> boolean().
+%% - should_handoff(handoff_dest()) -> boolean().
+%%
+%% The update hook is called on puts, deletes, handoff, and anti-entropy
+%% activity. In the case of puts, if an object is being overwritten, the
+%% old object is passed as the second element of the object pair. The hook
+%% may use this old object to optimize the update (e.g., to handle the
+%% special case of sibling writes, which may not map directly to Riak puts).
+%%
+%% NB. Yokozuna is currently the only repository that implements this
+%% behaviour. See the Yokozuna cuttlefish schema for where this
+%% configuration is implicitly set.
+%%
+
+-spec update_hook() -> update_hook().
+update_hook() ->
+    app_helper:get_env(riak_kv, update_hook).
+
+-spec maybe_update(update_hook(),
+                   riak_kv_update_hook:object_pair(),
+                   riak_kv_update_hook:update_reason(),
+                   riak_kv_update_hook:partition()) -> ok.
+maybe_update(undefined, _RObjPair, _Reason, _Idx) ->
+    ok;
+maybe_update(UpdateHook, RObjPair, Reason, Idx) ->
+    UpdateHook:update(RObjPair, Reason, Idx).
+
+-spec maybe_update_binary(update_hook(),
+                          riak_core_bucket:bucket(),
+                          riak_object:key(),
+                          binary(),
+                          riak_kv_update_hook:update_reason(),
+                          riak_kv_update_hook:partition()) -> ok.
+maybe_update_binary(undefined, _Bucket, _Key, _Binary, _Reason, _Idx) ->
+    ok;
+maybe_update_binary(UpdateHook, Bucket, Key, Binary, Reason, Idx) ->
+    UpdateHook:update_binary(Bucket, Key, Binary, Reason, Idx).
+
+-spec maybe_requires_existing_object(update_hook(),
+                                     riak_kv_bucket:props()) -> boolean().
+maybe_requires_existing_object(undefined, _BProps) ->
+    false;
+maybe_requires_existing_object(UpdateHook, BProps) ->
+    UpdateHook:requires_existing_object(BProps).
+
+-spec maybe_should_handoff(update_hook(),
+                           riak_kv_update_hook:handoff_dest()) -> boolean().
+maybe_should_handoff(undefined, _HandoffDest) ->
+    true;
+maybe_should_handoff(UpdateHook, HandoffDest) ->
+    UpdateHook:should_handoff(HandoffDest).
+
 -ifdef(TEST).
 -define(MGR, riak_kv_vnode_status_mgr).
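
For reviewers, a minimal sketch of a module that could consume the new hook. It is not part of this patch: the module name my_update_hook and its no-op clauses are hypothetical, while the callback names, arities, and the riak_kv update_hook application environment key are the ones introduced above.

    %% Hypothetical example only -- not part of this patch.
    -module(my_update_hook).
    -behaviour(riak_kv_update_hook).

    -export([update/3,
             update_binary/5,
             requires_existing_object/1,
             should_handoff/1]).

    %% Invoked via riak_kv_vnode:maybe_update/4 on puts, deletes,
    %% handoff, and anti-entropy activity.
    update({_NewObj, _OldObjOrNoOldObject}, _Reason, _Partition) ->
        ok.

    %% Invoked via riak_kv_vnode:maybe_update_binary/6 on the write-once
    %% path, where only the encoded object binary is available and there
    %% is never an old object.
    update_binary(_Bucket, _Key, _EncodedObj, _Reason, _Partition) ->
        ok.

    %% Returning true makes the vnode perform a read-before-write so
    %% that update/3 receives the old object alongside the new one.
    requires_existing_object(_BucketProps) ->
        false.

    %% Returning false rejects handoff; riak_kv_vnode:handoff_starting/2
    %% keeps asking until the rejection count reaches
    %% handoff_rejected_max (default 6).
    should_handoff(_HandoffDest) ->
        true.

Wiring it in is a matter of pointing the application environment key read by update_hook/0 at the module, e.g. via a (hypothetical) advanced.config fragment:

    [{riak_kv, [{update_hook, my_update_hook}]}].

With the key unset, update_hook/0 returns undefined and each maybe_* helper falls through to its no-op default, as in the pre-patch behaviour without yokozuna.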