diff --git a/apps/leo_storage/include/leo_storage.hrl b/apps/leo_storage/include/leo_storage.hrl index e800f59b..bbe19f3b 100644 --- a/apps/leo_storage/include/leo_storage.hrl +++ b/apps/leo_storage/include/leo_storage.hrl @@ -835,6 +835,7 @@ directory = <<>> :: binary(), state = ?STATE_PENDING :: del_dir_state(), is_notification_successful = false :: boolean(), %% for del-buclet (communication w/leo_manager) + enqueued_at = 1 :: pos_integer(), timestamp = 1 :: pos_integer() }). diff --git a/apps/leo_storage/src/leo_storage_handler_del_directory.erl b/apps/leo_storage/src/leo_storage_handler_del_directory.erl index f6dde371..4aa8fe05 100644 --- a/apps/leo_storage/src/leo_storage_handler_del_directory.erl +++ b/apps/leo_storage/src/leo_storage_handler_del_directory.erl @@ -134,7 +134,9 @@ handle_call({enqueue, Type, Directory, IsClient},_From, #state{cached_items = Ca directory = Directory, type = Type, state = ?STATE_PENDING, - timestamp = leo_date:now()}) of + enqueued_at = leo_date:clock(), + timestamp = leo_date:now() + }) of ok when IsClient == false-> case run(Type, ?STATE_PENDING, null, Directory, State) of {ok, State_1} -> @@ -144,7 +146,7 @@ handle_call({enqueue, Type, Directory, IsClient},_From, #state{cached_items = Ca {error, State} end; ok -> - ?info("run/5", [{"msg: enqueued", Directory}]), + ?info("handle_call/3 - enqueue", [{"msg: enqueued", Directory}]), {ok, State}; {error,_Cause} -> {{error, ?ERROR_ENQUEUE_FAILURE}, State} @@ -290,11 +292,12 @@ dequeue_1(State, [{_, DelBucketStateBin}|DelBucketStateList], MQId) -> case catch binary_to_term(DelBucketStateBin) of #del_dir_state{type = Type, directory = Directory, - state = ?STATE_PENDING} -> + state = ?STATE_PENDING, + enqueued_at = EnqueuedAt} -> From = self(), _Pid = spawn( fun() -> - insert_messages(From, MQId, Type, Directory) + insert_messages(From, MQId, Type, Directory, EnqueuedAt) end), % receive 'enqueuing' message here to change the state from STATE_PENDING to STATE_ENQUEUING % for preventing insert_messages from being invoked multiple times. @@ -312,17 +315,18 @@ dequeue_1(State, [{_, DelBucketStateBin}|DelBucketStateList], MQId) -> %% @private --spec(insert_messages(From, MQId, Type, Directory) -> +-spec(insert_messages(From, MQId, Type, Directory, EnqueuedAt) -> ok | {error, Cause} when From::pid(), MQId::mq_id(), Type::del_dir_type(), Directory::binary(), + EnqueuedAt::pos_integer(), Cause::any()). -insert_messages(From, MQId, Type, Directory) -> +insert_messages(From, MQId, Type, Directory, EnqueuedAt) -> erlang:send(From, {enqueuing, MQId, Type, Directory}), case leo_storage_handler_object:prefix_search_and_remove_objects( - MQId, Directory) of + MQId, Directory, EnqueuedAt) of {ok,_TotalMsgs} -> erlang:send(From, {enqueued, MQId, Type, Directory}); {error, Cause} -> @@ -396,22 +400,20 @@ check_stats_1([_|DelBucketStateList], From, State) -> check_stats_2(#del_dir_state{mq_id = MQId, type = Type, directory = Directory, - state = ?STATE_PENDING}, From,_State) when MQId /= null, - Type == ?TYPE_DEL_DIR -> + state = ?STATE_PENDING, + enqueued_at = EnqueuedAt} = CurState, From,_State) when MQId /= null, + Type == ?TYPE_DEL_DIR -> case leo_mq_api:count(MQId) of {ok, 0} -> _Pid = spawn( fun() -> - insert_messages(From, MQId, Type, Directory) + insert_messages(From, MQId, Type, Directory, EnqueuedAt) end), ok; {ok,_Count} -> - update_state(#del_dir_state{ - mq_id = MQId, - directory = Directory, - type = Type, - state = ?STATE_MONITORING, - timestamp = leo_date:now()}); + update_state(CurState#del_dir_state{ + state = ?STATE_MONITORING, + timestamp = leo_date:now()}); _ -> ok end; diff --git a/apps/leo_storage/src/leo_storage_handler_object.erl b/apps/leo_storage/src/leo_storage_handler_object.erl index 0ab104c1..7babec6b 100644 --- a/apps/leo_storage/src/leo_storage_handler_object.erl +++ b/apps/leo_storage/src/leo_storage_handler_object.erl @@ -42,7 +42,7 @@ head/2, head/3, head_with_calc_md5/3, replicate/1, replicate/3, - prefix_search/3, prefix_search_and_remove_objects/2, + prefix_search/3, prefix_search_and_remove_objects/3, find_uploaded_objects_by_key/1, is_key_under_del_dir/1, can_compact_object/2 ]). @@ -1032,20 +1032,21 @@ prefix_search_2(ParentDir, Marker, Key, Meta, Acc) -> %% @doc Retrieve object of deletion from object-storage by key --spec(prefix_search_and_remove_objects(MQId, ParentDir) -> +-spec(prefix_search_and_remove_objects(MQId, ParentDir, EnqueuedAt) -> {ok, [{Key, Value}]} | not_found | {error, Cause} when MQId::mq_id(), ParentDir::binary(), + EnqueuedAt::pos_integer(), Key::atom(), Value::any(), Cause::any()). -prefix_search_and_remove_objects(MQId, ParentDir) -> +prefix_search_and_remove_objects(MQId, ParentDir, EnqueuedAt) -> Fun = fun(Key, V, Acc) -> PreMeta = binary_to_term(V), case leo_object_storage_transformer:transform_metadata(PreMeta) of {error, Cause} -> - ?error("prefix_search_and_remove_objects/1", + ?error("prefix_search_and_remove_objects/3", [{key, Key}, {error, Cause}]), Acc; Metadata -> @@ -1059,7 +1060,7 @@ prefix_search_and_remove_objects(MQId, ParentDir) -> case (Pos_1 == 0) of true when Metadata#?METADATA.del == ?DEL_FALSE -> case leo_storage_mq:publish( - {?QUEUE_ID_DEL_DIR, MQId}, AddrId, Key) of + {?QUEUE_ID_DEL_DIR, MQId}, AddrId, Key, EnqueuedAt) of ok -> void; {error, Cause} -> diff --git a/apps/leo_storage/src/leo_storage_mq.erl b/apps/leo_storage/src/leo_storage_mq.erl index f83a03d2..8ac5fb67 100644 --- a/apps/leo_storage/src/leo_storage_mq.erl +++ b/apps/leo_storage/src/leo_storage_mq.erl @@ -186,16 +186,6 @@ publish(?QUEUE_ID_REQ_DEL_DIR = Id, Node, Directory) -> dir = Directory, timestamp = leo_date:now()}), leo_mq_api:publish(Id, KeyBin, MsgBin); - -publish({?QUEUE_ID_DEL_DIR, WorkerId}, AddrId, Key) -> - KeyBin = term_to_binary({AddrId, Key}), - MsgBin = term_to_binary(#async_deletion_message{ - id = leo_date:clock(), - addr_id = AddrId, - key = Key, - timestamp = leo_date:now()}), - leo_mq_api:publish(WorkerId, KeyBin, MsgBin); - publish(_,_,_) -> {error, badarg}. @@ -225,6 +215,20 @@ publish(?QUEUE_ID_SYNC_OBJ_WITH_DC = Id, ClusterId, AddrId, Key) -> timestamp = leo_date:now()}), leo_mq_api:publish(Id, KeyBin, MessageBin); +publish({?QUEUE_ID_DEL_DIR, WorkerId}, AddrId, Key, EnqueuedAt) -> + KeyBin = term_to_binary({AddrId, Key}), + MsgBin = term_to_binary(#?MSG_ASYNC_DELETION{ + id = leo_date:clock(), + addr_id = AddrId, + key = Key, + %% To compare it with its latest metadata + %% See `compare_between_metadatas/1` + meta = #?METADATA{addr_id = AddrId, + key = Key, + clock = EnqueuedAt}, + timestamp = leo_date:now()}), + + leo_mq_api:publish(WorkerId, KeyBin, MsgBin); publish(_,_,_,_) -> {error, badarg}. @@ -323,7 +327,7 @@ handle_call({consume, ?QUEUE_ID_PER_OBJECT, MessageBin}) -> meta = Metadata, sync_node = SyncNode, is_force_sync = IsForceSync}} -> - case compare_metadatas(Metadata) of + case compare_between_metadatas(Metadata) of true when IsForceSync == true, SyncNode /= undefined -> send_object_to_remote_node(SyncNode, AddrId, Key); @@ -996,7 +1000,7 @@ remove_objects_under_dir(MessageBin) -> {ok, #?MSG_ASYNC_DELETION{addr_id = AddrId, key = Key, meta = Metadata}} -> - case compare_metadatas(Metadata) of + case compare_between_metadatas(Metadata) of true -> case catch leo_storage_handler_object:delete( #?OBJECT{addr_id = AddrId, @@ -1026,15 +1030,15 @@ remove_objects_under_dir(MessageBin) -> %% @doc Compare MQ's metadata with its latest metadata --spec(compare_metadatas(Metadata) -> +-spec(compare_between_metadatas(Metadata) -> Result | {error, Cause} when Metadata::#?METADATA{}, Result::boolean(), Cause::any()). -compare_metadatas(undefined) -> +compare_between_metadatas(undefined) -> true; -compare_metadatas(#?METADATA{addr_id = AddrId, - key = Key, - clock = MsgClock}) -> +compare_between_metadatas(#?METADATA{addr_id = AddrId, + key = Key, + clock = MsgClock}) -> case leo_storage_handler_object:head(AddrId, Key, false) of {ok, #?METADATA{clock = Clock}} when MsgClock > Clock -> true; @@ -1043,7 +1047,7 @@ compare_metadatas(#?METADATA{addr_id = AddrId, not_found -> false; {error, Cause} -> - ?error("compare_metadatas/1", + ?error("compare_between_metadatas/1", [{addr_id, AddrId}, {key, Key}, {cause, Cause}]), {error, ?ERROR_COULD_NOT_GET_META} diff --git a/apps/leo_storage/test/leo_storage_handler_object_tests.erl b/apps/leo_storage/test/leo_storage_handler_object_tests.erl index 8708e06f..6a295c1f 100644 --- a/apps/leo_storage/test/leo_storage_handler_object_tests.erl +++ b/apps/leo_storage/test/leo_storage_handler_object_tests.erl @@ -502,8 +502,10 @@ prefix_search_and_remove_objects_(_) -> meck:new(leo_mq_api, [non_strict]), meck:expect(leo_mq_api, publish, fun(_,_,_) -> ok end), + meck:expect(leo_mq_api, publish, fun(_,_,_,_) -> ok end), - Res = leo_storage_handler_object:prefix_search_and_remove_objects(mqid, ?TEST_BUCKET), + Res = leo_storage_handler_object:prefix_search_and_remove_objects( + mqid, ?TEST_BUCKET, leo_date:clock()), ?assertEqual(true, is_list(Res)), ok. diff --git a/apps/leo_storage/test/leo_storage_mq_tests.erl b/apps/leo_storage/test/leo_storage_mq_tests.erl index 4239d4d2..8435d294 100644 --- a/apps/leo_storage/test/leo_storage_mq_tests.erl +++ b/apps/leo_storage/test/leo_storage_mq_tests.erl @@ -192,7 +192,7 @@ publish_({_, Test1Node}) -> ok = leo_storage_mq:publish(?QUEUE_ID_SYNC_OBJ_WITH_DC, ?TEST_VNODE_ID, ?TEST_KEY_1), ok = leo_storage_mq:publish(?QUEUE_ID_COMP_META_WITH_DC, 'leofs_1', [{?TEST_VNODE_ID, ?TEST_KEY_1}]), ok = leo_storage_mq:publish(?QUEUE_ID_REQ_DEL_DIR, node(), <<"leofs/">>), - ok = leo_storage_mq:publish({?QUEUE_ID_DEL_DIR, 'worker_1'}, ?TEST_VNODE_ID, ?TEST_KEY_1), + ok = leo_storage_mq:publish({?QUEUE_ID_DEL_DIR, 'worker_1'}, ?TEST_VNODE_ID, ?TEST_KEY_1, leo_date:clock()), ok = leo_storage_mq:publish(?QUEUE_ID_SYNC_OBJ_WITH_DC, 'leofs_1', ?TEST_VNODE_ID, ?TEST_KEY_1), ok = leo_storage_mq:publish( diff --git a/apps/leo_storage/test/leo_storage_replicator_tests.erl b/apps/leo_storage/test/leo_storage_replicator_tests.erl index 03089604..984b2a14 100644 --- a/apps/leo_storage/test/leo_storage_replicator_tests.erl +++ b/apps/leo_storage/test/leo_storage_replicator_tests.erl @@ -156,7 +156,7 @@ gen_mock_2(object, {_Test0Node, _Test1Node}, Case) -> meck:new(leo_storage_mq, [non_strict]), meck:expect(leo_storage_mq, publish, fun(Type, #?METADATA{addr_id = AddrId, - key = Key} = Metadata, _ErrorType) -> + key = Key}, _ErrorType) -> ?assertEqual(?QUEUE_ID_PER_OBJECT, Type), ?assertEqual(?TEST_RING_ID_1, AddrId), ?assertEqual(?TEST_KEY_1, Key),