Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
yosukehara committed Jul 29, 2014
1 parent 2a19801 commit 20cc1ac
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 41 deletions.
93 changes: 53 additions & 40 deletions src/leo_object_storage_haystack.erl
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ add_incorrect_data(StorageInfo, Data) ->
-spec(add_incorrect_data(file:io_device(), integer(), binary()) ->
ok | {error, any()}).
add_incorrect_data(WriteHandler, Offset, Data) ->
?debugVal(Offset),
case file:pwrite(WriteHandler, Offset, Data) of
ok ->
ok;
Expand Down Expand Up @@ -596,7 +597,7 @@ compact_get(ReadHandler, Offset) ->
HeaderSize ->
compact_get(ReadHandler, Offset, HeaderSize, HeaderBin);
_ ->
{error, ?ERROR_DATA_SIZE_DID_NOT_MATCH}
{error, {?LINE,?ERROR_DATA_SIZE_DID_NOT_MATCH}}
end;
eof = Cause ->
{error, Cause};
Expand All @@ -623,30 +624,29 @@ compact_get(ReadHandler, Offset, HeaderSize, HeaderBin) ->
%% @private
compact_get(#?METADATA{ksize = KSize,
dsize = DSize,
msize = MSize,
cnumber = CNum} = Metadata, ReadHandler,
Offset, HeaderSize, HeaderBin) ->
DSize4Read = case (CNum > 0) of
true -> 0;
false -> DSize
end,
RemainSize = (KSize + DSize4Read
+ MSize + ?LEN_PADDING),
RemainSize = (KSize + DSize4Read + ?LEN_PADDING),

case (RemainSize > ?MAX_DATABLOCK_SIZE) of
true ->
{error, ?ERROR_INVALID_DATA};
{error, {?LINE,?ERROR_INVALID_DATA}};
false ->
try
case file:pread(ReadHandler, Offset + HeaderSize, RemainSize) of
{ok, RemainBin} ->
case byte_size(RemainBin) of
RemainSize ->
compact_get_1(HeaderBin, Metadata,
DSize4Read, RemainBin,
Offset + HeaderSize + RemainSize);
TotalSize = Offset + HeaderSize + RemainSize,
compact_get_1(ReadHandler,
HeaderBin, Metadata#?METADATA{offset = Offset},
DSize4Read, RemainBin, TotalSize);
_ ->
{error, ?ERROR_DATA_SIZE_DID_NOT_MATCH}
{error, {?LINE,?ERROR_DATA_SIZE_DID_NOT_MATCH}}
end;
eof = Cause ->
{error, Cause};
Expand All @@ -660,44 +660,57 @@ compact_get(#?METADATA{ksize = KSize,
end.

%% @private
compact_get_1(_HeaderBin, #?METADATA{ksize = 0},_DSize,_Bin,_TotalSize) ->
{error, ?ERROR_DATA_SIZE_DID_NOT_MATCH};
compact_get_1(HeaderBin, #?METADATA{ksize = KSize,
msize = 0} = Metadata,
compact_get_1(_ReadHandler,_HeaderBin, #?METADATA{ksize = 0},_DSize,_Bin,_TotalSize) ->
{error, {?LINE, ?ERROR_DATA_SIZE_DID_NOT_MATCH}};
compact_get_1(ReadHandler, HeaderBin, #?METADATA{ksize = KSize,
msize = 0} = Metadata,
DSize, Bin, TotalSize) ->
<< KeyBin:KSize/binary,
BodyBin:DSize/binary,
_Footer/binary>> = Bin,
compact_get_2(HeaderBin, Metadata, KeyBin, BodyBin, <<>>, TotalSize);
compact_get_1(HeaderBin, #?METADATA{ksize = KSize,
msize = MSize} = Metadata,
DSize, Bin, TotalSize) ->
<< KeyBin:KSize/binary,
BodyBin:DSize/binary,
CMetaBin:MSize/binary,
_Footer/binary>> = Bin,
compact_get_2(HeaderBin, Metadata, KeyBin, BodyBin, CMetaBin, TotalSize).
compact_get_2(ReadHandler, HeaderBin, Metadata, KeyBin, BodyBin, TotalSize).

%% @private
compact_get_2(HeaderBin, Metadata, KeyBin, BodyBin, CMetaBin, TotalSize) ->
compact_get_2(ReadHandler, HeaderBin, Metadata, KeyBin, BodyBin, TotalSize) ->
Checksum = Metadata#?METADATA.checksum,
case leo_object_storage_transformer:cmeta_bin_into_metadata(
CMetaBin, Metadata) of
{error, Cause} ->
{error, Cause};
Metadata_1 ->
Checksum_1 = leo_hex:raw_binary_to_integer(crypto:hash(md5, BodyBin)),
case (Checksum == Checksum_1
orelse Checksum_1 == ?MD5_EMPTY_BIN) of
Checksum_1 = leo_hex:raw_binary_to_integer(crypto:hash(md5, BodyBin)),
Metadata_1 = Metadata#?METADATA{key = KeyBin},

case (Checksum == Checksum_1 orelse
Checksum_1 == ?MD5_EMPTY_BIN) of
true ->
HeaderSize = erlang:round(?BLEN_HEADER/8),
Offset = Metadata#?METADATA.offset,
KSize = Metadata#?METADATA.ksize,
DSize = Metadata#?METADATA.dsize,
MSize = Metadata#?METADATA.msize,

case (?MAX_DATABLOCK_SIZE < MSize andalso MSize > 0) of
true ->
{ok, Metadata_1#?METADATA{key = KeyBin},
[HeaderBin, KeyBin, BodyBin, TotalSize]};
MPos = Offset + HeaderSize + KSize + DSize,
case file:pread(ReadHandler, MPos, MSize) of
{ok, CMetaBin} ->
case leo_object_storage_transformer:cmeta_bin_into_metadata(
CMetaBin, Metadata) of
{error,_Cause} ->
{ok, Metadata_1#?METADATA{msize = 0},
[HeaderBin, KeyBin, BodyBin, TotalSize]};
Metadata_2 ->
{ok, Metadata_2#?METADATA{key = KeyBin},
[HeaderBin, KeyBin, BodyBin, TotalSize + MSize]}
end;
{error,_Cause} ->
{error, invalid_data}
end;
false ->
Reason = ?ERROR_INVALID_DATA,
error_logger:error_msg("~p,~p,~p,~p~n",
[{module, ?MODULE_STRING},
{function, "compact_get_2/4"},
{line, ?LINE}, {body, Reason}]),
{error, Reason}
end
{ok, Metadata_1#?METADATA{msize = 0},
[HeaderBin, KeyBin, BodyBin, TotalSize]}
end;
false ->
Reason = ?ERROR_INVALID_DATA,
error_logger:error_msg("~p,~p,~p,~p~n",
[{module, ?MODULE_STRING},
{function, "compact_get_2/4"},
{line, ?LINE}, {body, Reason}]),
{error, Reason}
end.
2 changes: 1 addition & 1 deletion src/leo_object_storage_transformer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ cmeta_bin_into_metadata(CustomMetaBin, Metadata) ->
num_of_replicas = NumOfReplicas,
ver = Version}
catch
_:_ ->
_:_Cause ->
{error, invalid_format}
end.

Expand Down

0 comments on commit 20cc1ac

Please sign in to comment.