Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
yosukehara committed Jul 28, 2014
1 parent 7822086 commit 2e79e71
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 85 deletions.
70 changes: 25 additions & 45 deletions src/leo_object_storage_haystack.erl
Original file line number Diff line number Diff line change
Expand Up @@ -492,9 +492,9 @@ put_fun_2(MetaDBId, StorageInfo, #?OBJECT{key = Key,
data = Bin,
checksum = Checksum} = Object) ->
Checksum_1 = case Checksum of
0 -> leo_hex:raw_binary_to_integer(crypto:hash(md5, Bin));
_ -> Checksum
end,
0 -> leo_hex:raw_binary_to_integer(crypto:hash(md5, Bin));
_ -> Checksum
end,
Object_1 = Object#?OBJECT{ksize = byte_size(Key),
checksum = Checksum_1},
Needle = create_needle(Object_1),
Expand Down Expand Up @@ -596,12 +596,7 @@ compact_get(ReadHandler, Offset) ->
HeaderSize ->
compact_get(ReadHandler, Offset, HeaderSize, HeaderBin);
_ ->
Cause = ?ERROR_DATA_SIZE_DID_NOT_MATCH,
error_logger:error_msg("~p,~p,~p,~p~n",
[{module, ?MODULE_STRING},
{function, "compact_get/2"},
{line, ?LINE}, {body, Cause}]),
{error, Cause}
{error, ?ERROR_DATA_SIZE_DID_NOT_MATCH}
end;
eof = Cause ->
{error, Cause};
Expand Down Expand Up @@ -640,13 +635,7 @@ compact_get(#?METADATA{ksize = KSize,

case (RemainSize > ?MAX_DATABLOCK_SIZE) of
true ->
Cause = ?ERROR_INVALID_DATA,
error_logger:error_msg("~p,~p,~p,~p~n",
[{module, ?MODULE_STRING},
{function, "compact_get/4"},
{line, ?LINE},
{body, "Data size too large"}]),
{error, Cause};
{error, ?ERROR_INVALID_DATA};
false ->
try
case file:pread(ReadHandler, Offset + HeaderSize, RemainSize) of
Expand All @@ -657,23 +646,11 @@ compact_get(#?METADATA{ksize = KSize,
DSize4Read, RemainBin,
Offset + HeaderSize + RemainSize);
_ ->
Cause = ?ERROR_DATA_SIZE_DID_NOT_MATCH,
error_logger:error_msg("~p,~p,~p,~p~n",
[{module, ?MODULE_STRING},
{function, "compact_get/4"},
{line, ?LINE},
{body, Cause}]),
{error, Cause}
{error, ?ERROR_DATA_SIZE_DID_NOT_MATCH}
end;


eof = Cause ->
{error, Cause};
{error, Cause} ->
error_logger:error_msg("~p,~p,~p,~p~n",
[{module, ?MODULE_STRING},
{function, "compact_get/4"},
{line, ?LINE}, {body, Cause}]),
{error, Cause}
end
catch
Expand Down Expand Up @@ -704,20 +681,23 @@ compact_get_1(HeaderBin, #?METADATA{ksize = KSize,
%% @private
compact_get_2(HeaderBin, Metadata, KeyBin, BodyBin, CMetaBin, TotalSize) ->
Checksum = Metadata#?METADATA.checksum,
Metadata_1 = leo_object_storage_transformer:cmeta_bin_into_metadata(
CMetaBin, Metadata),
Checksum_1 = leo_hex:raw_binary_to_integer(crypto:hash(md5, BodyBin)),

case (Checksum == Checksum_1
orelse Checksum_1 == ?MD5_EMPTY_BIN) of
true ->
{ok, Metadata_1#?METADATA{key = KeyBin},
[HeaderBin, KeyBin, BodyBin, TotalSize]};
false ->
Cause = ?ERROR_INVALID_DATA,
error_logger:error_msg("~p,~p,~p,~p~n",
[{module, ?MODULE_STRING},
{function, "compact_get_2/4"},
{line, ?LINE}, {body, Cause}]),
{error, Cause}
case leo_object_storage_transformer:cmeta_bin_into_metadata(
CMetaBin, Metadata) of
{error, Cause} ->
{error, Cause};
Metadata_1 ->
Checksum_1 = leo_hex:raw_binary_to_integer(crypto:hash(md5, BodyBin)),
case (Checksum == Checksum_1
orelse Checksum_1 == ?MD5_EMPTY_BIN) of
true ->
{ok, Metadata_1#?METADATA{key = KeyBin},
[HeaderBin, KeyBin, BodyBin, TotalSize]};
false ->
Reason = ?ERROR_INVALID_DATA,
error_logger:error_msg("~p,~p,~p,~p~n",
[{module, ?MODULE_STRING},
{function, "compact_get_2/4"},
{line, ?LINE}, {body, Reason}]),
{error, Reason}
end
end.
48 changes: 33 additions & 15 deletions src/leo_object_storage_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@
object_storage :: #backend_info{},
storage_stats :: #storage_stats{},
state_filepath :: string(),
is_strict_check :: boolean()
is_strict_check :: boolean(),
set_errors :: set()
}).

-record(compact_params, {
Expand Down Expand Up @@ -733,8 +734,7 @@ compact_fun_2({ok, #state{meta_db_id = MetaDBId,
read_handler = undefined,
write_handler = undefined,
tmp_read_handler = undefined,
tmp_write_handler = undefined
}},
tmp_write_handler = undefined}},
compact_fun_3({Res, NewState});

compact_fun_2({Error,_State}, _) ->
Expand Down Expand Up @@ -895,12 +895,13 @@ do_compact(Metadata, CompactParams, #state{meta_db_id = MetaDBId,
%% set a flag of object of compaction
NumOfReplicas = Metadata#?METADATA.num_of_replicas,
HasChargeOfNode = FunHasChargeOfNode(Key, NumOfReplicas),
State_1 = State#state{set_errors = sets:new()},

%% execute compaction
case (is_deleted_rec(MetaDBId, StorageInfo, Metadata)
orelse HasChargeOfNode == false) of
true ->
do_comapct_1(ok, Metadata, CompactParams, State);
do_compact_1(ok, Metadata, CompactParams, State_1);
false ->
%% Insert into the temporary object-container.
%%
Expand All @@ -922,46 +923,63 @@ do_compact(Metadata, CompactParams, #state{meta_db_id = MetaDBId,
num_of_active_objects = NumOfActiveObjs + 1,
size_of_active_object = SizeActive + ObjectSize},

do_comapct_1(Ret, NewMeta, NewCompactParams, State);
do_compact_1(Ret, NewMeta, NewCompactParams, State_1);
Error ->
do_comapct_1(Error, Metadata, CompactParams, State)
do_compact_1(Error, Metadata, CompactParams, State_1)
end
end.


%% @doc Reduce unnecessary objects from object-container.
%% @private
do_comapct_1(ok, Metadata, CompactParams, #state{object_storage = StorageInfo} = State) ->
do_compact_1(ok, Metadata, CompactParams, #state{object_storage = StorageInfo} = State) ->
ReadHandler = StorageInfo#backend_info.read_handler,

case leo_object_storage_haystack:compact_get(
ReadHandler, CompactParams#compact_params.next_offset) of
{ok, NewMetadata, [_HeaderValue, NewKeyValue,
NewBodyValue, NewNextOffset]} ->
ok = output_accumulated_errors(State),
do_compact(NewMetadata,
CompactParams#compact_params{
key_bin = NewKeyValue,
body_bin = NewBodyValue,
next_offset = NewNextOffset},
State);
{error, eof} ->
ok = output_accumulated_errors(State),
NumOfAcriveObjs = CompactParams#compact_params.num_of_active_objects,
SizeOfActiveObjs = CompactParams#compact_params.size_of_active_object,
{ok, NumOfAcriveObjs, SizeOfActiveObjs};
{error, Cause} when Cause =:= ?ERROR_INVALID_DATA orelse
Cause =:= ?ERROR_DATA_SIZE_DID_NOT_MATCH ->
%% retry until eof
{_, Cause} ->
erlang:garbage_collect(self()),
OldOffset = CompactParams#compact_params.next_offset,
do_comapct_1(ok, Metadata,
SetErrors = sets:add_element(Cause, State#state.set_errors),
do_compact_1(ok, Metadata,
CompactParams#compact_params{next_offset = OldOffset + 1},
State);
Error ->
Error
State#state{set_errors = SetErrors})
end;
do_comapct_1(Error,_,_,_) ->
do_compact_1(Error,_,_,_) ->
Error.


%% @doc Write the error causes collected in the state's `set_errors' set
%%      to the logger as a single warning. Nothing is logged when the set
%%      is empty. Always returns ok.
%% @private
-spec(output_accumulated_errors(#state{}) ->
             ok).
output_accumulated_errors(#state{set_errors = SetErrors}) ->
    case sets:to_list(SetErrors) of
        [] ->
            %% no error was recorded - stay silent
            ok;
        Causes ->
            %% the function label names the place where the causes were gathered
            error_logger:warning_msg("~p,~p,~p,~p~n",
                                     [{module, ?MODULE_STRING},
                                      {function, "do_compact_1/4"},
                                      {line, ?LINE}, {body, Causes}]),
            ok
    end.


%% @doc Generate a raw file path.
%% @private
-spec(gen_raw_file_path(string()) ->
Expand Down
20 changes: 12 additions & 8 deletions src/leo_object_storage_transformer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ header_bin_to_metadata(Bin) ->
catch
_:_ ->
{error, invalid_format}

end.


Expand All @@ -267,13 +266,18 @@ header_bin_to_metadata(Bin) ->
cmeta_bin_into_metadata(<<>>, Metadata) ->
Metadata;
cmeta_bin_into_metadata(CustomMetaBin, Metadata) ->
CustomeMeta = binary_to_term(CustomMetaBin),
ClusterId = leo_misc:get_value(?PROP_CMETA_CLUSTER_ID, CustomeMeta, []),
NumOfReplicas = leo_misc:get_value(?PROP_CMETA_NUM_OF_REPLICAS, CustomeMeta, 0),
Version = leo_misc:get_value(?PROP_CMETA_VER, CustomeMeta, 0),
Metadata#?METADATA{cluster_id = ClusterId,
num_of_replicas = NumOfReplicas,
ver = Version}.
try
CustomeMeta = binary_to_term(CustomMetaBin),
ClusterId = leo_misc:get_value(?PROP_CMETA_CLUSTER_ID, CustomeMeta, []),
NumOfReplicas = leo_misc:get_value(?PROP_CMETA_NUM_OF_REPLICAS, CustomeMeta, 0),
Version = leo_misc:get_value(?PROP_CMETA_VER, CustomeMeta, 0),
Metadata#?METADATA{cluster_id = ClusterId,
num_of_replicas = NumOfReplicas,
ver = Version}
catch
_:_ ->
{error, invalid_format}
end.

%% @doc Convert a list of tuples into custom-metadata (binary)
-spec(list_to_cmeta_bin(list(tuple())) ->
Expand Down
48 changes: 31 additions & 17 deletions test/leo_object_storage_api_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ compaction_test_() ->
{setup,
fun ( ) ->
?debugVal("***** COMPACTION.START *****"),
os:cmd("rm -rf " ++ ?AVS_DIR_FOR_COMPACTION),
application:start(sasl),
application:start(os_mon),
application:start(crypto),
Expand All @@ -51,28 +52,23 @@ compaction_test_() ->
application:stop(crypto),
application:stop(os_mon),
application:stop(sasl),
os:cmd("rm -rf " ++ ?AVS_DIR_FOR_COMPACTION),
timer:sleep(5000),
?debugVal("***** COMPACTION.END *****"),
ok
end,
[{"test all functions",
{timeout, 300, fun compact/0}}
[
{"test compaction - irregular case",
{timeout, 600, fun compact/0}}
]}.

compact() ->
%% Launch object-storage
leo_object_storage_api:start([{1, ?AVS_DIR_FOR_COMPACTION}]),
%% Put 128 objects - exists duplicate objects
ok = put_regular_bin(1, 100),
ok = put_irregular_bin(),
ok = put_regular_bin(25, 5),
ok = put_regular_bin(1, 50),
ok = put_irregular_bin(),
ok = put_regular_bin(50, 5),
ok = put_regular_bin(36, 25),
ok = put_irregular_bin(),
ok = put_regular_bin(75, 5),
ok = put_irregular_bin(),
ok = put_regular_bin(101, 28),
ok = put_regular_bin(51, 50),

%% Execute compaction
timer:sleep(3000),
Expand All @@ -82,18 +78,29 @@ compact() ->
TargetPids = leo_object_storage_api:get_object_storage_pid(all),
ok = leo_compaction_manager_fsm:start(TargetPids, 1, FunHasChargeOfNode),

timer:sleep(10000),
{ok, #compaction_stats{status = CurStatus}} = leo_compaction_manager_fsm:status(),
?assertEqual(idle, CurStatus),
%% Check compaction status
ok = check_status(),

%% Check # of active objects and total of objects
timer:sleep(1000),
{ok, [{_,#storage_stats{total_num = TotalNum,
active_num = ActiveNum
}}|_]} = leo_object_storage_api:stats(),
?debugVal({TotalNum, ActiveNum}),
?assertEqual(100, TotalNum),
?assertEqual(TotalNum, ActiveNum),
ok.

%% @doc Poll leo_compaction_manager_fsm:status/0 every 100ms until the
%%      reported status is 'idle'.
%%      Returns ok once idle. A reply that is not {ok, _} is returned as is.
%%      Polling continues for as long as a non-idle {ok, _} is reported.
%% @private
check_status() ->
    timer:sleep(100),
    handle_compaction_status(leo_compaction_manager_fsm:status()).

%% @doc Decide, from a single status reply, whether to stop or poll again.
%% @private
handle_compaction_status({ok, #compaction_stats{status = 'idle'}}) ->
    ok;
handle_compaction_status({ok, _}) ->
    check_status();
handle_compaction_status(Error) ->
    Error.

%% @doc Put data
%% @private
Expand All @@ -117,9 +124,16 @@ put_regular_bin(Index, Counter) ->
put_regular_bin(Index + 1, Counter -1).

%% @doc Generate a chunk of random bytes and hand it to
%%      leo_object_storage_api:add_incorrect_data/1.
%%      The chunk length is the hash of the current clock value folded into
%%      the range [0, 512KiB), raised to at least 16KiB.
%%      Always returns ok; the result of add_incorrect_data/1 is ignored.
%% @private
put_irregular_bin() ->
    MinLen = 1024 * 16,
    Len = erlang:max(MinLen,
                     erlang:phash2(leo_date:clock(), (1024 * 512))),
    ?debugVal(Len),
    %% crypto:rand_bytes/1 was removed in OTP 20; strong_rand_bytes/1 is the
    %% supported replacement and also returns a binary of Len bytes.
    Bin = crypto:strong_rand_bytes(Len),
    _ = leo_object_storage_api:add_incorrect_data(Bin),
    ok.


Expand Down

0 comments on commit 2e79e71

Please sign in to comment.