diff --git a/apps/leo_storage/include/leo_storage.hrl b/apps/leo_storage/include/leo_storage.hrl index e5879e17..e800f59b 100644 --- a/apps/leo_storage/include/leo_storage.hrl +++ b/apps/leo_storage/include/leo_storage.hrl @@ -207,6 +207,35 @@ key :: any(), timestamp = 0 :: non_neg_integer(), times = 0 :: non_neg_integer()}). +-record(async_deletion_message_1, { + id = 0 :: non_neg_integer(), + addr_id = 0 :: non_neg_integer(), + key :: any(), + meta :: term(), + timestamp = 0 :: non_neg_integer(), + times = 0 :: non_neg_integer()}). +-define(MSG_ASYNC_DELETION, 'async_deletion_message_1'). +-define(transform_async_deletion_message(_Msg), + begin + case _Msg of + #async_deletion_message{id = _Id, + addr_id = _AddrId, + key = _Key, + timestamp = _Timestamp, + times = _Time} -> + {ok, #async_deletion_message_1{ + id = _Id, + addr_id = _AddrId, + key = _Key, + timestamp = _Timestamp, + times = _Time}}; + #async_deletion_message_1{} -> + {ok,_Msg}; + _ -> + {error, invalid_record} + end + end). + -record(recovery_node_message, { id = 0 :: non_neg_integer(), diff --git a/apps/leo_storage/src/leo_storage_mq.erl b/apps/leo_storage/src/leo_storage_mq.erl index fe1bae84..f83a03d2 100644 --- a/apps/leo_storage/src/leo_storage_mq.erl +++ b/apps/leo_storage/src/leo_storage_mq.erl @@ -116,11 +116,34 @@ publish(?QUEUE_ID_RECOVERY_NODE = Id, Node) -> node = Node, timestamp = leo_date:now()}), leo_mq_api:publish(Id, KeyBin, MsgBin); + +publish(?QUEUE_ID_ASYNC_DELETION = Id, #?METADATA{addr_id = AddrId, + key = Key} = Metadata) -> + KeyBin = term_to_binary({AddrId, Key}), + MessageBin = term_to_binary( + #?MSG_ASYNC_DELETION{id = leo_date:clock(), + addr_id = AddrId, + key = Key, + meta = Metadata, + timestamp = leo_date:now()}), + leo_mq_api:publish(Id, KeyBin, MessageBin); publish(_,_) -> {error, badarg}. -spec(publish(mq_id(), any(), any()) -> ok | {error, any()}). +publish(?QUEUE_ID_PER_OBJECT = Id, #?METADATA{addr_id = AddrId, + key = Key}= Metadata, ErrorType) -> + KeyBin = term_to_binary({ErrorType, Key}), + MessageBin = term_to_binary( + #?MSG_INCONSISTENT_DATA{id = leo_date:clock(), + type = ErrorType, + addr_id = AddrId, + key = Key, + meta = Metadata, + timestamp = leo_date:now()}), + leo_mq_api:publish(Id, KeyBin, MessageBin); + publish(?QUEUE_ID_SYNC_BY_VNODE_ID = Id, VNodeId, Node) -> KeyBin = term_to_binary({VNodeId, Node}), MessageBin = term_to_binary( @@ -131,13 +154,17 @@ publish(?QUEUE_ID_SYNC_BY_VNODE_ID = Id, VNodeId, Node) -> leo_mq_api:publish(Id, KeyBin, MessageBin); publish(?QUEUE_ID_ASYNC_DELETION = Id, AddrId, Key) -> - KeyBin = term_to_binary({AddrId, Key}), - MessageBin = term_to_binary( - #async_deletion_message{id = leo_date:clock(), - addr_id = AddrId, - key = Key, - timestamp = leo_date:now()}), - leo_mq_api:publish(Id, KeyBin, MessageBin); + case leo_storage_handler_object:head(AddrId, Key, false) of + {ok, Metadata} -> + publish(Id, Metadata); + not_found -> + ok; + {error, Cause} -> + ?error("publish/3", + [{addr_id, AddrId}, {key, Key}, + {cause, Cause}]), + {error, ?ERROR_COULD_NOT_GET_META} + end; publish(?QUEUE_ID_SYNC_OBJ_WITH_DC, AddrId, Key) -> publish(?QUEUE_ID_SYNC_OBJ_WITH_DC, undefined, AddrId, Key); @@ -158,7 +185,6 @@ publish(?QUEUE_ID_REQ_DEL_DIR = Id, Node, Directory) -> node = Node, dir = Directory, timestamp = leo_date:now()}), - ?debug("leo_mq delete directory", [{dir, Directory}]), leo_mq_api:publish(Id, KeyBin, MsgBin); publish({?QUEUE_ID_DEL_DIR, WorkerId}, AddrId, Key) -> @@ -176,14 +202,17 @@ publish(_,_,_) -> -spec(publish(mq_id(), any(), any(), any()) -> ok | {error, any()}). publish(?QUEUE_ID_PER_OBJECT = Id, AddrId, Key, ErrorType) -> - KeyBin = term_to_binary({ErrorType, Key}), - MessageBin = term_to_binary( - #?MSG_INCONSISTENT_DATA{id = leo_date:clock(), - type = ErrorType, - addr_id = AddrId, - key = Key, - timestamp = leo_date:now()}), - leo_mq_api:publish(Id, KeyBin, MessageBin); + case leo_storage_handler_object:head(AddrId, Key, false) of + {ok, Metadata} -> + publish(Id, Metadata, ErrorType); + not_found -> + ok; + {error, Cause} -> + ?error("publish/4", + [{addr_id, AddrId}, {key, Key}, + {cause, Cause}]), + {error, ?ERROR_COULD_NOT_GET_META} + end; publish(?QUEUE_ID_SYNC_OBJ_WITH_DC = Id, ClusterId, AddrId, Key) -> KeyBin = term_to_binary({ClusterId, AddrId, Key}), @@ -242,16 +271,27 @@ publish(_,_,_,_,_) -> {error, badarg}. publish(?QUEUE_ID_PER_OBJECT = Id, AddrId, Key, SyncNode, IsForceSync, ErrorType) -> - KeyBin = term_to_binary({ErrorType, Key}), - MessageBin = term_to_binary( - #?MSG_INCONSISTENT_DATA{id = leo_date:clock(), - type = ErrorType, - addr_id = AddrId, - key = Key, - sync_node = SyncNode, - is_force_sync = IsForceSync, - timestamp = leo_date:now()}), - leo_mq_api:publish(Id, KeyBin, MessageBin); + case leo_storage_handler_object:head(AddrId, Key, false) of + {ok, Metadata} -> + KeyBin = term_to_binary({ErrorType, Key}), + MessageBin = term_to_binary( + #?MSG_INCONSISTENT_DATA{id = leo_date:clock(), + type = ErrorType, + addr_id = AddrId, + key = Key, + meta = Metadata, + sync_node = SyncNode, + is_force_sync = IsForceSync, + timestamp = leo_date:now()}), + leo_mq_api:publish(Id, KeyBin, MessageBin); + not_found -> + ok; + {error, Cause} -> + ?error("publish/4", + [{addr_id, AddrId}, {key, Key}, + {cause, Cause}]), + {error, ?ERROR_COULD_NOT_GET_META} + end; publish(_,_,_,_,_,_) -> {error, badarg}. @@ -280,25 +320,32 @@ handle_call({consume, ?QUEUE_ID_PER_OBJECT, MessageBin}) -> case ?transform_inconsistent_data_message(Term) of {ok, #?MSG_INCONSISTENT_DATA{addr_id = AddrId, key = Key, + meta = Metadata, sync_node = SyncNode, - is_force_sync = true}} when SyncNode /= undefined -> - send_object_to_remote_node(SyncNode, AddrId, Key); - {ok, #?MSG_INCONSISTENT_DATA{addr_id = AddrId, - key = Key, - type = _ErrorType}} -> - case correct_redundancies(Key) of - ok -> - ok; - {error, Cause = not_found} -> - ?warn("handle_call/1 - consume", - [{qid, ?QUEUE_ID_PER_OBJECT}, - {addr_id, AddrId}, - {key, Key}, {cause, Cause}]), + is_force_sync = IsForceSync}} -> + case compare_metadatas(Metadata) of + true when IsForceSync == true, + SyncNode /= undefined -> + send_object_to_remote_node(SyncNode, AddrId, Key); + true -> + case correct_redundancies(Key) of + ok -> + ok; + {error, Cause = not_found} -> + ?warn("handle_call/1 - consume", + [{qid, ?QUEUE_ID_PER_OBJECT}, + {addr_id, AddrId}, + {key, Key}, {cause, Cause}]), + ok; + {error, Cause} -> + ?debug("handle_call/1 - consume", + [{addr_id, AddrId}, + {key, Key}, {cause, Cause}]), + {error, Cause} + end; + false -> ok; {error, Cause} -> - ?debug("handle_call/1 - consume", - [{addr_id, AddrId}, - {key, Key}, {cause, Cause}]), {error, Cause} end; {error,_Error} -> @@ -497,6 +544,11 @@ recover_node_callback_2([SrcNode|Rest], AddrId, Key, FixedNode) -> %% @doc Send object to a remote-node %% @private +-spec(send_object_to_remote_node(Node, AddrId, Key) -> + ok | {error, Cause} when Node::node(), + AddrId::non_neg_integer(), + Key::binary(), + Cause::any()). send_object_to_remote_node(Node, AddrId, Key) -> Ref = make_ref(), case leo_storage_handler_object:get({Ref, Key}) of @@ -575,7 +627,6 @@ sync_vnodes_callback(Node, FromAddrId, ToAddrId)-> %% @doc Remove a node from redundancies %% @private - -spec(delete_node_from_redundancies(Redundancies, Node, AccRedundancies) -> {ok, AccRedundancies} when Redundancies::[#redundant_node{}], Node::node(), @@ -842,6 +893,10 @@ rebalance_2({ok, Redundancies}, #rebalance_message{node = Node, %% @doc Retrieve redundancies with a number of replicas %% @private +-spec(get_redundancies_with_replicas(AddrId, Key, Redundancies) -> + Redundancies when AddrId::non_neg_integer(), + Key::binary(), + Redundancies::[#redundancies{}]). get_redundancies_with_replicas(AddrId, Key, Redundancies) -> %% Retrieve redundancies with a number of replicas case leo_object_storage_api:head({AddrId, Key}) of @@ -897,6 +952,9 @@ notify_rebalance_message_to_manager(VNodeId) -> %% @doc Fix consistency of an object between a local-cluster and remote-cluster(s) %% @private +-spec(fix_consistency_between_clusters(InconsistentData) -> + ok | {error, Cause} when InconsistentData::#inconsistent_data_with_dc{}, + Cause::any()). fix_consistency_between_clusters(#inconsistent_data_with_dc{ addr_id = AddrId, key = Key, @@ -923,34 +981,77 @@ fix_consistency_between_clusters(#inconsistent_data_with_dc{ %% @doc Remove objects under the directory/bucket +-spec(remove_objects_under_dir(MessageBin) -> + ok | {error, Cause} when MessageBin::binary(), + Cause::any()). remove_objects_under_dir(MessageBin) -> case catch binary_to_term(MessageBin) of - #async_deletion_message{addr_id = AddrId, - key = Key} -> - case catch leo_storage_handler_object:delete( - #?OBJECT{addr_id = AddrId, - key = Key, - clock = leo_date:clock(), - timestamp = leo_date:now(), - del = ?DEL_TRUE - }, 0, false, leo_mq) of - ok -> - ok; - {error, not_found} -> - ok; - {_, Cause} -> - {error, Cause} - end; - _ -> + {'EXIT',_Cause} -> ?warn("remove_objects_under_dir/1", [{qid, ?QUEUE_ID_DEL_DIR}, {cause, invalid_data_format}]), - ok + ok; + AsyncDeletionMsg -> + case ?transform_async_deletion_message(AsyncDeletionMsg) of + {ok, #?MSG_ASYNC_DELETION{addr_id = AddrId, + key = Key, + meta = Metadata}} -> + case compare_metadatas(Metadata) of + true -> + case catch leo_storage_handler_object:delete( + #?OBJECT{addr_id = AddrId, + key = Key, + clock = leo_date:clock(), + timestamp = leo_date:now(), + del = ?DEL_TRUE}, 0, false, leo_mq) of + ok -> + ok; + {error, not_found} -> + ok; + {_, Cause} -> + {error, Cause} + end; + false -> + ok; + {error, Cause} -> + {error, Cause} + end; + _ -> + ?warn("remove_objects_under_dir/1", + [{qid, ?QUEUE_ID_DEL_DIR}, + {cause, invalid_data_format}]), + ok + end + end. + + +%% @doc Compare MQ's metadata with its latest metadata +-spec(compare_metadatas(Metadata) -> + Result | {error, Cause} when Metadata::#?METADATA{}, + Result::boolean(), + Cause::any()). +compare_metadatas(undefined) -> + true; +compare_metadatas(#?METADATA{addr_id = AddrId, + key = Key, + clock = MsgClock}) -> + case leo_storage_handler_object:head(AddrId, Key, false) of + {ok, #?METADATA{clock = Clock}} when MsgClock > Clock -> + true; + {ok, _} -> + false; + not_found -> + false; + {error, Cause} -> + ?error("compare_metadatas/1", + [{addr_id, AddrId}, {key, Key}, + {cause, Cause}]), + {error, ?ERROR_COULD_NOT_GET_META} end. %%-------------------------------------------------------------------- -%% INNTERNAL FUNCTIONS-2 +%% INNTERNAL FUNCTIONS-2 ETS related functions %%-------------------------------------------------------------------- %% @doc Lookup rebalance counter %% @private @@ -972,12 +1073,18 @@ ets_lookup(Table, Key) -> %% @doc Increment rebalance counter %% @private +-spec(increment_counter(Table, Key) -> + ok when Table::atom(), + Key::binary()). increment_counter(Table, Key) -> catch ets:update_counter(Table, Key, 1), ok. %% @doc Decrement rebalance counter %% @private +-spec(decrement_counter(Table, Key) -> + ok when Table::atom(), + Key::binary()). decrement_counter(Table, Key) -> catch ets:update_counter(Table, Key, -1), ok. diff --git a/apps/leo_storage/src/leo_storage_read_repairer.erl b/apps/leo_storage/src/leo_storage_read_repairer.erl index eb727d44..642cb716 100644 --- a/apps/leo_storage/src/leo_storage_read_repairer.erl +++ b/apps/leo_storage/src/leo_storage_read_repairer.erl @@ -138,9 +138,11 @@ loop(R, Ref, From, NumOfNodes, {ReqId, Key, E} = Args, Callback) -> RPCKey::rpc:key(), Node::atom(), State::#state{}). -compare(Ref, Pid, RPCKey, Node, #state{metadata = #?METADATA{addr_id = AddrId, - key = Key, - clock = Clock}}) -> +compare(Ref, Pid, RPCKey, Node, #state{metadata = Metadata}) -> + #?METADATA{addr_id = AddrId, + key = Key, + clock = Clock} = Metadata, + Ret = case rpc:nb_yield(RPCKey, ?DEF_REQ_TIMEOUT) of {value, {ok, #?METADATA{clock = RemoteClock}}} when Clock == RemoteClock -> ok; @@ -165,22 +167,24 @@ compare(Ref, Pid, RPCKey, Node, #state{metadata = #?METADATA{addr_id = AddrId, ?warn("compare/4", [{node, Node}, {addr_id, AddrId}, {key, Key}, {clock, Clock}, {cause, Reason}]), - enqueue(AddrId, Key) + %% enqueue(AddrId, Key) + enqueue(Metadata) end, erlang:send(Pid, {Ref, Ret}). %% @doc Insert a message into the queue %% @private --spec(enqueue(AddrId, Key) -> - ok | {error, any()} when AddrId::non_neg_integer(), - Key::binary()). -enqueue(AddrId, Key) -> +-spec(enqueue(Metadata) -> + ok | {error, any()} when Metadata::#?METADATA{}). +enqueue(Metadata) -> QId = ?QUEUE_ID_PER_OBJECT, - case leo_storage_mq:publish(QId, AddrId, Key, ?ERR_TYPE_RECOVER_DATA) of + case leo_storage_mq:publish(QId, Metadata, ?ERR_TYPE_RECOVER_DATA) of ok -> void; {error, Cause} -> + #?METADATA{addr_id = AddrId, + key = Key} = Metadata, ?warn("enqueue/1", [{qid, QId}, {addr_id, AddrId}, {key, Key}, {cause, Cause}]) diff --git a/apps/leo_storage/src/leo_storage_replicator.erl b/apps/leo_storage/src/leo_storage_replicator.erl index 5984d9f5..2190b96f 100644 --- a/apps/leo_storage/src/leo_storage_replicator.erl +++ b/apps/leo_storage/src/leo_storage_replicator.erl @@ -104,9 +104,9 @@ init_loop(NumOfNodes, Quorum, Ref, Parent, State) -> loop(NumOfNodes, Quorum, [], Ref, Parent, State). %% @private -replicate_1([], Ref,_From, #state{method = Method, - addr_id = AddrId, - key = Key, +replicate_1([], Ref,_From, #state{method = Method, + key = Key, + object = Object, callback = Callback}) -> receive %% Receive only messages sent from SubParent (_From in this context) @@ -115,7 +115,9 @@ replicate_1([], Ref,_From, #state{method = Method, after (?DEF_REQ_TIMEOUT + timer:seconds(1)) -> %% for recovering message of the repair-obj's MQ - enqueue(Method, ?ERR_TYPE_REPLICATE_DATA, AddrId, Key), + Metadata = leo_object_storage_transformer:object_to_metadata(Object), + enqueue(Method, ?ERR_TYPE_REPLICATE_DATA, Metadata), + %% for watchdog ok = leo_storage_msg_collector:notify(?ERROR_MSG_TIMEOUT, Method, Key), %% reply error @@ -168,8 +170,8 @@ loop(_, W,_ResL, Ref, From, #state{num_of_nodes = N, %% This message is sent to Parent so need to add a tag erlang:send(From, {?LEO_STORAGE_REPLICATOR_MSG_TAG, Ref, {error, E}}); loop(N, W, ResL, Ref, From, #state{method = Method, - addr_id = AddrId, key = Key, + object = Object, errors = E, callback = Callback, is_reply = IsReply} = State) -> @@ -202,7 +204,9 @@ loop(N, W, ResL, Ref, From, #state{method = Method, end, loop(N-1, W_1, [0|ResL], Ref, From, State_1); {Ref, {error, {Node, Cause}}} -> - enqueue(Method, ?ERR_TYPE_REPLICATE_DATA, AddrId, Key), + Metadata = leo_object_storage_transformer:object_to_metadata(Object), + enqueue(Method, ?ERR_TYPE_REPLICATE_DATA, Metadata), + State_1 = State#state{errors = [{Node, Cause}|E]}, loop(N-1, W, ResL, Ref, From, State_1) after @@ -210,7 +214,9 @@ loop(N, W, ResL, Ref, From, #state{method = Method, case (W > 0) of true -> %% for recovering message of the repair-obj's MQ - enqueue(Method, ?ERR_TYPE_REPLICATE_DATA, AddrId, Key), + Metadata = leo_object_storage_transformer:object_to_metadata(Object), + enqueue(Method, ?ERR_TYPE_REPLICATE_DATA, Metadata), + %% set reply Cause = timeout, ?warn("loop/6", @@ -250,28 +256,30 @@ replicate_fun(Ref, #req_params{pid = Pid, %% @doc Input a message into the queue. -%% --spec(enqueue(Method, Type, AddrId, Key) -> +-spec(enqueue(Method, Type, Metadata) -> ok when Method::type_of_method(), Type::error_msg_type(), - AddrId::non_neg_integer(), - Key::binary()). -enqueue('put', ?ERR_TYPE_REPLICATE_DATA = Type, AddrId, Key) -> + Metadata::#?METADATA{}). +enqueue('put', ?ERR_TYPE_REPLICATE_DATA = Type, Metadata) -> QId = ?QUEUE_ID_PER_OBJECT, - case leo_storage_mq:publish(QId, AddrId, Key, Type) of + case leo_storage_mq:publish(QId, Metadata, Type) of ok -> ok; {error, Cause} -> + #?METADATA{addr_id = AddrId, + key = Key} = Metadata, ?warn("enqueue/1", [{qid, QId}, {addr_id, AddrId}, {key, Key}, {type, Type}, {cause, Cause}]) end; -enqueue('delete', _Type, AddrId, Key) -> +enqueue('delete', _Type, Metadata) -> QId = ?QUEUE_ID_ASYNC_DELETION, - case leo_storage_mq:publish(QId, AddrId, Key) of + case leo_storage_mq:publish(QId, Metadata) of ok -> ok; {error, Cause} -> + #?METADATA{addr_id = AddrId, + key = Key} = Metadata, ?warn("enqueue/1", [{qid, QId}, {addr_id, AddrId}, {key, Key}, {cause, Cause}]) diff --git a/apps/leo_storage/src/leo_sync_local_cluster.erl b/apps/leo_storage/src/leo_sync_local_cluster.erl index 7540190c..9d10d389 100644 --- a/apps/leo_storage/src/leo_sync_local_cluster.erl +++ b/apps/leo_storage/src/leo_sync_local_cluster.erl @@ -254,9 +254,8 @@ slice_and_replicate_1(#?METADATA{addr_id = AddrId, case leo_storage_handler_object:head(AddrId, Key, false) of {ok, #?METADATA{clock = Clock_1}} when Clock == Clock_1 -> slice_and_replicate(StackedObject, Errors); - {ok, #?METADATA{clock = Clock_1}} when Clock < Clock_1 -> - ok = leo_storage_mq:publish(?QUEUE_ID_PER_OBJECT, - AddrId, Key, ?ERR_TYPE_REPLICATE_DATA), + {ok, #?METADATA{clock = Clock_1} = Metadata} when Clock < Clock_1 -> + ok = leo_storage_mq:publish(?QUEUE_ID_PER_OBJECT, Metadata, ?ERR_TYPE_REPLICATE_DATA), slice_and_replicate(StackedObject, Errors); _ -> case leo_misc:get_env(leo_redundant_manager, ?PROP_RING_HASH) of diff --git a/apps/leo_storage/test/leo_storage_mq_tests.erl b/apps/leo_storage/test/leo_storage_mq_tests.erl index 5b13a095..4239d4d2 100644 --- a/apps/leo_storage/test/leo_storage_mq_tests.erl +++ b/apps/leo_storage/test/leo_storage_mq_tests.erl @@ -177,12 +177,31 @@ start_(_) -> %% sync vnode-id queue. publish_({_, Test1Node}) -> ?TBL_REBALANCE_COUNTER = ets:new(?TBL_REBALANCE_COUNTER, [named_table, public]), + meck:new(leo_object_storage_api, [non_strict]), + meck:expect(leo_object_storage_api, head, + fun({_AddrId, _Key}) -> + {ok, term_to_binary(#?METADATA{num_of_replicas = 0})} + end), + + ok = leo_storage_mq:publish(?QUEUE_ID_RECOVERY_NODE, node()), + ok = leo_storage_mq:publish(?QUEUE_ID_ASYNC_DELETION, #?METADATA{addr_id = ?TEST_VNODE_ID, + key = ?TEST_KEY_1}), + ok = leo_storage_mq:publish(?QUEUE_ID_ASYNC_DELETION, ?TEST_VNODE_ID, ?TEST_KEY_1), + + ok = leo_storage_mq:publish(?QUEUE_ID_SYNC_BY_VNODE_ID, ?TEST_VNODE_ID, node()), + ok = leo_storage_mq:publish(?QUEUE_ID_SYNC_OBJ_WITH_DC, ?TEST_VNODE_ID, ?TEST_KEY_1), + ok = leo_storage_mq:publish(?QUEUE_ID_COMP_META_WITH_DC, 'leofs_1', [{?TEST_VNODE_ID, ?TEST_KEY_1}]), + ok = leo_storage_mq:publish(?QUEUE_ID_REQ_DEL_DIR, node(), <<"leofs/">>), + ok = leo_storage_mq:publish({?QUEUE_ID_DEL_DIR, 'worker_1'}, ?TEST_VNODE_ID, ?TEST_KEY_1), + ok = leo_storage_mq:publish(?QUEUE_ID_SYNC_OBJ_WITH_DC, 'leofs_1', ?TEST_VNODE_ID, ?TEST_KEY_1), ok = leo_storage_mq:publish( ?QUEUE_ID_SYNC_BY_VNODE_ID, ?TEST_VNODE_ID, Test1Node), ok = leo_storage_mq:publish( ?QUEUE_ID_REBALANCE, Test1Node, ?TEST_VNODE_ID, ?TEST_VNODE_ID, ?TEST_KEY_1), - + ok = leo_storage_mq:publish( + ?QUEUE_ID_PER_OBJECT, #?METADATA{addr_id = ?TEST_VNODE_ID, + key = ?TEST_KEY_1}, ?ERR_TYPE_REPLICATE_DATA), ok = leo_storage_mq:publish( ?QUEUE_ID_PER_OBJECT, ?TEST_VNODE_ID, ?TEST_KEY_1, ?ERR_TYPE_REPLICATE_DATA), ok = leo_storage_mq:publish( diff --git a/apps/leo_storage/test/leo_storage_read_repairer_tests.erl b/apps/leo_storage/test/leo_storage_read_repairer_tests.erl index b066ed7d..d0965ff8 100644 --- a/apps/leo_storage/test/leo_storage_read_repairer_tests.erl +++ b/apps/leo_storage/test/leo_storage_read_repairer_tests.erl @@ -92,7 +92,7 @@ regular_({Test0Node, Test1Node}) -> end]), meck:new(leo_storage_mq, [non_strict]), meck:expect(leo_storage_mq, publish, - fun(_,_,_,_) -> + fun(_,_,_) -> ok end), @@ -115,7 +115,7 @@ fail_1_({Test0Node, Test1Node}) -> end]), meck:new(leo_storage_mq, [non_strict]), meck:expect(leo_storage_mq, publish, - fun(_,_,_,_) -> + fun(_,_,_) -> ok end), @@ -138,7 +138,7 @@ fail_2_({Test0Node, Test1Node}) -> end]), meck:new(leo_storage_mq, [non_strict]), meck:expect(leo_storage_mq, publish, - fun(_,_,_,_) -> + fun(_,_,_) -> ok end), diff --git a/apps/leo_storage/test/leo_storage_replicator_tests.erl b/apps/leo_storage/test/leo_storage_replicator_tests.erl index 305ec738..03089604 100644 --- a/apps/leo_storage/test/leo_storage_replicator_tests.erl +++ b/apps/leo_storage/test/leo_storage_replicator_tests.erl @@ -155,10 +155,11 @@ replicate_obj_2_({Test0Node, Test1Node}) -> gen_mock_2(object, {_Test0Node, _Test1Node}, Case) -> meck:new(leo_storage_mq, [non_strict]), meck:expect(leo_storage_mq, publish, - fun(Type, VNodeId, Key, _ErrorType) -> + fun(Type, #?METADATA{addr_id = AddrId, + key = Key} = Metadata, _ErrorType) -> ?assertEqual(?QUEUE_ID_PER_OBJECT, Type), - ?assertEqual(?TEST_RING_ID_1, VNodeId), - ?assertEqual(?TEST_KEY_1, Key), + ?assertEqual(?TEST_RING_ID_1, AddrId), + ?assertEqual(?TEST_KEY_1, Key), ok end),